diff options
Diffstat (limited to 'tests/sync')
| -rw-r--r-- | tests/sync/discovery.rs | 259 | ||||
| -rw-r--r-- | tests/sync/historic_sync.rs | 32 | ||||
| -rw-r--r-- | tests/sync/live_sync.rs | 119 | ||||
| -rw-r--r-- | tests/sync/maintainer_reprocessing.rs | 278 | ||||
| -rw-r--r-- | tests/sync/metrics.rs | 139 | ||||
| -rw-r--r-- | tests/sync/mod.rs | 10 | ||||
| -rw-r--r-- | tests/sync/tag_variations.rs | 244 |
7 files changed, 457 insertions, 624 deletions
diff --git a/tests/sync/discovery.rs b/tests/sync/discovery.rs index 8ed80b5..d45a290 100644 --- a/tests/sync/discovery.rs +++ b/tests/sync/discovery.rs | |||
| @@ -3,10 +3,6 @@ | |||
| 3 | //! Tests for relay discovery from announcement events. | 3 | //! Tests for relay discovery from announcement events. |
| 4 | //! When a relay receives an announcement listing another relay, | 4 | //! When a relay receives an announcement listing another relay, |
| 5 | //! it should discover and connect to that relay to sync events. | 5 | //! it should discover and connect to that relay to sync events. |
| 6 | //! | ||
| 7 | //! # Tests | ||
| 8 | //! - Test 2: Direct Layer 3 discovery from Layer 2 | ||
| 9 | //! - Test 3: Recursive multi-hop Layer 3 discovery | ||
| 10 | 6 | ||
| 11 | use std::time::Duration; | 7 | use std::time::Duration; |
| 12 | 8 | ||
| @@ -62,29 +58,26 @@ async fn test_discovers_layer3_via_layer2() { | |||
| 62 | // 3. Create test keys | 58 | // 3. Create test keys |
| 63 | let keys = Keys::generate(); | 59 | let keys = Keys::generate(); |
| 64 | 60 | ||
| 65 | // 4. Create a repository announcement that lists BOTH relays | 61 | // 4. Set up repository announcement on relay_a with git data |
| 66 | let announcement = create_repo_announcement( | 62 | // (purgatory requires git data before announcements are accepted) |
| 67 | &keys, | 63 | let repo_id = "test-repo-discovery"; |
| 68 | &[&relay_a.domain(), &relay_b.domain()], | 64 | let domains = vec![relay_a.domain(), relay_b.domain()]; |
| 69 | "test-repo-discovery", | 65 | let domain_refs: Vec<&str> = domains.iter().map(|s| s.as_str()).collect(); |
| 70 | ); | ||
| 71 | let announcement_id = announcement.id; | ||
| 72 | 66 | ||
| 67 | let (announcement, _git_dir_a) = | ||
| 68 | setup_announcement_on_relay(&relay_a, &keys, &domain_refs, repo_id).await; | ||
| 69 | let announcement_id = announcement.id; | ||
| 73 | println!( | 70 | println!( |
| 74 | "Created announcement {} (kind {})", | 71 | "Announcement {} set up on relay_a with git data", |
| 75 | announcement_id, | 72 | announcement_id |
| 76 | announcement.kind.as_u16() | ||
| 77 | ); | 73 | ); |
| 78 | for tag in announcement.tags.iter() { | ||
| 79 | println!(" Tag: {:?}", tag.as_slice()); | ||
| 80 | } | ||
| 81 | 74 | ||
| 82 | // 5. Build the repo coordinate for the 'a' tag in the patch | 75 | // 5. Build the repo coordinate for the 'a' tag in the patch |
| 83 | let repo_coord = format!( | 76 | let repo_coord = format!( |
| 84 | "{}:{}:{}", | 77 | "{}:{}:{}", |
| 85 | Kind::GitRepoAnnouncement.as_u16(), | 78 | Kind::GitRepoAnnouncement.as_u16(), |
| 86 | keys.public_key().to_hex(), | 79 | keys.public_key().to_hex(), |
| 87 | "test-repo-discovery" | 80 | repo_id |
| 88 | ); | 81 | ); |
| 89 | 82 | ||
| 90 | // 6. Create a patch event (Layer 2) that references the announcement | 83 | // 6. Create a patch event (Layer 2) that references the announcement |
| @@ -97,22 +90,13 @@ async fn test_discovers_layer3_via_layer2() { | |||
| 97 | let patch_id = patch.id; | 90 | let patch_id = patch.id; |
| 98 | 91 | ||
| 99 | println!("Created patch {} (kind {})", patch_id, patch.kind.as_u16()); | 92 | println!("Created patch {} (kind {})", patch_id, patch.kind.as_u16()); |
| 100 | for tag in patch.tags.iter() { | ||
| 101 | println!(" Tag: {:?}", tag.as_slice()); | ||
| 102 | } | ||
| 103 | 93 | ||
| 104 | // 7. Send announcement and patch to relay_a ONLY | 94 | // 7. Send patch to relay_a |
| 105 | let client_a = TestClient::new(relay_a.url(), keys.clone()) | 95 | let client_a = TestClient::new(relay_a.url(), keys.clone()) |
| 106 | .await | 96 | .await |
| 107 | .expect("Failed to connect to relay_a"); | 97 | .expect("Failed to connect to relay_a"); |
| 108 | 98 | ||
| 109 | client_a | 99 | client_a |
| 110 | .send_event(&announcement) | ||
| 111 | .await | ||
| 112 | .expect("Failed to send announcement to relay_a"); | ||
| 113 | println!("Announcement sent to relay_a"); | ||
| 114 | |||
| 115 | client_a | ||
| 116 | .send_event(&patch) | 100 | .send_event(&patch) |
| 117 | .await | 101 | .await |
| 118 | .expect("Failed to send patch to relay_a"); | 102 | .expect("Failed to send patch to relay_a"); |
| @@ -120,18 +104,10 @@ async fn test_discovers_layer3_via_layer2() { | |||
| 120 | 104 | ||
| 121 | client_a.disconnect().await; | 105 | client_a.disconnect().await; |
| 122 | 106 | ||
| 123 | // 8. Send announcement to relay_b directly (triggers discovery of relay_a) | 107 | // 8. Set up announcement on relay_b (triggers discovery of relay_a) |
| 124 | let client_b = TestClient::new(relay_b.url(), keys.clone()) | 108 | let (_announcement_b, _git_dir_b) = |
| 125 | .await | 109 | setup_announcement_on_relay(&relay_b, &keys, &domain_refs, repo_id).await; |
| 126 | .expect("Failed to connect to relay_b"); | 110 | println!("Announcement set up on relay_b (should trigger discovery of relay_a)"); |
| 127 | |||
| 128 | client_b | ||
| 129 | .send_event(&announcement) | ||
| 130 | .await | ||
| 131 | .expect("Failed to send announcement to relay_b"); | ||
| 132 | println!("Announcement sent to relay_b (should trigger discovery of relay_a)"); | ||
| 133 | |||
| 134 | client_b.disconnect().await; | ||
| 135 | 111 | ||
| 136 | // 9. Wait for relay_b to discover relay_a and sync the patch | 112 | // 9. Wait for relay_b to discover relay_a and sync the patch |
| 137 | println!("Waiting 3s for relay_b to discover relay_a and sync patch..."); | 113 | println!("Waiting 3s for relay_b to discover relay_a and sync patch..."); |
| @@ -197,19 +173,20 @@ async fn test_relay_discovery_via_announcements_with_historic_sync() { | |||
| 197 | // 3. Create test keys | 173 | // 3. Create test keys |
| 198 | let keys = Keys::generate(); | 174 | let keys = Keys::generate(); |
| 199 | 175 | ||
| 200 | // 4. Create the event chain on relay_a: | 176 | // 4. Set up repository on relay_a with git data and a Layer 2 issue |
| 201 | 177 | ||
| 202 | // Layer 1: Repository announcement | 178 | // Layer 1: Set up announcement with git data |
| 203 | let announcement = create_repo_announcement( | 179 | let domains = vec![relay_a.domain(), relay_b.domain()]; |
| 204 | &keys, | 180 | let domain_refs: Vec<&str> = domains.iter().map(|s| s.as_str()).collect(); |
| 205 | &[&relay_a.domain(), &relay_b.domain()], | 181 | let repo_id = "test-repo-chain"; |
| 206 | "test-repo-chain", | 182 | |
| 207 | ); | 183 | let (announcement, _git_dir_a) = |
| 184 | setup_announcement_on_relay(&relay_a, &keys, &domain_refs, repo_id).await; | ||
| 208 | let announcement_id = announcement.id; | 185 | let announcement_id = announcement.id; |
| 209 | println!("Created announcement {} (Layer 1)", announcement_id); | 186 | println!("Announcement {} set up on relay_a with git data (Layer 1)", announcement_id); |
| 210 | 187 | ||
| 211 | // Build repo coordinate for Layer 2 reference | 188 | // Build repo coordinate for Layer 2 reference |
| 212 | let repo_coord = repo_coord(&keys, "test-repo-chain"); | 189 | let repo_coord = repo_coord(&keys, repo_id); |
| 213 | 190 | ||
| 214 | // Layer 2: Issue referencing the repo | 191 | // Layer 2: Issue referencing the repo |
| 215 | let issue = build_layer2_issue_event(&keys, &repo_coord, "Test issue for chain discovery") | 192 | let issue = build_layer2_issue_event(&keys, &repo_coord, "Test issue for chain discovery") |
| @@ -217,35 +194,23 @@ async fn test_relay_discovery_via_announcements_with_historic_sync() { | |||
| 217 | let issue_id = issue.id; | 194 | let issue_id = issue.id; |
| 218 | println!("Created issue {} (Layer 2)", issue_id); | 195 | println!("Created issue {} (Layer 2)", issue_id); |
| 219 | 196 | ||
| 220 | // 5. Send all events to relay_a | 197 | // 5. Send issue to relay_a |
| 221 | let client_a = TestClient::new(relay_a.url(), keys.clone()) | 198 | let client_a = TestClient::new(relay_a.url(), keys.clone()) |
| 222 | .await | 199 | .await |
| 223 | .expect("Failed to connect to relay_a"); | 200 | .expect("Failed to connect to relay_a"); |
| 224 | 201 | ||
| 225 | client_a | 202 | client_a |
| 226 | .send_event(&announcement) | ||
| 227 | .await | ||
| 228 | .expect("Failed to send announcement"); | ||
| 229 | client_a | ||
| 230 | .send_event(&issue) | 203 | .send_event(&issue) |
| 231 | .await | 204 | .await |
| 232 | .expect("Failed to send issue"); | 205 | .expect("Failed to send issue"); |
| 233 | 206 | ||
| 234 | println!("Events sent to relay_a"); | 207 | println!("Issue sent to relay_a"); |
| 235 | client_a.disconnect().await; | 208 | client_a.disconnect().await; |
| 236 | 209 | ||
| 237 | // 6. Send only the announcement to relay_b (triggers discovery) | 210 | // 6. Set up announcement on relay_b (triggers discovery of relay_a) |
| 238 | let client_b = TestClient::new(relay_b.url(), keys.clone()) | 211 | let (_announcement_b, _git_dir_b) = |
| 239 | .await | 212 | setup_announcement_on_relay(&relay_b, &keys, &domain_refs, repo_id).await; |
| 240 | .expect("Failed to connect to relay_b"); | 213 | println!("Announcement set up on relay_b (should trigger discovery of relay_a)"); |
| 241 | |||
| 242 | client_b | ||
| 243 | .send_event(&announcement) | ||
| 244 | .await | ||
| 245 | .expect("Failed to send announcement to relay_b"); | ||
| 246 | println!("Announcement sent to relay_b (should trigger discovery)"); | ||
| 247 | |||
| 248 | client_b.disconnect().await; | ||
| 249 | 214 | ||
| 250 | // 7. Wait for sync | 215 | // 7. Wait for sync |
| 251 | println!("Waiting 3s for Layer 2 sync..."); | 216 | println!("Waiting 3s for Layer 2 sync..."); |
| @@ -271,163 +236,3 @@ async fn test_relay_discovery_via_announcements_with_historic_sync() { | |||
| 271 | ); | 236 | ); |
| 272 | } | 237 | } |
| 273 | 238 | ||
| 274 | /// Test 3: 3-relay recursive discovery - relay discovers third relay through bootstrap | ||
| 275 | /// | ||
| 276 | /// Scenario: | ||
| 277 | /// ```text | ||
| 278 | /// relay_a (SUT) relay_b (bootstrap) relay_c (discovered) | ||
| 279 | /// │ │ │ | ||
| 280 | /// │ │ has announcement_x │ has announcement_y | ||
| 281 | /// │ │ listing A+B+C │ listing A+C | ||
| 282 | /// │ │ │ | ||
| 283 | /// ├────connect──────────► │ | ||
| 284 | /// │◄───sync announcement_x─────────────────────── | ||
| 285 | /// │ │ | ||
| 286 | /// │ discovers relay_c from announcement_x │ | ||
| 287 | /// │ │ | ||
| 288 | /// ├─────────────connect─────────────────────────► | ||
| 289 | /// │◄────────────sync announcement_y─────────────┘ | ||
| 290 | /// ``` | ||
| 291 | /// | ||
| 292 | /// This tests that relay_a: | ||
| 293 | /// 1. Connects to relay_b (configured as bootstrap) | ||
| 294 | /// 2. Receives announcement_x which lists relay_c | ||
| 295 | /// 3. Discovers and connects to relay_c | ||
| 296 | /// 4. Syncs announcement_y from relay_c | ||
| 297 | /// | ||
| 298 | #[tokio::test] | ||
| 299 | async fn test_recursive_relay_discovery_via_announcements_with_historic_sync() { | ||
| 300 | // 1. Start all three relays | ||
| 301 | |||
| 302 | // relay_b - will be the bootstrap relay, has announcement_x | ||
| 303 | let relay_b = TestRelay::start().await; | ||
| 304 | println!( | ||
| 305 | "relay_b (bootstrap) started at {} (domain: {})", | ||
| 306 | relay_b.url(), | ||
| 307 | relay_b.domain() | ||
| 308 | ); | ||
| 309 | |||
| 310 | // relay_c - will be discovered via announcement_x, has announcement_y | ||
| 311 | let relay_c = TestRelay::start().await; | ||
| 312 | println!( | ||
| 313 | "relay_c (to be discovered) started at {} (domain: {})", | ||
| 314 | relay_c.url(), | ||
| 315 | relay_c.domain() | ||
| 316 | ); | ||
| 317 | |||
| 318 | // relay_a - SUT, starts with relay_b as bootstrap | ||
| 319 | let relay_a = TestRelay::start_with_sync(Some(relay_b.url().to_string())).await; | ||
| 320 | println!( | ||
| 321 | "relay_a (SUT) started at {} (domain: {})", | ||
| 322 | relay_a.url(), | ||
| 323 | relay_a.domain() | ||
| 324 | ); | ||
| 325 | |||
| 326 | // 2. Create test keys (one for each announcement) | ||
| 327 | let keys_x = Keys::generate(); | ||
| 328 | let keys_y = Keys::generate(); | ||
| 329 | |||
| 330 | // 3. Create announcement_x on relay_b (lists all three relays: A+B+C) | ||
| 331 | let announcement_x = create_repo_announcement( | ||
| 332 | &keys_x, | ||
| 333 | &[&relay_a.domain(), &relay_b.domain(), &relay_c.domain()], | ||
| 334 | "repo-x-all-relays", | ||
| 335 | ); | ||
| 336 | let announcement_x_id = announcement_x.id; | ||
| 337 | println!("Created announcement_x {} listing A+B+C", announcement_x_id); | ||
| 338 | for tag in announcement_x.tags.iter() { | ||
| 339 | println!(" Tag: {:?}", tag.as_slice()); | ||
| 340 | } | ||
| 341 | |||
| 342 | // 4. Create announcement_y on relay_c (lists only A+C, NOT B) | ||
| 343 | let announcement_y = create_repo_announcement( | ||
| 344 | &keys_y, | ||
| 345 | &[&relay_a.domain(), &relay_c.domain()], | ||
| 346 | "repo-y-ac-only", | ||
| 347 | ); | ||
| 348 | let announcement_y_id = announcement_y.id; | ||
| 349 | println!( | ||
| 350 | "Created announcement_y {} listing A+C only", | ||
| 351 | announcement_y_id | ||
| 352 | ); | ||
| 353 | for tag in announcement_y.tags.iter() { | ||
| 354 | println!(" Tag: {:?}", tag.as_slice()); | ||
| 355 | } | ||
| 356 | |||
| 357 | // 5. Send announcement_x to relay_b only | ||
| 358 | let client_b = TestClient::new(relay_b.url(), keys_x.clone()) | ||
| 359 | .await | ||
| 360 | .expect("Failed to connect to relay_b"); | ||
| 361 | |||
| 362 | client_b | ||
| 363 | .send_event(&announcement_x) | ||
| 364 | .await | ||
| 365 | .expect("Failed to send announcement_x to relay_b"); | ||
| 366 | println!("announcement_x sent to relay_b"); | ||
| 367 | |||
| 368 | client_b.disconnect().await; | ||
| 369 | |||
| 370 | // 6. Send announcement_y to relay_c only | ||
| 371 | let client_c = TestClient::new(relay_c.url(), keys_y.clone()) | ||
| 372 | .await | ||
| 373 | .expect("Failed to connect to relay_c"); | ||
| 374 | |||
| 375 | client_c | ||
| 376 | .send_event(&announcement_y) | ||
| 377 | .await | ||
| 378 | .expect("Failed to send announcement_y to relay_c"); | ||
| 379 | println!("announcement_y sent to relay_c"); | ||
| 380 | |||
| 381 | client_c.disconnect().await; | ||
| 382 | |||
| 383 | // 7. Wait for relay_a to: | ||
| 384 | // - Sync from bootstrap relay_b (gets announcement_x) | ||
| 385 | // - Discover relay_c from announcement_x's relays tag | ||
| 386 | // - Connect to relay_c and sync announcement_y | ||
| 387 | println!("Waiting 5s for recursive relay discovery..."); | ||
| 388 | tokio::time::sleep(Duration::from_secs(5)).await; | ||
| 389 | |||
| 390 | // 8. Verify announcement_x was synced to relay_a (from bootstrap relay_b) | ||
| 391 | let filter_x = Filter::new() | ||
| 392 | .kind(Kind::GitRepoAnnouncement) | ||
| 393 | .author(keys_x.public_key()); | ||
| 394 | |||
| 395 | let announcement_x_synced = | ||
| 396 | wait_for_event_on_relay(relay_a.url(), filter_x, Duration::from_secs(5)).await; | ||
| 397 | |||
| 398 | println!( | ||
| 399 | "announcement_x {} synced to relay_a: {}", | ||
| 400 | announcement_x_id, announcement_x_synced | ||
| 401 | ); | ||
| 402 | |||
| 403 | // 9. Verify announcement_y was synced to relay_a (from discovered relay_c) | ||
| 404 | let filter_y = Filter::new() | ||
| 405 | .kind(Kind::GitRepoAnnouncement) | ||
| 406 | .author(keys_y.public_key()); | ||
| 407 | |||
| 408 | let announcement_y_synced = | ||
| 409 | wait_for_event_on_relay(relay_a.url(), filter_y, Duration::from_secs(5)).await; | ||
| 410 | |||
| 411 | println!( | ||
| 412 | "announcement_y {} synced to relay_a: {}", | ||
| 413 | announcement_y_id, announcement_y_synced | ||
| 414 | ); | ||
| 415 | |||
| 416 | // 10. Cleanup | ||
| 417 | relay_a.stop().await; | ||
| 418 | relay_b.stop().await; | ||
| 419 | relay_c.stop().await; | ||
| 420 | |||
| 421 | // 11. Assertions | ||
| 422 | assert!( | ||
| 423 | announcement_x_synced, | ||
| 424 | "announcement_x {} should have synced from bootstrap relay_b to relay_a", | ||
| 425 | announcement_x_id | ||
| 426 | ); | ||
| 427 | |||
| 428 | assert!( | ||
| 429 | announcement_y_synced, | ||
| 430 | "announcement_y {} should have synced from discovered relay_c to relay_a (recursive discovery)", | ||
| 431 | announcement_y_id | ||
| 432 | ); | ||
| 433 | } | ||
diff --git a/tests/sync/historic_sync.rs b/tests/sync/historic_sync.rs index aec2819..723b776 100644 --- a/tests/sync/historic_sync.rs +++ b/tests/sync/historic_sync.rs | |||
| @@ -224,34 +224,24 @@ async fn test_history_sync_without_negentropy() { | |||
| 224 | // Create keys | 224 | // Create keys |
| 225 | let keys = Keys::generate(); | 225 | let keys = Keys::generate(); |
| 226 | 226 | ||
| 227 | // Create announcement listing BOTH relay domains | 227 | // Set up announcement on source with git data |
| 228 | // This event will exist on source BEFORE syncing relay ever connects | 228 | // (purgatory requires git data before announcements are accepted) |
| 229 | let announcement = create_repo_announcement( | 229 | let domains = vec![source.domain(), syncing_domain.clone()]; |
| 230 | let domain_refs: Vec<&str> = domains.iter().map(|s| s.as_str()).collect(); | ||
| 231 | let (announcement, _git_dir) = setup_announcement_on_relay( | ||
| 232 | &source, | ||
| 230 | &keys, | 233 | &keys, |
| 231 | &[&source.domain(), &syncing_domain], | 234 | &domain_refs, |
| 232 | "test-repo-history-no-negentropy", | 235 | "test-repo-history-no-negentropy", |
| 233 | ); | 236 | ) |
| 237 | .await; | ||
| 234 | let announcement_id = announcement.id; | 238 | let announcement_id = announcement.id; |
| 235 | 239 | ||
| 236 | println!( | 240 | println!( |
| 237 | "Created announcement {} (kind {})", | 241 | "Announcement {} set up on source with git data (event exists BEFORE syncing relay connects)", |
| 238 | announcement_id, | 242 | announcement_id |
| 239 | announcement.kind.as_u16() | ||
| 240 | ); | 243 | ); |
| 241 | 244 | ||
| 242 | // Send announcement to source (event now exists BEFORE syncing relay connects) | ||
| 243 | let client = TestClient::new(source.url(), keys.clone()) | ||
| 244 | .await | ||
| 245 | .expect("Failed to connect to source"); | ||
| 246 | |||
| 247 | client | ||
| 248 | .send_event(&announcement) | ||
| 249 | .await | ||
| 250 | .expect("Failed to send announcement to source"); | ||
| 251 | println!("Announcement sent to source (event exists BEFORE syncing relay connects)"); | ||
| 252 | |||
| 253 | client.disconnect().await; | ||
| 254 | |||
| 255 | // Wait to ensure event is stored | 245 | // Wait to ensure event is stored |
| 256 | tokio::time::sleep(Duration::from_millis(500)).await; | 246 | tokio::time::sleep(Duration::from_millis(500)).await; |
| 257 | 247 | ||
diff --git a/tests/sync/live_sync.rs b/tests/sync/live_sync.rs index 8ee3119..4289004 100644 --- a/tests/sync/live_sync.rs +++ b/tests/sync/live_sync.rs | |||
| @@ -56,43 +56,24 @@ async fn test_live_sync_layer2_events() { | |||
| 56 | // 3. Create test keys | 56 | // 3. Create test keys |
| 57 | let keys = Keys::generate(); | 57 | let keys = Keys::generate(); |
| 58 | 58 | ||
| 59 | // 4. Create a repository announcement that lists BOTH relays | 59 | // 4. Create a repository announcement on both relays with git data |
| 60 | // (purgatory requires git data before announcements are accepted) | ||
| 60 | let repo_id = "test-repo-live-l2"; | 61 | let repo_id = "test-repo-live-l2"; |
| 61 | let announcement = | 62 | let domains = vec![relay_a.domain(), relay_b.domain()]; |
| 62 | create_repo_announcement(&keys, &[&relay_a.domain(), &relay_b.domain()], repo_id); | 63 | let domain_refs: Vec<&str> = domains.iter().map(|s| s.as_str()).collect(); |
| 63 | 64 | ||
| 64 | println!( | 65 | let (_announcement, _git_dir_a) = |
| 65 | "Created announcement {} (kind {})", | 66 | setup_announcement_on_relay(&relay_a, &keys, &domain_refs, repo_id).await; |
| 66 | announcement.id, | 67 | println!("Announcement set up on relay_a with git data"); |
| 67 | announcement.kind.as_u16() | ||
| 68 | ); | ||
| 69 | |||
| 70 | // 5. Send announcement to relay_a | ||
| 71 | let client_a = TestClient::new(relay_a.url(), keys.clone()) | ||
| 72 | .await | ||
| 73 | .expect("Failed to connect to relay_a"); | ||
| 74 | |||
| 75 | client_a | ||
| 76 | .send_event(&announcement) | ||
| 77 | .await | ||
| 78 | .expect("Failed to send announcement to relay_a"); | ||
| 79 | println!("Announcement sent to relay_a"); | ||
| 80 | |||
| 81 | // 6. Send announcement to relay_b (triggers discovery of relay_a) | ||
| 82 | let client_b = TestClient::new(relay_b.url(), keys.clone()) | ||
| 83 | .await | ||
| 84 | .expect("Failed to connect to relay_b"); | ||
| 85 | 68 | ||
| 86 | client_b | 69 | let (_announcement_b, _git_dir_b) = |
| 87 | .send_event(&announcement) | 70 | setup_announcement_on_relay(&relay_b, &keys, &domain_refs, repo_id).await; |
| 88 | .await | 71 | println!("Announcement set up on relay_b with git data (triggers discovery)"); |
| 89 | .expect("Failed to send announcement to relay_b"); | ||
| 90 | println!("Announcement sent to relay_b (triggers discovery)"); | ||
| 91 | 72 | ||
| 92 | // 7. Wait for discovery to complete | 73 | // 5. Wait for discovery to complete |
| 93 | tokio::time::sleep(Duration::from_secs(1)).await; | 74 | tokio::time::sleep(Duration::from_secs(1)).await; |
| 94 | 75 | ||
| 95 | // 8. Create and send a Layer 2 issue event (using helper) | 76 | // 6. Create and send a Layer 2 issue event (using helper) |
| 96 | let repo_coordinate = repo_coord(&keys, repo_id); | 77 | let repo_coordinate = repo_coord(&keys, repo_id); |
| 97 | let issue = build_layer2_issue_event(&keys, &repo_coordinate, "Test Issue for Live Sync") | 78 | let issue = build_layer2_issue_event(&keys, &repo_coordinate, "Test Issue for Live Sync") |
| 98 | .expect("Failed to create issue event"); | 79 | .expect("Failed to create issue event"); |
| @@ -104,6 +85,10 @@ async fn test_live_sync_layer2_events() { | |||
| 104 | } | 85 | } |
| 105 | 86 | ||
| 106 | // Send issue to relay_a only | 87 | // Send issue to relay_a only |
| 88 | let client_a = TestClient::new(relay_a.url(), keys.clone()) | ||
| 89 | .await | ||
| 90 | .expect("Failed to connect to relay_a"); | ||
| 91 | |||
| 107 | client_a | 92 | client_a |
| 108 | .send_event(&issue) | 93 | .send_event(&issue) |
| 109 | .await | 94 | .await |
| @@ -111,7 +96,6 @@ async fn test_live_sync_layer2_events() { | |||
| 111 | println!("Issue sent to relay_a"); | 96 | println!("Issue sent to relay_a"); |
| 112 | 97 | ||
| 113 | client_a.disconnect().await; | 98 | client_a.disconnect().await; |
| 114 | client_b.disconnect().await; | ||
| 115 | 99 | ||
| 116 | // 9. Wait and verify event syncs to relay_b | 100 | // 9. Wait and verify event syncs to relay_b |
| 117 | let filter = Filter::new() | 101 | let filter = Filter::new() |
| @@ -166,30 +150,19 @@ async fn test_live_sync_layer3_events() { | |||
| 166 | 150 | ||
| 167 | let keys = Keys::generate(); | 151 | let keys = Keys::generate(); |
| 168 | 152 | ||
| 169 | // 2. Create and send repository announcement to both relays | 153 | // 2. Create and send repository announcement to both relays with git data |
| 154 | // (purgatory requires git data before announcements are accepted) | ||
| 170 | let repo_id = "test-repo-live-l3"; | 155 | let repo_id = "test-repo-live-l3"; |
| 171 | let announcement = | 156 | let domains = vec![relay_a.domain(), relay_b.domain()]; |
| 172 | create_repo_announcement(&keys, &[&relay_a.domain(), &relay_b.domain()], repo_id); | 157 | let domain_refs: Vec<&str> = domains.iter().map(|s| s.as_str()).collect(); |
| 173 | 158 | ||
| 174 | let client_a = TestClient::new(relay_a.url(), keys.clone()) | 159 | let (_announcement, _git_dir_a) = |
| 175 | .await | 160 | setup_announcement_on_relay(&relay_a, &keys, &domain_refs, repo_id).await; |
| 176 | .expect("Failed to connect to relay_a"); | 161 | println!("Announcement set up on relay_a with git data"); |
| 177 | 162 | ||
| 178 | let client_b = TestClient::new(relay_b.url(), keys.clone()) | 163 | let (_announcement_b, _git_dir_b) = |
| 179 | .await | 164 | setup_announcement_on_relay(&relay_b, &keys, &domain_refs, repo_id).await; |
| 180 | .expect("Failed to connect to relay_b"); | 165 | println!("Announcement set up on relay_b with git data (triggers discovery)"); |
| 181 | |||
| 182 | client_a | ||
| 183 | .send_event(&announcement) | ||
| 184 | .await | ||
| 185 | .expect("Failed to send announcement to relay_a"); | ||
| 186 | println!("Announcement sent to relay_a"); | ||
| 187 | |||
| 188 | client_b | ||
| 189 | .send_event(&announcement) | ||
| 190 | .await | ||
| 191 | .expect("Failed to send announcement to relay_b"); | ||
| 192 | println!("Announcement sent to relay_b (triggers discovery)"); | ||
| 193 | 166 | ||
| 194 | // 3. Wait for discovery | 167 | // 3. Wait for discovery |
| 195 | tokio::time::sleep(Duration::from_secs(1)).await; | 168 | tokio::time::sleep(Duration::from_secs(1)).await; |
| @@ -200,6 +173,10 @@ async fn test_live_sync_layer3_events() { | |||
| 200 | .expect("Failed to create issue"); | 173 | .expect("Failed to create issue"); |
| 201 | let issue_id = issue.id; | 174 | let issue_id = issue.id; |
| 202 | 175 | ||
| 176 | let client_a = TestClient::new(relay_a.url(), keys.clone()) | ||
| 177 | .await | ||
| 178 | .expect("Failed to connect to relay_a"); | ||
| 179 | |||
| 203 | client_a | 180 | client_a |
| 204 | .send_event(&issue) | 181 | .send_event(&issue) |
| 205 | .await | 182 | .await |
| @@ -243,7 +220,6 @@ async fn test_live_sync_layer3_events() { | |||
| 243 | println!("Issue synced to relay_b: {}", issue_synced); | 220 | println!("Issue synced to relay_b: {}", issue_synced); |
| 244 | 221 | ||
| 245 | client_a.disconnect().await; | 222 | client_a.disconnect().await; |
| 246 | client_b.disconnect().await; | ||
| 247 | 223 | ||
| 248 | // 7. Wait and verify comment syncs to relay_b | 224 | // 7. Wait and verify comment syncs to relay_b |
| 249 | let comment_filter = Filter::new() | 225 | let comment_filter = Filter::new() |
| @@ -343,29 +319,17 @@ async fn test_live_sync_event_ordering() { | |||
| 343 | 319 | ||
| 344 | let keys = Keys::generate(); | 320 | let keys = Keys::generate(); |
| 345 | 321 | ||
| 346 | // 2. Create and send repository announcement to both relays | 322 | // 2. Create and send repository announcement to both relays with git data |
| 323 | // (purgatory requires git data before announcements are accepted) | ||
| 347 | let repo_id = "test-repo-ordering"; | 324 | let repo_id = "test-repo-ordering"; |
| 348 | let announcement = | 325 | let domains = vec![relay_a.domain(), relay_b.domain()]; |
| 349 | create_repo_announcement(&keys, &[&relay_a.domain(), &relay_b.domain()], repo_id); | 326 | let domain_refs: Vec<&str> = domains.iter().map(|s| s.as_str()).collect(); |
| 350 | 327 | ||
| 351 | let client_a = TestClient::new(relay_a.url(), keys.clone()) | 328 | let (_announcement, _git_dir_a) = |
| 352 | .await | 329 | setup_announcement_on_relay(&relay_a, &keys, &domain_refs, repo_id).await; |
| 353 | .expect("Failed to connect to relay_a"); | 330 | let (_announcement_b, _git_dir_b) = |
| 354 | 331 | setup_announcement_on_relay(&relay_b, &keys, &domain_refs, repo_id).await; | |
| 355 | let client_b = TestClient::new(relay_b.url(), keys.clone()) | 332 | println!("Announcements set up on both relays with git data"); |
| 356 | .await | ||
| 357 | .expect("Failed to connect to relay_b"); | ||
| 358 | |||
| 359 | client_a | ||
| 360 | .send_event(&announcement) | ||
| 361 | .await | ||
| 362 | .expect("Failed to send announcement to relay_a"); | ||
| 363 | |||
| 364 | client_b | ||
| 365 | .send_event(&announcement) | ||
| 366 | .await | ||
| 367 | .expect("Failed to send announcement to relay_b"); | ||
| 368 | println!("Announcements sent to both relays"); | ||
| 369 | 333 | ||
| 370 | // 3. Wait for discovery | 334 | // 3. Wait for discovery |
| 371 | tokio::time::sleep(Duration::from_secs(1)).await; | 335 | tokio::time::sleep(Duration::from_secs(1)).await; |
| @@ -375,6 +339,10 @@ async fn test_live_sync_event_ordering() { | |||
| 375 | let mut issue_ids = Vec::new(); | 339 | let mut issue_ids = Vec::new(); |
| 376 | let mut expected_order_timestamps = Vec::new(); | 340 | let mut expected_order_timestamps = Vec::new(); |
| 377 | 341 | ||
| 342 | let client_a = TestClient::new(relay_a.url(), keys.clone()) | ||
| 343 | .await | ||
| 344 | .expect("Failed to connect to relay_a"); | ||
| 345 | |||
| 378 | for i in 1..=3 { | 346 | for i in 1..=3 { |
| 379 | let issue = build_layer2_issue_event( | 347 | let issue = build_layer2_issue_event( |
| 380 | &keys, | 348 | &keys, |
| @@ -402,7 +370,6 @@ async fn test_live_sync_event_ordering() { | |||
| 402 | } | 370 | } |
| 403 | 371 | ||
| 404 | client_a.disconnect().await; | 372 | client_a.disconnect().await; |
| 405 | client_b.disconnect().await; | ||
| 406 | 373 | ||
| 407 | // 5. Wait for all events to sync | 374 | // 5. Wait for all events to sync |
| 408 | tokio::time::sleep(Duration::from_secs(3)).await; | 375 | tokio::time::sleep(Duration::from_secs(3)).await; |
diff --git a/tests/sync/maintainer_reprocessing.rs b/tests/sync/maintainer_reprocessing.rs index df1bf78..ff1eb43 100644 --- a/tests/sync/maintainer_reprocessing.rs +++ b/tests/sync/maintainer_reprocessing.rs | |||
| @@ -2,6 +2,25 @@ | |||
| 2 | //! | 2 | //! |
| 3 | //! Tests the two-tier rejected events index and immediate re-processing of | 3 | //! Tests the two-tier rejected events index and immediate re-processing of |
| 4 | //! maintainer announcements when owner announcements are accepted. | 4 | //! maintainer announcements when owner announcements are accepted. |
| 5 | //! | ||
| 6 | //! ## Test design | ||
| 7 | //! | ||
| 8 | //! Announcements now require git data before they are released from purgatory and | ||
| 9 | //! served to other relays. The hot-cache re-processing path we want to exercise is: | ||
| 10 | //! | ||
| 11 | //! relay_b syncs maintainer announcement from relay_a | ||
| 12 | //! → write policy rejects it (no owner announcement in DB yet) | ||
| 13 | //! → event stored in hot cache | ||
| 14 | //! owner git push to relay_b promotes owner announcement from purgatory | ||
| 15 | //! → our new code calls rejected_events_index.invalidate_and_get() | ||
| 16 | //! → maintainer announcement re-processed and accepted | ||
| 17 | //! | ||
| 18 | //! To guarantee the maintainer announcements arrive at relay_b *before* the owner | ||
| 19 | //! git push, relay_b is started with relay_a as its bootstrap relay. That way | ||
| 20 | //! relay_b's SyncManager connects to relay_a immediately and syncs whatever is | ||
| 21 | //! already in relay_a's DB. We push the maintainer git data first (so the | ||
| 22 | //! announcements are in relay_a's DB), wait briefly for the sync round-trip, then | ||
| 23 | //! send the owner announcement + git push. | ||
| 5 | 24 | ||
| 6 | use std::time::Duration; | 25 | use std::time::Duration; |
| 7 | 26 | ||
| @@ -9,66 +28,91 @@ use nostr_sdk::prelude::*; | |||
| 9 | 28 | ||
| 10 | use crate::common::{sync_helpers::*, TestRelay}; | 29 | use crate::common::{sync_helpers::*, TestRelay}; |
| 11 | 30 | ||
| 12 | /// Test that maintainer announcements are re-processed immediately when owner announcement accepted | 31 | /// Test that a maintainer announcement is re-processed immediately when the owner |
| 32 | /// announcement is promoted from purgatory via a git push. | ||
| 13 | /// | 33 | /// |
| 14 | /// Flow: | 34 | /// Flow: |
| 15 | /// 1. relay_a: Maintainer sends announcement (gets rejected - doesn't list relay_b) | 35 | /// 1. relay_a: Maintainer sends announcement + git data → accepted into relay_a's DB |
| 16 | /// 2. relay_b: Owner sends announcement (lists relay_a + maintainer) | 36 | /// 2. relay_b (bootstrapped from relay_a): SyncManager syncs maintainer announcement |
| 17 | /// 3. relay_b syncs from relay_a, maintainer announcement enters rejected index | 37 | /// → rejected by write policy (no owner in DB) → stored in hot cache |
| 18 | /// 4. relay_b processes owner announcement, invalidates and re-processes maintainer announcement | 38 | /// 3. relay_b: Owner sends announcement → purgatory (no git data yet) |
| 39 | /// 4. relay_b: Owner git push → owner announcement promoted from purgatory | ||
| 40 | /// → hot-cache re-processing fires → maintainer announcement accepted | ||
| 19 | /// 5. Both announcements should be in relay_b's database | 41 | /// 5. Both announcements should be in relay_b's database |
| 20 | /// | ||
| 21 | /// Expected time: <5 seconds (vs 24 hours without hot cache) | ||
| 22 | #[tokio::test] | 42 | #[tokio::test] |
| 23 | async fn test_maintainer_announcement_reprocessed_immediately() { | 43 | async fn test_maintainer_announcement_reprocessed_immediately() { |
| 24 | // Start relay_a (where maintainer announcement will be sent) | 44 | // Start relay_a (where maintainer announcement will be sent) |
| 25 | let relay_a = TestRelay::start().await; | 45 | let relay_a = TestRelay::start().await; |
| 26 | println!("relay_a started at {}", relay_a.url()); | 46 | println!("relay_a started at {}", relay_a.url()); |
| 27 | 47 | ||
| 28 | // Start relay_b with sync enabled (will sync from relay_a) | ||
| 29 | let relay_b = TestRelay::start_with_sync(None).await; | ||
| 30 | println!("relay_b started at {}", relay_b.url()); | ||
| 31 | |||
| 32 | // Create keys | 48 | // Create keys |
| 33 | let owner_keys = Keys::generate(); | 49 | let owner_keys = Keys::generate(); |
| 34 | let maintainer_keys = Keys::generate(); | 50 | let maintainer_keys = Keys::generate(); |
| 35 | |||
| 36 | let identifier = "test-repo"; | 51 | let identifier = "test-repo"; |
| 37 | 52 | ||
| 38 | let start = std::time::Instant::now(); | 53 | // Step 1: Send maintainer announcement to relay_a then push git data so it lands in |
| 39 | 54 | // relay_a's DB. The announcement lists relay_a only (not relay_b), so relay_b's write | |
| 40 | // Step 1: Send maintainer announcement to relay_a (will be rejected - doesn't list relay_b) | 55 | // policy will reject it when it arrives via sync. |
| 41 | let client_a = TestClient::new(relay_a.url(), maintainer_keys.clone()) | 56 | let maintainer_npub = maintainer_keys |
| 42 | .await | 57 | .public_key() |
| 43 | .expect("Failed to connect to relay_a"); | 58 | .to_bech32() |
| 44 | 59 | .expect("Failed to get npub"); | |
| 45 | let maintainer_announcement = | 60 | let maintainer_announcement = |
| 46 | EventBuilder::new(Kind::GitRepoAnnouncement, "Maintainer's repository") | 61 | EventBuilder::new(Kind::GitRepoAnnouncement, "Maintainer's repository") |
| 47 | .tags(vec![ | 62 | .tags(vec![ |
| 48 | Tag::identifier(identifier), | 63 | Tag::identifier(identifier), |
| 49 | Tag::custom( | 64 | Tag::custom( |
| 50 | TagKind::custom("clone"), | 65 | TagKind::custom("clone"), |
| 51 | vec![format!("https://{}/{}.git", relay_a.domain(), identifier)], | 66 | vec![format!( |
| 67 | "http://{}/{}/{}.git", | ||
| 68 | relay_a.domain(), | ||
| 69 | maintainer_npub, | ||
| 70 | identifier | ||
| 71 | )], | ||
| 72 | ), | ||
| 73 | Tag::custom( | ||
| 74 | TagKind::custom("relays"), | ||
| 75 | vec![relay_a.url().to_string()], | ||
| 52 | ), | 76 | ), |
| 53 | Tag::custom(TagKind::custom("relays"), vec![relay_a.url().to_string()]), | ||
| 54 | ]) | 77 | ]) |
| 55 | .sign_with_keys(&maintainer_keys) | 78 | .sign_with_keys(&maintainer_keys) |
| 56 | .unwrap(); | 79 | .unwrap(); |
| 80 | send_to_relay(&relay_a, &maintainer_announcement).await.unwrap(); | ||
| 81 | let _git_dir_maintainer = | ||
| 82 | push_git_data_to_relay(&relay_a, &maintainer_keys, identifier, &[&relay_a.domain()]) | ||
| 83 | .await; | ||
| 84 | println!("✓ Maintainer announcement + git data pushed to relay_a"); | ||
| 85 | |||
| 86 | // Step 2: Start relay_b with relay_a as bootstrap so its SyncManager connects immediately. | ||
| 87 | // relay_b's initial negentropy sync will pick up the maintainer announcement and reject it | ||
| 88 | // (no owner announcement in relay_b's DB yet), storing it in the hot cache. | ||
| 89 | let relay_b = TestRelay::start_with_sync(Some(relay_a.url().to_string())).await; | ||
| 90 | println!("relay_b started at {}", relay_b.url()); | ||
| 57 | 91 | ||
| 58 | client_a.send_event(&maintainer_announcement).await.unwrap(); | 92 | // Give relay_b's SyncManager time to complete the initial negentropy sync with relay_a. |
| 59 | println!("✓ Maintainer announcement sent to relay_a"); | 93 | tokio::time::sleep(Duration::from_secs(3)).await; |
| 94 | println!("✓ relay_b synced from relay_a (maintainer announcement should be in hot cache)"); | ||
| 60 | 95 | ||
| 61 | // Step 2: Send owner announcement to relay_b (lists relay_a + maintainer) | 96 | let start = std::time::Instant::now(); |
| 62 | let client_b = TestClient::new(relay_b.url(), owner_keys.clone()) | 97 | |
| 63 | .await | 98 | // Step 3: Send owner announcement to relay_b → goes to purgatory (no git data yet). |
| 64 | .expect("Failed to connect to relay_b"); | 99 | // The announcement lists relay_a + relay_b and names the maintainer. |
| 100 | let owner_npub = owner_keys | ||
| 101 | .public_key() | ||
| 102 | .to_bech32() | ||
| 103 | .expect("Failed to get npub"); | ||
| 65 | 104 | ||
| 66 | let owner_announcement = EventBuilder::new(Kind::GitRepoAnnouncement, "Owner's repository") | 105 | let owner_announcement = EventBuilder::new(Kind::GitRepoAnnouncement, "Owner's repository") |
| 67 | .tags(vec![ | 106 | .tags(vec![ |
| 68 | Tag::identifier(identifier), | 107 | Tag::identifier(identifier), |
| 69 | Tag::custom( | 108 | Tag::custom( |
| 70 | TagKind::custom("clone"), | 109 | TagKind::custom("clone"), |
| 71 | vec![format!("https://{}/{}.git", relay_b.domain(), identifier)], | 110 | vec![format!( |
| 111 | "http://{}/{}/{}.git", | ||
| 112 | relay_b.domain(), | ||
| 113 | owner_npub, | ||
| 114 | identifier | ||
| 115 | )], | ||
| 72 | ), | 116 | ), |
| 73 | Tag::custom( | 117 | Tag::custom( |
| 74 | TagKind::custom("relays"), | 118 | TagKind::custom("relays"), |
| @@ -82,15 +126,22 @@ async fn test_maintainer_announcement_reprocessed_immediately() { | |||
| 82 | .sign_with_keys(&owner_keys) | 126 | .sign_with_keys(&owner_keys) |
| 83 | .unwrap(); | 127 | .unwrap(); |
| 84 | 128 | ||
| 85 | client_b.send_event(&owner_announcement).await.unwrap(); | 129 | send_to_relay(&relay_b, &owner_announcement).await.unwrap(); |
| 86 | println!("✓ Owner announcement sent to relay_b"); | 130 | println!("✓ Owner announcement sent to relay_b (now in purgatory)"); |
| 87 | 131 | ||
| 88 | // Step 3: Wait for sync and re-processing (relay_b discovers relay_a, syncs, re-processes) | 132 | // Step 4: Push owner git data to relay_b. |
| 89 | tokio::time::sleep(Duration::from_secs(3)).await; | 133 | // This promotes the owner announcement from purgatory, which triggers hot-cache |
| 134 | // re-processing of the maintainer announcement via our new code path. | ||
| 135 | let _git_dir_owner = | ||
| 136 | push_git_data_to_relay(&relay_b, &owner_keys, identifier, &[&relay_b.domain()]).await; | ||
| 137 | println!("✓ Owner git data pushed to relay_b (owner announcement promoted, hot cache re-processed)"); | ||
| 138 | |||
| 139 | // Step 5: Wait briefly for async processing to complete. | ||
| 140 | tokio::time::sleep(Duration::from_secs(1)).await; | ||
| 90 | 141 | ||
| 91 | let elapsed = start.elapsed(); | 142 | let elapsed = start.elapsed(); |
| 92 | 143 | ||
| 93 | // Step 4: Verify both announcements are in relay_b's database | 144 | // Step 6: Verify both announcements are in relay_b's database. |
| 94 | let owner_filter = Filter::new() | 145 | let owner_filter = Filter::new() |
| 95 | .kind(Kind::GitRepoAnnouncement) | 146 | .kind(Kind::GitRepoAnnouncement) |
| 96 | .author(owner_keys.public_key()) | 147 | .author(owner_keys.public_key()) |
| @@ -112,17 +163,14 @@ async fn test_maintainer_announcement_reprocessed_immediately() { | |||
| 112 | "Maintainer announcement should be re-processed and accepted in relay_b" | 163 | "Maintainer announcement should be re-processed and accepted in relay_b" |
| 113 | ); | 164 | ); |
| 114 | 165 | ||
| 115 | // Step 5: Verify it happened quickly (not 24 hours!) | ||
| 116 | assert!( | 166 | assert!( |
| 117 | elapsed.as_secs() < 10, | 167 | elapsed.as_secs() < 15, |
| 118 | "Re-processing should happen in <10 seconds, took {:?}", | 168 | "Re-processing should happen in <15 seconds, took {:?}", |
| 119 | elapsed | 169 | elapsed |
| 120 | ); | 170 | ); |
| 121 | 171 | ||
| 122 | println!("✅ Maintainer announcement re-processed in {:?}", elapsed); | 172 | println!("✅ Maintainer announcement re-processed in {:?}", elapsed); |
| 123 | 173 | ||
| 124 | client_a.disconnect().await; | ||
| 125 | client_b.disconnect().await; | ||
| 126 | relay_a.stop().await; | 174 | relay_a.stop().await; |
| 127 | relay_b.stop().await; | 175 | relay_b.stop().await; |
| 128 | } | 176 | } |
| @@ -227,13 +275,16 @@ async fn test_maintainer_announcement_cold_index_prevents_refetch() { | |||
| 227 | relay.stop().await; | 275 | relay.stop().await; |
| 228 | } | 276 | } |
| 229 | 277 | ||
| 230 | /// Test multiple maintainers are all re-processed when owner announcement accepted | 278 | /// Test that all maintainer announcements are re-processed when the owner announcement |
| 279 | /// is promoted from purgatory via a git push. | ||
| 231 | /// | 280 | /// |
| 232 | /// Flow: | 281 | /// Flow: |
| 233 | /// 1. relay_a: Three maintainers send announcements (get rejected - don't list relay_b) | 282 | /// 1. relay_a: Three maintainers send announcements + git data → in relay_a's DB |
| 234 | /// 2. relay_b: Owner sends announcement (lists relay_a + all three maintainers) | 283 | /// 2. relay_b (bootstrapped from relay_a): SyncManager syncs all three maintainer |
| 235 | /// 3. relay_b syncs from relay_a, all maintainer announcements enter rejected index | 284 | /// announcements → all rejected (no owner in DB) → all in hot cache |
| 236 | /// 4. relay_b processes owner announcement, invalidates and re-processes all maintainer announcements | 285 | /// 3. relay_b: Owner sends announcement → purgatory |
| 286 | /// 4. relay_b: Owner git push → owner promoted → hot-cache re-processing fires for | ||
| 287 | /// all three maintainers | ||
| 237 | /// 5. All four announcements should be in relay_b's database | 288 | /// 5. All four announcements should be in relay_b's database |
| 238 | #[tokio::test] | 289 | #[tokio::test] |
| 239 | async fn test_multiple_maintainers_all_reprocessed() { | 290 | async fn test_multiple_maintainers_all_reprocessed() { |
| @@ -241,57 +292,113 @@ async fn test_multiple_maintainers_all_reprocessed() { | |||
| 241 | let relay_a = TestRelay::start().await; | 292 | let relay_a = TestRelay::start().await; |
| 242 | println!("relay_a started at {}", relay_a.url()); | 293 | println!("relay_a started at {}", relay_a.url()); |
| 243 | 294 | ||
| 244 | // Start relay_b with sync enabled (will sync from relay_a) | ||
| 245 | let relay_b = TestRelay::start_with_sync(None).await; | ||
| 246 | println!("relay_b started at {}", relay_b.url()); | ||
| 247 | |||
| 248 | // Create keys | 295 | // Create keys |
| 249 | let owner_keys = Keys::generate(); | 296 | let owner_keys = Keys::generate(); |
| 250 | let maintainer1_keys = Keys::generate(); | 297 | let maintainer1_keys = Keys::generate(); |
| 251 | let maintainer2_keys = Keys::generate(); | 298 | let maintainer2_keys = Keys::generate(); |
| 252 | let maintainer3_keys = Keys::generate(); | 299 | let maintainer3_keys = Keys::generate(); |
| 253 | 300 | ||
| 254 | let identifier = "multi-maintainer-repo"; | 301 | // Use a unique identifier per test run to avoid cross-test interference when |
| 255 | 302 | // tests run in parallel (each test gets its own namespace on relay_a). | |
| 256 | // Step 1: Send three maintainer announcements to relay_a | 303 | let identifier = &format!( |
| 257 | let client_a = TestClient::new(relay_a.url(), maintainer1_keys.clone()) | 304 | "multi-maintainer-repo-{}", |
| 258 | .await | 305 | owner_keys.public_key().to_hex()[..8].to_string() |
| 259 | .expect("Failed to connect to relay_a"); | 306 | ); |
| 260 | 307 | ||
| 308 | // Step 1: Send each maintainer announcement to relay_a then push git data so all three | ||
| 309 | // land in relay_a's DB. Each announcement lists relay_a only, so relay_b will reject | ||
| 310 | // them when syncing (no owner announcement in relay_b's DB yet). | ||
| 311 | let mut git_dirs = Vec::new(); | ||
| 261 | for (idx, maintainer_keys) in [&maintainer1_keys, &maintainer2_keys, &maintainer3_keys] | 312 | for (idx, maintainer_keys) in [&maintainer1_keys, &maintainer2_keys, &maintainer3_keys] |
| 262 | .iter() | 313 | .iter() |
| 263 | .enumerate() | 314 | .enumerate() |
| 264 | { | 315 | { |
| 316 | let m_npub = maintainer_keys | ||
| 317 | .public_key() | ||
| 318 | .to_bech32() | ||
| 319 | .expect("Failed to get npub"); | ||
| 265 | let announcement = EventBuilder::new( | 320 | let announcement = EventBuilder::new( |
| 266 | Kind::GitRepoAnnouncement, | 321 | Kind::GitRepoAnnouncement, |
| 267 | format!("Maintainer {} repository", idx + 1), | 322 | format!("Maintainer {} repository", idx + 1), |
| 268 | ) | 323 | ) |
| 269 | .tags(vec![ | 324 | .tags(vec![ |
| 270 | Tag::identifier(identifier), | 325 | Tag::identifier(identifier.as_str()), |
| 271 | Tag::custom( | 326 | Tag::custom( |
| 272 | TagKind::custom("clone"), | 327 | TagKind::custom("clone"), |
| 273 | vec![format!("https://{}/{}.git", relay_a.domain(), identifier)], | 328 | vec![format!( |
| 329 | "http://{}/{}/{}.git", | ||
| 330 | relay_a.domain(), | ||
| 331 | m_npub, | ||
| 332 | identifier | ||
| 333 | )], | ||
| 274 | ), | 334 | ), |
| 275 | Tag::custom(TagKind::custom("relays"), vec![relay_a.url().to_string()]), | 335 | Tag::custom(TagKind::custom("relays"), vec![relay_a.url().to_string()]), |
| 276 | ]) | 336 | ]) |
| 277 | .sign_with_keys(maintainer_keys) | 337 | .sign_with_keys(maintainer_keys) |
| 278 | .unwrap(); | 338 | .unwrap(); |
| 339 | send_to_relay(&relay_a, &announcement).await.unwrap(); | ||
| 340 | // Use push_unique_git_data_to_relay so each maintainer gets a distinct commit | ||
| 341 | // hash. Identical hashes cause git to skip pack transfer when the object | ||
| 342 | // already exists on the server, leaving the announcement in purgatory. | ||
| 343 | let git_dir = push_unique_git_data_to_relay( | ||
| 344 | &relay_a, | ||
| 345 | maintainer_keys, | ||
| 346 | identifier, | ||
| 347 | &[&relay_a.domain()], | ||
| 348 | &m_npub, | ||
| 349 | ) | ||
| 350 | .await; | ||
| 351 | git_dirs.push(git_dir); | ||
| 352 | } | ||
| 353 | println!("✓ Three maintainer announcements + git data pushed to relay_a"); | ||
| 279 | 354 | ||
| 280 | client_a.send_event(&announcement).await.unwrap(); | 355 | // Confirm all three announcements are queryable on relay_a before starting relay_b. |
| 356 | // This eliminates the race between relay_a's DB writes and relay_b's initial negentropy sync. | ||
| 357 | for (name, keys) in [ | ||
| 358 | ("maintainer1", &maintainer1_keys), | ||
| 359 | ("maintainer2", &maintainer2_keys), | ||
| 360 | ("maintainer3", &maintainer3_keys), | ||
| 361 | ] { | ||
| 362 | let filter = Filter::new() | ||
| 363 | .kind(Kind::GitRepoAnnouncement) | ||
| 364 | .author(keys.public_key()) | ||
| 365 | .identifier(identifier); | ||
| 366 | let found = | ||
| 367 | wait_for_event_on_relay(relay_a.url(), filter, Duration::from_secs(10)).await; | ||
| 368 | assert!(found, "{} announcement should be in relay_a before starting relay_b", name); | ||
| 281 | } | 369 | } |
| 282 | println!("✓ Three maintainer announcements sent to relay_a"); | 370 | println!("✓ All three maintainer announcements confirmed in relay_a's DB"); |
| 283 | 371 | ||
| 284 | // Step 2: Send owner announcement to relay_b (lists relay_a + all three maintainers) | 372 | // Step 2: Start relay_b with relay_a as bootstrap so its SyncManager connects immediately. |
| 285 | let client_b = TestClient::new(relay_b.url(), owner_keys.clone()) | 373 | // Because all three maintainer announcements are confirmed in relay_a's DB, relay_b's |
| 286 | .await | 374 | // initial negentropy sync will pick them all up and reject them (no owner announcement |
| 287 | .expect("Failed to connect to relay_b"); | 375 | // in relay_b's DB yet), storing them in the hot cache. |
| 376 | let relay_b = TestRelay::start_with_sync(Some(relay_a.url().to_string())).await; | ||
| 377 | println!("relay_b started at {}", relay_b.url()); | ||
| 378 | |||
| 379 | // Give relay_b's SyncManager time to complete the initial negentropy sync with relay_a. | ||
| 380 | // The negentropy sync completes within ~200ms (NGIT_TEST=1 sets batch window to 200ms), but we | ||
| 381 | // allow extra time for slow CI environments. | ||
| 382 | tokio::time::sleep(Duration::from_secs(3)).await; | ||
| 383 | println!("✓ relay_b synced from relay_a (maintainer announcements should be in hot cache)"); | ||
| 384 | |||
| 385 | // Step 3: Send owner announcement to relay_b → goes to purgatory. | ||
| 386 | let owner_npub = owner_keys | ||
| 387 | .public_key() | ||
| 388 | .to_bech32() | ||
| 389 | .expect("Failed to get npub"); | ||
| 288 | 390 | ||
| 289 | let owner_announcement = EventBuilder::new(Kind::GitRepoAnnouncement, "Owner's repository") | 391 | let owner_announcement = EventBuilder::new(Kind::GitRepoAnnouncement, "Owner's repository") |
| 290 | .tags(vec![ | 392 | .tags(vec![ |
| 291 | Tag::identifier(identifier), | 393 | Tag::identifier(identifier), |
| 292 | Tag::custom( | 394 | Tag::custom( |
| 293 | TagKind::custom("clone"), | 395 | TagKind::custom("clone"), |
| 294 | vec![format!("https://{}/{}.git", relay_b.domain(), identifier)], | 396 | vec![format!( |
| 397 | "http://{}/{}/{}.git", | ||
| 398 | relay_b.domain(), | ||
| 399 | owner_npub, | ||
| 400 | identifier | ||
| 401 | )], | ||
| 295 | ), | 402 | ), |
| 296 | Tag::custom( | 403 | Tag::custom( |
| 297 | TagKind::custom("relays"), | 404 | TagKind::custom("relays"), |
| @@ -309,13 +416,20 @@ async fn test_multiple_maintainers_all_reprocessed() { | |||
| 309 | .sign_with_keys(&owner_keys) | 416 | .sign_with_keys(&owner_keys) |
| 310 | .unwrap(); | 417 | .unwrap(); |
| 311 | 418 | ||
| 312 | client_b.send_event(&owner_announcement).await.unwrap(); | 419 | send_to_relay(&relay_b, &owner_announcement).await.unwrap(); |
| 313 | println!("✓ Owner announcement sent to relay_b"); | 420 | println!("✓ Owner announcement sent to relay_b (now in purgatory)"); |
| 314 | 421 | ||
| 315 | // Step 3: Wait for sync and re-processing | 422 | // Step 4: Push owner git data to relay_b. |
| 316 | tokio::time::sleep(Duration::from_secs(3)).await; | 423 | // This promotes the owner announcement from purgatory and triggers hot-cache |
| 424 | // re-processing for all three maintainer announcements. | ||
| 425 | let _git_dir_owner = | ||
| 426 | push_git_data_to_relay(&relay_b, &owner_keys, identifier, &[&relay_b.domain()]).await; | ||
| 427 | println!("✓ Owner git data pushed to relay_b (hot-cache re-processing should fire)"); | ||
| 428 | |||
| 429 | // Step 5: Wait briefly for async processing to complete. | ||
| 430 | tokio::time::sleep(Duration::from_secs(1)).await; | ||
| 317 | 431 | ||
| 318 | // Step 4: Verify all four announcements are in relay_b's database | 432 | // Step 6: Verify all four announcements are in relay_b's database. |
| 319 | for (name, keys) in [ | 433 | for (name, keys) in [ |
| 320 | ("owner", &owner_keys), | 434 | ("owner", &owner_keys), |
| 321 | ("maintainer1", &maintainer1_keys), | 435 | ("maintainer1", &maintainer1_keys), |
| @@ -333,8 +447,6 @@ async fn test_multiple_maintainers_all_reprocessed() { | |||
| 333 | 447 | ||
| 334 | println!("✅ All three maintainer announcements re-processed successfully"); | 448 | println!("✅ All three maintainer announcements re-processed successfully"); |
| 335 | 449 | ||
| 336 | client_a.disconnect().await; | ||
| 337 | client_b.disconnect().await; | ||
| 338 | relay_a.stop().await; | 450 | relay_a.stop().await; |
| 339 | relay_b.stop().await; | 451 | relay_b.stop().await; |
| 340 | } | 452 | } |
| @@ -342,10 +454,10 @@ async fn test_multiple_maintainers_all_reprocessed() { | |||
| 342 | /// Test that invalid maintainer public keys don't cause panics | 454 | /// Test that invalid maintainer public keys don't cause panics |
| 343 | /// | 455 | /// |
| 344 | /// Flow: | 456 | /// Flow: |
| 345 | /// 1. Maintainer announcement arrives → Rejected | 457 | /// 1. Maintainer announcement arrives → Rejected (doesn't list our relay) |
| 346 | /// 2. Owner announcement arrives with INVALID maintainer hex → Should handle gracefully | 458 | /// 2. Owner announcement + git push → accepted, with INVALID maintainer hex in maintainers tag |
| 347 | /// 3. Owner announcement should still be accepted | 459 | /// 3. Owner announcement should be accepted |
| 348 | /// 4. Maintainer announcement should NOT be re-processed (invalid pubkey) | 460 | /// 4. Maintainer announcement should NOT be re-processed (invalid pubkey can't be parsed) |
| 349 | #[tokio::test] | 461 | #[tokio::test] |
| 350 | async fn test_invalid_maintainer_pubkey_handled_gracefully() { | 462 | async fn test_invalid_maintainer_pubkey_handled_gracefully() { |
| 351 | let relay = TestRelay::start().await; | 463 | let relay = TestRelay::start().await; |
| @@ -382,13 +494,25 @@ async fn test_invalid_maintainer_pubkey_handled_gracefully() { | |||
| 382 | let _ = client.send_event(&maintainer_announcement).await; | 494 | let _ = client.send_event(&maintainer_announcement).await; |
| 383 | tokio::time::sleep(Duration::from_millis(200)).await; | 495 | tokio::time::sleep(Duration::from_millis(200)).await; |
| 384 | 496 | ||
| 385 | // Step 2: Send owner announcement with INVALID maintainer hex | 497 | // Step 2: Send owner announcement with INVALID maintainer hex, then push git data. |
| 498 | // The announcement goes to purgatory first; the git push promotes it. | ||
| 499 | // The invalid maintainer hex should be handled gracefully (no panic). | ||
| 500 | let owner_npub = owner_keys | ||
| 501 | .public_key() | ||
| 502 | .to_bech32() | ||
| 503 | .expect("Failed to get npub"); | ||
| 504 | |||
| 386 | let owner_announcement = EventBuilder::new(Kind::GitRepoAnnouncement, "Owner's repository") | 505 | let owner_announcement = EventBuilder::new(Kind::GitRepoAnnouncement, "Owner's repository") |
| 387 | .tags(vec![ | 506 | .tags(vec![ |
| 388 | Tag::identifier(identifier), | 507 | Tag::identifier(identifier), |
| 389 | Tag::custom( | 508 | Tag::custom( |
| 390 | TagKind::custom("clone"), | 509 | TagKind::custom("clone"), |
| 391 | vec![format!("https://{}/{}.git", relay.domain(), identifier)], | 510 | vec![format!( |
| 511 | "http://{}/{}/{}.git", | ||
| 512 | relay.domain(), | ||
| 513 | owner_npub, | ||
| 514 | identifier | ||
| 515 | )], | ||
| 392 | ), | 516 | ), |
| 393 | Tag::custom(TagKind::custom("relays"), vec![relay.url().to_string()]), | 517 | Tag::custom(TagKind::custom("relays"), vec![relay.url().to_string()]), |
| 394 | Tag::custom( | 518 | Tag::custom( |
| @@ -399,7 +523,9 @@ async fn test_invalid_maintainer_pubkey_handled_gracefully() { | |||
| 399 | .sign_with_keys(&owner_keys) | 523 | .sign_with_keys(&owner_keys) |
| 400 | .unwrap(); | 524 | .unwrap(); |
| 401 | 525 | ||
| 402 | client.send_event(&owner_announcement).await.unwrap(); | 526 | send_to_relay(&relay, &owner_announcement).await.unwrap(); |
| 527 | let _git_dir = | ||
| 528 | push_git_data_to_relay(&relay, &owner_keys, identifier, &[&relay.domain()]).await; | ||
| 403 | tokio::time::sleep(Duration::from_millis(500)).await; | 529 | tokio::time::sleep(Duration::from_millis(500)).await; |
| 404 | 530 | ||
| 405 | // Step 3: Verify owner announcement accepted, maintainer not re-processed | 531 | // Step 3: Verify owner announcement accepted, maintainer not re-processed |
diff --git a/tests/sync/metrics.rs b/tests/sync/metrics.rs index e8c75c7..e973bbb 100644 --- a/tests/sync/metrics.rs +++ b/tests/sync/metrics.rs | |||
| @@ -16,8 +16,8 @@ use nostr_sdk::prelude::*; | |||
| 16 | 16 | ||
| 17 | use crate::common::{ | 17 | use crate::common::{ |
| 18 | sync_helpers::{ | 18 | sync_helpers::{ |
| 19 | create_repo_announcement, fetch_metrics, wait_for_sync_connection, MetricsTestHarness, | 19 | create_repo_announcement, fetch_metrics, setup_announcement_on_relay, |
| 20 | ParsedMetrics, TestClient, | 20 | wait_for_sync_connection, MetricsTestHarness, ParsedMetrics, TestClient, |
| 21 | }, | 21 | }, |
| 22 | TestRelay, | 22 | TestRelay, |
| 23 | }; | 23 | }; |
| @@ -224,16 +224,17 @@ async fn test_startup_sync_event_count() { | |||
| 224 | // 3. Create test keys | 224 | // 3. Create test keys |
| 225 | let keys = Keys::generate(); | 225 | let keys = Keys::generate(); |
| 226 | 226 | ||
| 227 | // 4. Create an announcement that lists BOTH relays (required for discovery) | 227 | // 4. Set up announcement on SOURCE relay with git data |
| 228 | let announcement = create_repo_announcement( | 228 | // (purgatory requires git data before announcements are accepted) |
| 229 | &keys, | 229 | let repo_id = "test-repo-metrics"; |
| 230 | &[&source_relay.domain(), &syncing_relay.domain()], | 230 | let domains = vec![source_relay.domain(), syncing_relay.domain()]; |
| 231 | "test-repo-metrics", | 231 | let domain_refs: Vec<&str> = domains.iter().map(|s| s.as_str()).collect(); |
| 232 | ); | 232 | |
| 233 | let (announcement, _git_dir_source) = | ||
| 234 | setup_announcement_on_relay(&source_relay, &keys, &domain_refs, repo_id).await; | ||
| 233 | println!( | 235 | println!( |
| 234 | "Created announcement {} (kind {})", | 236 | "Announcement {} set up on source relay with git data", |
| 235 | announcement.id, | 237 | announcement.id |
| 236 | announcement.kind.as_u16() | ||
| 237 | ); | 238 | ); |
| 238 | 239 | ||
| 239 | // 5. Build the repo coordinate for the 'a' tag in the patches | 240 | // 5. Build the repo coordinate for the 'a' tag in the patches |
| @@ -241,7 +242,7 @@ async fn test_startup_sync_event_count() { | |||
| 241 | "{}:{}:{}", | 242 | "{}:{}:{}", |
| 242 | Kind::GitRepoAnnouncement.as_u16(), | 243 | Kind::GitRepoAnnouncement.as_u16(), |
| 243 | keys.public_key().to_hex(), | 244 | keys.public_key().to_hex(), |
| 244 | "test-repo-metrics" | 245 | repo_id |
| 245 | ); | 246 | ); |
| 246 | 247 | ||
| 247 | // 6. Create 3 patch events (Layer 2) that reference the announcement | 248 | // 6. Create 3 patch events (Layer 2) that reference the announcement |
| @@ -257,17 +258,11 @@ async fn test_startup_sync_event_count() { | |||
| 257 | .collect(); | 258 | .collect(); |
| 258 | println!("Created {} patches", patches.len()); | 259 | println!("Created {} patches", patches.len()); |
| 259 | 260 | ||
| 260 | // 7. Send announcement + patches to SOURCE relay ONLY | 261 | // 7. Send patches to SOURCE relay |
| 261 | let source_client = TestClient::new(source_relay.url(), keys.clone()) | 262 | let source_client = TestClient::new(source_relay.url(), keys.clone()) |
| 262 | .await | 263 | .await |
| 263 | .expect("Failed to connect to source relay"); | 264 | .expect("Failed to connect to source relay"); |
| 264 | 265 | ||
| 265 | source_client | ||
| 266 | .send_event(&announcement) | ||
| 267 | .await | ||
| 268 | .expect("Failed to send announcement to source"); | ||
| 269 | println!("Announcement sent to source relay"); | ||
| 270 | |||
| 271 | for patch in &patches { | 266 | for patch in &patches { |
| 272 | source_client | 267 | source_client |
| 273 | .send_event(patch) | 268 | .send_event(patch) |
| @@ -277,17 +272,10 @@ async fn test_startup_sync_event_count() { | |||
| 277 | println!("Patches sent to source relay"); | 272 | println!("Patches sent to source relay"); |
| 278 | source_client.disconnect().await; | 273 | source_client.disconnect().await; |
| 279 | 274 | ||
| 280 | // 8. Send announcement to SYNCING relay (triggers discovery of source relay) | 275 | // 8. Set up announcement on SYNCING relay (triggers discovery of source relay) |
| 281 | let syncing_client = TestClient::new(syncing_relay.url(), keys.clone()) | 276 | let (_announcement_syncing, _git_dir_syncing) = |
| 282 | .await | 277 | setup_announcement_on_relay(&syncing_relay, &keys, &domain_refs, repo_id).await; |
| 283 | .expect("Failed to connect to syncing relay"); | 278 | println!("Announcement set up on syncing relay (triggers discovery of source)"); |
| 284 | |||
| 285 | syncing_client | ||
| 286 | .send_event(&announcement) | ||
| 287 | .await | ||
| 288 | .expect("Failed to send announcement to syncing relay"); | ||
| 289 | println!("Announcement sent to syncing relay (triggers discovery of source)"); | ||
| 290 | syncing_client.disconnect().await; | ||
| 291 | 279 | ||
| 292 | // 9. Wait for discovery + sync to complete | 280 | // 9. Wait for discovery + sync to complete |
| 293 | println!("Waiting 5s for discovery and sync..."); | 281 | println!("Waiting 5s for discovery and sync..."); |
| @@ -404,18 +392,35 @@ async fn test_connection_failure_increments_counter() { | |||
| 404 | /// Test that live sync events are counted in metrics. | 392 | /// Test that live sync events are counted in metrics. |
| 405 | /// | 393 | /// |
| 406 | /// This test validates that events received via live subscription | 394 | /// This test validates that events received via live subscription |
| 407 | /// (after sync connection is established) are counted separately | 395 | /// (after sync connection is established) are counted in metrics. |
| 408 | /// from startup/bootstrap events. | 396 | /// Uses Layer 2 patch events (not announcements) to avoid purgatory, |
| 397 | /// since Layer 2 events are accepted directly to the DB. | ||
| 409 | #[tokio::test] | 398 | #[tokio::test] |
| 410 | async fn test_live_sync_event_count() { | 399 | async fn test_live_sync_event_count() { |
| 411 | let mut harness = MetricsTestHarness::with_sources(1).await; | ||
| 412 | |||
| 413 | // Pre-allocate syncing relay port to include in announcements | 400 | // Pre-allocate syncing relay port to include in announcements |
| 414 | let sync_port = TestRelay::find_free_port(); | 401 | let sync_port = TestRelay::find_free_port(); |
| 415 | let sync_domain = format!("127.0.0.1:{}", sync_port); | 402 | let sync_domain = format!("127.0.0.1:{}", sync_port); |
| 416 | 403 | ||
| 404 | // Start source relay | ||
| 405 | let source_relay = TestRelay::start().await; | ||
| 406 | println!("Source relay started at {}", source_relay.url()); | ||
| 407 | |||
| 408 | // Set up announcement on source relay BEFORE starting syncing relay | ||
| 409 | // This allows discovery when syncing relay connects | ||
| 410 | let keys = Keys::generate(); | ||
| 411 | let repo_id = "live-metrics-repo"; | ||
| 412 | let domains = vec![source_relay.domain(), sync_domain.clone()]; | ||
| 413 | let domain_refs: Vec<&str> = domains.iter().map(|s| s.as_str()).collect(); | ||
| 414 | |||
| 415 | let (_announcement, _git_dir) = | ||
| 416 | setup_announcement_on_relay(&source_relay, &keys, &domain_refs, repo_id).await; | ||
| 417 | println!("Announcement set up on source relay with git data"); | ||
| 418 | |||
| 417 | // Start syncing relay with pre-allocated port | 419 | // Start syncing relay with pre-allocated port |
| 418 | harness.start_syncing_relay_on_port(0, sync_port).await; | 420 | let syncing_relay = |
| 421 | TestRelay::start_on_port_with_options(sync_port, Some(source_relay.url().to_string()), false) | ||
| 422 | .await; | ||
| 423 | println!("Syncing relay started at {}", syncing_relay.url()); | ||
| 419 | 424 | ||
| 420 | // Wait for sync connection to be fully established with EOSE received | 425 | // Wait for sync connection to be fully established with EOSE received |
| 421 | // This ensures we're in "live" mode before submitting test events | 426 | // This ensures we're in "live" mode before submitting test events |
| @@ -424,33 +429,61 @@ async fn test_live_sync_event_count() { | |||
| 424 | .await | 429 | .await |
| 425 | .expect("Sync connection should be established"); | 430 | .expect("Sync connection should be established"); |
| 426 | 431 | ||
| 427 | // Additional small delay to ensure EOSE has been processed | 432 | // Additional delay to ensure purgatory promotion completes on syncing relay |
| 428 | tokio::time::sleep(Duration::from_millis(500)).await; | 433 | tokio::time::sleep(Duration::from_secs(4)).await; |
| 429 | 434 | ||
| 430 | // Now add events - these should be "live" not "startup" | 435 | // Now add Layer 2 patch events (not announcements) - these are accepted immediately |
| 431 | // Include BOTH domains so events are accepted by both relays | 436 | // (Layer 2 events are accepted directly to DB, no purgatory) |
| 432 | let keys = Keys::generate(); | 437 | let repo_coord_str = format!( |
| 433 | let events: Vec<_> = (0..2) | 438 | "{}:{}:{}", |
| 434 | .map(|i| { | 439 | Kind::GitRepoAnnouncement.as_u16(), |
| 435 | create_repo_announcement( | 440 | keys.public_key().to_hex(), |
| 436 | &keys, | 441 | repo_id |
| 437 | &[&harness.source_domain(0), &sync_domain], | 442 | ); |
| 438 | &format!("live-{}", i), | 443 | |
| 439 | ) | 444 | let patch1 = create_event_referencing_repo( |
| 440 | }) | 445 | &keys, |
| 441 | .collect(); | 446 | &repo_coord_str, |
| 442 | harness.submit_events(0, &events).await.unwrap(); | 447 | Kind::GitPatch.as_u16(), |
| 448 | "Live test patch 1", | ||
| 449 | ); | ||
| 450 | let patch2 = create_event_referencing_repo( | ||
| 451 | &keys, | ||
| 452 | &repo_coord_str, | ||
| 453 | Kind::GitPatch.as_u16(), | ||
| 454 | "Live test patch 2", | ||
| 455 | ); | ||
| 456 | |||
| 457 | // Send patches to source AFTER sync connection established (live mode) | ||
| 458 | let client = TestClient::new(source_relay.url(), keys.clone()) | ||
| 459 | .await | ||
| 460 | .expect("Failed to connect to source"); | ||
| 461 | client.send_event(&patch1).await.expect("Failed to send patch 1"); | ||
| 462 | client.send_event(&patch2).await.expect("Failed to send patch 2"); | ||
| 463 | client.disconnect().await; | ||
| 464 | println!("Two patches sent to source relay (live mode)"); | ||
| 443 | 465 | ||
| 444 | // Wait for live events to be processed and metrics updated | 466 | // Wait for live events to be processed and metrics updated |
| 445 | tokio::time::sleep(Duration::from_secs(4)).await; | 467 | tokio::time::sleep(Duration::from_secs(4)).await; |
| 446 | let metrics = harness.get_metrics().await.unwrap(); | 468 | |
| 469 | // Fetch metrics from syncing relay | ||
| 470 | let raw_metrics = fetch_metrics(&sync_url) | ||
| 471 | .await | ||
| 472 | .expect("Failed to fetch metrics"); | ||
| 473 | let metrics = ParsedMetrics::parse(&raw_metrics); | ||
| 447 | 474 | ||
| 448 | let synced_count = metrics.events_synced_total(); | 475 | let synced_count = metrics.events_synced_total(); |
| 449 | println!("Events synced total: {:?}", synced_count); | 476 | println!("Events synced total: {:?}", synced_count); |
| 450 | 477 | ||
| 451 | assert_eq!(synced_count, Some(2), "Should have 2 synced events"); | 478 | // Cleanup |
| 479 | syncing_relay.stop().await; | ||
| 480 | source_relay.stop().await; | ||
| 452 | 481 | ||
| 453 | harness.stop_all().await; | 482 | assert!( |
| 483 | synced_count.is_some() && synced_count.unwrap() >= 2, | ||
| 484 | "Should have synced at least 2 events, got {:?}", | ||
| 485 | synced_count | ||
| 486 | ); | ||
| 454 | } | 487 | } |
| 455 | 488 | ||
| 456 | /// Test that relay connected status is tracked in metrics. | 489 | /// Test that relay connected status is tracked in metrics. |
diff --git a/tests/sync/mod.rs b/tests/sync/mod.rs index 400341f..70c6981 100644 --- a/tests/sync/mod.rs +++ b/tests/sync/mod.rs | |||
| @@ -82,14 +82,12 @@ | |||
| 82 | //! **Example from `discovery.rs`:** | 82 | //! **Example from `discovery.rs`:** |
| 83 | //! ```rust | 83 | //! ```rust |
| 84 | //! #[tokio::test] | 84 | //! #[tokio::test] |
| 85 | //! async fn test_recursive_relay_discovery() { | 85 | //! async fn test_discovers_layer3_via_layer2() { |
| 86 | //! // Multi-relay orchestration | 86 | //! // Multi-relay orchestration |
| 87 | //! let relay1 = TestRelay::start().await; | 87 | //! let relay_a = TestRelay::start().await; |
| 88 | //! let relay2 = TestRelay::start().await; | 88 | //! let relay_b = TestRelay::start_with_sync(None).await; |
| 89 | //! let relay3 = TestRelay::start().await; | ||
| 90 | //! | 89 | //! |
| 91 | //! // relay1 announces relay2, relay2 announces relay3 | 90 | //! // relay_b receives announcement listing relay_a, discovers and syncs from it |
| 92 | //! // Verify relay1 discovers relay3 through chain | ||
| 93 | //! } | 91 | //! } |
| 94 | //! ``` | 92 | //! ``` |
| 95 | //! | 93 | //! |
diff --git a/tests/sync/tag_variations.rs b/tests/sync/tag_variations.rs index 46b1203..021ad0e 100644 --- a/tests/sync/tag_variations.rs +++ b/tests/sync/tag_variations.rs | |||
| @@ -55,30 +55,19 @@ async fn test_layer2_sync_with_lowercase_a_tag() { | |||
| 55 | 55 | ||
| 56 | let keys = Keys::generate(); | 56 | let keys = Keys::generate(); |
| 57 | 57 | ||
| 58 | // 2. Create and send repository announcement to both relays | 58 | // 2. Create and send repository announcement to both relays with git data |
| 59 | // (purgatory requires git data before announcements are accepted) | ||
| 59 | let repo_id = "test-repo-tag-8a"; | 60 | let repo_id = "test-repo-tag-8a"; |
| 60 | let announcement = | 61 | let domains = vec![relay_a.domain(), relay_b.domain()]; |
| 61 | create_repo_announcement(&keys, &[&relay_a.domain(), &relay_b.domain()], repo_id); | 62 | let domain_refs: Vec<&str> = domains.iter().map(|s| s.as_str()).collect(); |
| 62 | 63 | ||
| 63 | let client_a = TestClient::new(relay_a.url(), keys.clone()) | 64 | let (_announcement, _git_dir_a) = |
| 64 | .await | 65 | setup_announcement_on_relay(&relay_a, &keys, &domain_refs, repo_id).await; |
| 65 | .expect("Failed to connect to relay_a"); | 66 | println!("Announcement set up on relay_a with git data"); |
| 66 | |||
| 67 | let client_b = TestClient::new(relay_b.url(), keys.clone()) | ||
| 68 | .await | ||
| 69 | .expect("Failed to connect to relay_b"); | ||
| 70 | 67 | ||
| 71 | client_a | 68 | let (_announcement_b, _git_dir_b) = |
| 72 | .send_event(&announcement) | 69 | setup_announcement_on_relay(&relay_b, &keys, &domain_refs, repo_id).await; |
| 73 | .await | 70 | println!("Announcement set up on relay_b with git data (triggers discovery)"); |
| 74 | .expect("Failed to send announcement to relay_a"); | ||
| 75 | println!("Announcement sent to relay_a"); | ||
| 76 | |||
| 77 | client_b | ||
| 78 | .send_event(&announcement) | ||
| 79 | .await | ||
| 80 | .expect("Failed to send announcement to relay_b"); | ||
| 81 | println!("Announcement sent to relay_b (triggers discovery)"); | ||
| 82 | 71 | ||
| 83 | // 3. Wait for discovery | 72 | // 3. Wait for discovery |
| 84 | tokio::time::sleep(Duration::from_secs(1)).await; | 73 | tokio::time::sleep(Duration::from_secs(1)).await; |
| @@ -95,9 +84,10 @@ async fn test_layer2_sync_with_lowercase_a_tag() { | |||
| 95 | issue_id, | 84 | issue_id, |
| 96 | issue.kind.as_u16() | 85 | issue.kind.as_u16() |
| 97 | ); | 86 | ); |
| 98 | for tag in issue.tags.iter() { | 87 | |
| 99 | println!(" Tag: {:?}", tag.as_slice()); | 88 | let client_a = TestClient::new(relay_a.url(), keys.clone()) |
| 100 | } | 89 | .await |
| 90 | .expect("Failed to connect to relay_a"); | ||
| 101 | 91 | ||
| 102 | client_a | 92 | client_a |
| 103 | .send_event(&issue) | 93 | .send_event(&issue) |
| @@ -106,7 +96,6 @@ async fn test_layer2_sync_with_lowercase_a_tag() { | |||
| 106 | println!("Issue sent to relay_a"); | 96 | println!("Issue sent to relay_a"); |
| 107 | 97 | ||
| 108 | client_a.disconnect().await; | 98 | client_a.disconnect().await; |
| 109 | client_b.disconnect().await; | ||
| 110 | 99 | ||
| 111 | // 5. Wait and verify event syncs to relay_b | 100 | // 5. Wait and verify event syncs to relay_b |
| 112 | let filter = Filter::new() | 101 | let filter = Filter::new() |
| @@ -154,30 +143,18 @@ async fn test_layer2_sync_with_uppercase_a_tag() { | |||
| 154 | 143 | ||
| 155 | let keys = Keys::generate(); | 144 | let keys = Keys::generate(); |
| 156 | 145 | ||
| 157 | // 2. Create and send repository announcement to both relays | 146 | // 2. Create and send repository announcement to both relays with git data |
| 158 | let repo_id = "test-repo-tag-8b"; | 147 | let repo_id = "test-repo-tag-8b"; |
| 159 | let announcement = | 148 | let domains = vec![relay_a.domain(), relay_b.domain()]; |
| 160 | create_repo_announcement(&keys, &[&relay_a.domain(), &relay_b.domain()], repo_id); | 149 | let domain_refs: Vec<&str> = domains.iter().map(|s| s.as_str()).collect(); |
| 161 | |||
| 162 | let client_a = TestClient::new(relay_a.url(), keys.clone()) | ||
| 163 | .await | ||
| 164 | .expect("Failed to connect to relay_a"); | ||
| 165 | 150 | ||
| 166 | let client_b = TestClient::new(relay_b.url(), keys.clone()) | 151 | let (_announcement, _git_dir_a) = |
| 167 | .await | 152 | setup_announcement_on_relay(&relay_a, &keys, &domain_refs, repo_id).await; |
| 168 | .expect("Failed to connect to relay_b"); | 153 | println!("Announcement set up on relay_a with git data"); |
| 169 | 154 | ||
| 170 | client_a | 155 | let (_announcement_b, _git_dir_b) = |
| 171 | .send_event(&announcement) | 156 | setup_announcement_on_relay(&relay_b, &keys, &domain_refs, repo_id).await; |
| 172 | .await | 157 | println!("Announcement set up on relay_b with git data (triggers discovery)"); |
| 173 | .expect("Failed to send announcement to relay_a"); | ||
| 174 | println!("Announcement sent to relay_a"); | ||
| 175 | |||
| 176 | client_b | ||
| 177 | .send_event(&announcement) | ||
| 178 | .await | ||
| 179 | .expect("Failed to send announcement to relay_b"); | ||
| 180 | println!("Announcement sent to relay_b (triggers discovery)"); | ||
| 181 | 158 | ||
| 182 | // 3. Wait for discovery | 159 | // 3. Wait for discovery |
| 183 | tokio::time::sleep(Duration::from_secs(1)).await; | 160 | tokio::time::sleep(Duration::from_secs(1)).await; |
| @@ -197,9 +174,10 @@ async fn test_layer2_sync_with_uppercase_a_tag() { | |||
| 197 | issue_id, | 174 | issue_id, |
| 198 | issue.kind.as_u16() | 175 | issue.kind.as_u16() |
| 199 | ); | 176 | ); |
| 200 | for tag in issue.tags.iter() { | 177 | |
| 201 | println!(" Tag: {:?}", tag.as_slice()); | 178 | let client_a = TestClient::new(relay_a.url(), keys.clone()) |
| 202 | } | 179 | .await |
| 180 | .expect("Failed to connect to relay_a"); | ||
| 203 | 181 | ||
| 204 | client_a | 182 | client_a |
| 205 | .send_event(&issue) | 183 | .send_event(&issue) |
| @@ -208,7 +186,6 @@ async fn test_layer2_sync_with_uppercase_a_tag() { | |||
| 208 | println!("Issue sent to relay_a"); | 186 | println!("Issue sent to relay_a"); |
| 209 | 187 | ||
| 210 | client_a.disconnect().await; | 188 | client_a.disconnect().await; |
| 211 | client_b.disconnect().await; | ||
| 212 | 189 | ||
| 213 | // 5. Wait and verify event syncs to relay_b | 190 | // 5. Wait and verify event syncs to relay_b |
| 214 | let filter = Filter::new() | 191 | let filter = Filter::new() |
| @@ -255,30 +232,18 @@ async fn test_layer2_sync_with_q_tag() { | |||
| 255 | 232 | ||
| 256 | let keys = Keys::generate(); | 233 | let keys = Keys::generate(); |
| 257 | 234 | ||
| 258 | // 2. Create and send repository announcement to both relays | 235 | // 2. Create and send repository announcement to both relays with git data |
| 259 | let repo_id = "test-repo-tag-8c"; | 236 | let repo_id = "test-repo-tag-8c"; |
| 260 | let announcement = | 237 | let domains = vec![relay_a.domain(), relay_b.domain()]; |
| 261 | create_repo_announcement(&keys, &[&relay_a.domain(), &relay_b.domain()], repo_id); | 238 | let domain_refs: Vec<&str> = domains.iter().map(|s| s.as_str()).collect(); |
| 262 | 239 | ||
| 263 | let client_a = TestClient::new(relay_a.url(), keys.clone()) | 240 | let (_announcement, _git_dir_a) = |
| 264 | .await | 241 | setup_announcement_on_relay(&relay_a, &keys, &domain_refs, repo_id).await; |
| 265 | .expect("Failed to connect to relay_a"); | 242 | println!("Announcement set up on relay_a with git data"); |
| 266 | 243 | ||
| 267 | let client_b = TestClient::new(relay_b.url(), keys.clone()) | 244 | let (_announcement_b, _git_dir_b) = |
| 268 | .await | 245 | setup_announcement_on_relay(&relay_b, &keys, &domain_refs, repo_id).await; |
| 269 | .expect("Failed to connect to relay_b"); | 246 | println!("Announcement set up on relay_b with git data (triggers discovery)"); |
| 270 | |||
| 271 | client_a | ||
| 272 | .send_event(&announcement) | ||
| 273 | .await | ||
| 274 | .expect("Failed to send announcement to relay_a"); | ||
| 275 | println!("Announcement sent to relay_a"); | ||
| 276 | |||
| 277 | client_b | ||
| 278 | .send_event(&announcement) | ||
| 279 | .await | ||
| 280 | .expect("Failed to send announcement to relay_b"); | ||
| 281 | println!("Announcement sent to relay_b (triggers discovery)"); | ||
| 282 | 247 | ||
| 283 | // 3. Wait for discovery | 248 | // 3. Wait for discovery |
| 284 | tokio::time::sleep(Duration::from_secs(1)).await; | 249 | tokio::time::sleep(Duration::from_secs(1)).await; |
| @@ -294,9 +259,10 @@ async fn test_layer2_sync_with_q_tag() { | |||
| 294 | issue_id, | 259 | issue_id, |
| 295 | issue.kind.as_u16() | 260 | issue.kind.as_u16() |
| 296 | ); | 261 | ); |
| 297 | for tag in issue.tags.iter() { | 262 | |
| 298 | println!(" Tag: {:?}", tag.as_slice()); | 263 | let client_a = TestClient::new(relay_a.url(), keys.clone()) |
| 299 | } | 264 | .await |
| 265 | .expect("Failed to connect to relay_a"); | ||
| 300 | 266 | ||
| 301 | client_a | 267 | client_a |
| 302 | .send_event(&issue) | 268 | .send_event(&issue) |
| @@ -305,7 +271,6 @@ async fn test_layer2_sync_with_q_tag() { | |||
| 305 | println!("Issue sent to relay_a"); | 271 | println!("Issue sent to relay_a"); |
| 306 | 272 | ||
| 307 | client_a.disconnect().await; | 273 | client_a.disconnect().await; |
| 308 | client_b.disconnect().await; | ||
| 309 | 274 | ||
| 310 | // 5. Wait and verify event syncs to relay_b | 275 | // 5. Wait and verify event syncs to relay_b |
| 311 | let filter = Filter::new() | 276 | let filter = Filter::new() |
| @@ -362,30 +327,18 @@ async fn test_layer3_sync_with_lowercase_e_tag() { | |||
| 362 | 327 | ||
| 363 | let keys = Keys::generate(); | 328 | let keys = Keys::generate(); |
| 364 | 329 | ||
| 365 | // 2. Create and send repository announcement to both relays | 330 | // 2. Create and send repository announcement to both relays with git data |
| 366 | let repo_id = "test-repo-tag-9a"; | 331 | let repo_id = "test-repo-tag-9a"; |
| 367 | let announcement = | 332 | let domains = vec![relay_a.domain(), relay_b.domain()]; |
| 368 | create_repo_announcement(&keys, &[&relay_a.domain(), &relay_b.domain()], repo_id); | 333 | let domain_refs: Vec<&str> = domains.iter().map(|s| s.as_str()).collect(); |
| 369 | 334 | ||
| 370 | let client_a = TestClient::new(relay_a.url(), keys.clone()) | 335 | let (_announcement, _git_dir_a) = |
| 371 | .await | 336 | setup_announcement_on_relay(&relay_a, &keys, &domain_refs, repo_id).await; |
| 372 | .expect("Failed to connect to relay_a"); | 337 | println!("Announcement set up on relay_a with git data"); |
| 373 | |||
| 374 | let client_b = TestClient::new(relay_b.url(), keys.clone()) | ||
| 375 | .await | ||
| 376 | .expect("Failed to connect to relay_b"); | ||
| 377 | 338 | ||
| 378 | client_a | 339 | let (_announcement_b, _git_dir_b) = |
| 379 | .send_event(&announcement) | 340 | setup_announcement_on_relay(&relay_b, &keys, &domain_refs, repo_id).await; |
| 380 | .await | 341 | println!("Announcement set up on relay_b with git data (triggers discovery)"); |
| 381 | .expect("Failed to send announcement to relay_a"); | ||
| 382 | println!("Announcement sent to relay_a"); | ||
| 383 | |||
| 384 | client_b | ||
| 385 | .send_event(&announcement) | ||
| 386 | .await | ||
| 387 | .expect("Failed to send announcement to relay_b"); | ||
| 388 | println!("Announcement sent to relay_b (triggers discovery)"); | ||
| 389 | 342 | ||
| 390 | // 3. Wait for discovery | 343 | // 3. Wait for discovery |
| 391 | tokio::time::sleep(Duration::from_secs(1)).await; | 344 | tokio::time::sleep(Duration::from_secs(1)).await; |
| @@ -396,6 +349,10 @@ async fn test_layer3_sync_with_lowercase_e_tag() { | |||
| 396 | .expect("Failed to create issue"); | 349 | .expect("Failed to create issue"); |
| 397 | let issue_id = issue.id; | 350 | let issue_id = issue.id; |
| 398 | 351 | ||
| 352 | let client_a = TestClient::new(relay_a.url(), keys.clone()) | ||
| 353 | .await | ||
| 354 | .expect("Failed to connect to relay_a"); | ||
| 355 | |||
| 399 | client_a | 356 | client_a |
| 400 | .send_event(&issue) | 357 | .send_event(&issue) |
| 401 | .await | 358 | .await |
| @@ -410,11 +367,6 @@ async fn test_layer3_sync_with_lowercase_e_tag() { | |||
| 410 | assert!(issue_synced, "Layer 2 issue should sync first"); | 367 | assert!(issue_synced, "Layer 2 issue should sync first"); |
| 411 | 368 | ||
| 412 | // Wait for Layer 3 subscriptions to be established | 369 | // Wait for Layer 3 subscriptions to be established |
| 413 | // After issue syncs, relay_b's SelfSubscriber needs time to: | ||
| 414 | // 1. Receive the synced issue via notify_event broadcast | ||
| 415 | // 2. Batch timer to tick (up to 200ms in tests) | ||
| 416 | // 3. Process batch and create Layer 3 filters | ||
| 417 | // 4. Subscribe to relay_a with Layer 3 filters | ||
| 418 | tokio::time::sleep(Duration::from_millis(500)).await; | 370 | tokio::time::sleep(Duration::from_millis(500)).await; |
| 419 | 371 | ||
| 420 | // 6. Create and send Layer 3 reply with lowercase 'e' tag (kind 1) | 372 | // 6. Create and send Layer 3 reply with lowercase 'e' tag (kind 1) |
| @@ -427,9 +379,6 @@ async fn test_layer3_sync_with_lowercase_e_tag() { | |||
| 427 | reply_id, | 379 | reply_id, |
| 428 | reply.kind.as_u16() | 380 | reply.kind.as_u16() |
| 429 | ); | 381 | ); |
| 430 | for tag in reply.tags.iter() { | ||
| 431 | println!(" Tag: {:?}", tag.as_slice()); | ||
| 432 | } | ||
| 433 | 382 | ||
| 434 | client_a | 383 | client_a |
| 435 | .send_event(&reply) | 384 | .send_event(&reply) |
| @@ -438,7 +387,6 @@ async fn test_layer3_sync_with_lowercase_e_tag() { | |||
| 438 | println!("Layer 3 reply {} sent to relay_a", reply_id); | 387 | println!("Layer 3 reply {} sent to relay_a", reply_id); |
| 439 | 388 | ||
| 440 | client_a.disconnect().await; | 389 | client_a.disconnect().await; |
| 441 | client_b.disconnect().await; | ||
| 442 | 390 | ||
| 443 | // 7. Wait and verify reply syncs to relay_b | 391 | // 7. Wait and verify reply syncs to relay_b |
| 444 | let reply_filter = Filter::new() | 392 | let reply_filter = Filter::new() |
| @@ -486,30 +434,18 @@ async fn test_layer3_sync_with_uppercase_e_tag() { | |||
| 486 | 434 | ||
| 487 | let keys = Keys::generate(); | 435 | let keys = Keys::generate(); |
| 488 | 436 | ||
| 489 | // 2. Create and send repository announcement to both relays | 437 | // 2. Create and send repository announcement to both relays with git data |
| 490 | let repo_id = "test-repo-tag-9b"; | 438 | let repo_id = "test-repo-tag-9b"; |
| 491 | let announcement = | 439 | let domains = vec![relay_a.domain(), relay_b.domain()]; |
| 492 | create_repo_announcement(&keys, &[&relay_a.domain(), &relay_b.domain()], repo_id); | 440 | let domain_refs: Vec<&str> = domains.iter().map(|s| s.as_str()).collect(); |
| 493 | |||
| 494 | let client_a = TestClient::new(relay_a.url(), keys.clone()) | ||
| 495 | .await | ||
| 496 | .expect("Failed to connect to relay_a"); | ||
| 497 | 441 | ||
| 498 | let client_b = TestClient::new(relay_b.url(), keys.clone()) | 442 | let (_announcement, _git_dir_a) = |
| 499 | .await | 443 | setup_announcement_on_relay(&relay_a, &keys, &domain_refs, repo_id).await; |
| 500 | .expect("Failed to connect to relay_b"); | 444 | println!("Announcement set up on relay_a with git data"); |
| 501 | 445 | ||
| 502 | client_a | 446 | let (_announcement_b, _git_dir_b) = |
| 503 | .send_event(&announcement) | 447 | setup_announcement_on_relay(&relay_b, &keys, &domain_refs, repo_id).await; |
| 504 | .await | 448 | println!("Announcement set up on relay_b with git data (triggers discovery)"); |
| 505 | .expect("Failed to send announcement to relay_a"); | ||
| 506 | println!("Announcement sent to relay_a"); | ||
| 507 | |||
| 508 | client_b | ||
| 509 | .send_event(&announcement) | ||
| 510 | .await | ||
| 511 | .expect("Failed to send announcement to relay_b"); | ||
| 512 | println!("Announcement sent to relay_b (triggers discovery)"); | ||
| 513 | 449 | ||
| 514 | // 3. Wait for discovery | 450 | // 3. Wait for discovery |
| 515 | tokio::time::sleep(Duration::from_secs(1)).await; | 451 | tokio::time::sleep(Duration::from_secs(1)).await; |
| @@ -520,6 +456,10 @@ async fn test_layer3_sync_with_uppercase_e_tag() { | |||
| 520 | .expect("Failed to create issue"); | 456 | .expect("Failed to create issue"); |
| 521 | let issue_id = issue.id; | 457 | let issue_id = issue.id; |
| 522 | 458 | ||
| 459 | let client_a = TestClient::new(relay_a.url(), keys.clone()) | ||
| 460 | .await | ||
| 461 | .expect("Failed to connect to relay_a"); | ||
| 462 | |||
| 523 | client_a | 463 | client_a |
| 524 | .send_event(&issue) | 464 | .send_event(&issue) |
| 525 | .await | 465 | .await |
| @@ -534,11 +474,6 @@ async fn test_layer3_sync_with_uppercase_e_tag() { | |||
| 534 | assert!(issue_synced, "Layer 2 issue should sync first"); | 474 | assert!(issue_synced, "Layer 2 issue should sync first"); |
| 535 | 475 | ||
| 536 | // Wait for Layer 3 subscriptions to be established | 476 | // Wait for Layer 3 subscriptions to be established |
| 537 | // After issue syncs, relay_b's SelfSubscriber needs time to: | ||
| 538 | // 1. Receive the synced issue via notify_event broadcast | ||
| 539 | // 2. Batch timer to tick (up to 200ms in tests) | ||
| 540 | // 3. Process batch and create Layer 3 filters | ||
| 541 | // 4. Subscribe to relay_a with Layer 3 filters | ||
| 542 | tokio::time::sleep(Duration::from_millis(500)).await; | 477 | tokio::time::sleep(Duration::from_millis(500)).await; |
| 543 | 478 | ||
| 544 | // 6. Create and send Layer 3 comment with uppercase 'E' tag (kind 1111) | 479 | // 6. Create and send Layer 3 comment with uppercase 'E' tag (kind 1111) |
| @@ -552,9 +487,6 @@ async fn test_layer3_sync_with_uppercase_e_tag() { | |||
| 552 | comment_id, | 487 | comment_id, |
| 553 | comment.kind.as_u16() | 488 | comment.kind.as_u16() |
| 554 | ); | 489 | ); |
| 555 | for tag in comment.tags.iter() { | ||
| 556 | println!(" Tag: {:?}", tag.as_slice()); | ||
| 557 | } | ||
| 558 | 490 | ||
| 559 | client_a | 491 | client_a |
| 560 | .send_event(&comment) | 492 | .send_event(&comment) |
| @@ -563,7 +495,6 @@ async fn test_layer3_sync_with_uppercase_e_tag() { | |||
| 563 | println!("Layer 3 comment {} sent to relay_a", comment_id); | 495 | println!("Layer 3 comment {} sent to relay_a", comment_id); |
| 564 | 496 | ||
| 565 | client_a.disconnect().await; | 497 | client_a.disconnect().await; |
| 566 | client_b.disconnect().await; | ||
| 567 | 498 | ||
| 568 | // 7. Wait and verify comment syncs to relay_b | 499 | // 7. Wait and verify comment syncs to relay_b |
| 569 | let comment_filter = Filter::new() | 500 | let comment_filter = Filter::new() |
| @@ -614,30 +545,18 @@ async fn test_layer3_sync_with_q_tag() { | |||
| 614 | 545 | ||
| 615 | let keys = Keys::generate(); | 546 | let keys = Keys::generate(); |
| 616 | 547 | ||
| 617 | // 2. Create and send repository announcement to both relays | 548 | // 2. Create and send repository announcement to both relays with git data |
| 618 | let repo_id = "test-repo-tag-9c"; | 549 | let repo_id = "test-repo-tag-9c"; |
| 619 | let announcement = | 550 | let domains = vec![relay_a.domain(), relay_b.domain()]; |
| 620 | create_repo_announcement(&keys, &[&relay_a.domain(), &relay_b.domain()], repo_id); | 551 | let domain_refs: Vec<&str> = domains.iter().map(|s| s.as_str()).collect(); |
| 621 | 552 | ||
| 622 | let client_a = TestClient::new(relay_a.url(), keys.clone()) | 553 | let (_announcement, _git_dir_a) = |
| 623 | .await | 554 | setup_announcement_on_relay(&relay_a, &keys, &domain_refs, repo_id).await; |
| 624 | .expect("Failed to connect to relay_a"); | 555 | println!("Announcement set up on relay_a with git data"); |
| 625 | 556 | ||
| 626 | let client_b = TestClient::new(relay_b.url(), keys.clone()) | 557 | let (_announcement_b, _git_dir_b) = |
| 627 | .await | 558 | setup_announcement_on_relay(&relay_b, &keys, &domain_refs, repo_id).await; |
| 628 | .expect("Failed to connect to relay_b"); | 559 | println!("Announcement set up on relay_b with git data (triggers discovery)"); |
| 629 | |||
| 630 | client_a | ||
| 631 | .send_event(&announcement) | ||
| 632 | .await | ||
| 633 | .expect("Failed to send announcement to relay_a"); | ||
| 634 | println!("Announcement sent to relay_a"); | ||
| 635 | |||
| 636 | client_b | ||
| 637 | .send_event(&announcement) | ||
| 638 | .await | ||
| 639 | .expect("Failed to send announcement to relay_b"); | ||
| 640 | println!("Announcement sent to relay_b (triggers discovery)"); | ||
| 641 | 560 | ||
| 642 | // 3. Wait for discovery | 561 | // 3. Wait for discovery |
| 643 | tokio::time::sleep(Duration::from_secs(1)).await; | 562 | tokio::time::sleep(Duration::from_secs(1)).await; |
| @@ -648,6 +567,10 @@ async fn test_layer3_sync_with_q_tag() { | |||
| 648 | .expect("Failed to create issue"); | 567 | .expect("Failed to create issue"); |
| 649 | let issue_id = issue.id; | 568 | let issue_id = issue.id; |
| 650 | 569 | ||
| 570 | let client_a = TestClient::new(relay_a.url(), keys.clone()) | ||
| 571 | .await | ||
| 572 | .expect("Failed to connect to relay_a"); | ||
| 573 | |||
| 651 | client_a | 574 | client_a |
| 652 | .send_event(&issue) | 575 | .send_event(&issue) |
| 653 | .await | 576 | .await |
| @@ -662,11 +585,6 @@ async fn test_layer3_sync_with_q_tag() { | |||
| 662 | assert!(issue_synced, "Layer 2 issue should sync first"); | 585 | assert!(issue_synced, "Layer 2 issue should sync first"); |
| 663 | 586 | ||
| 664 | // Wait for Layer 3 subscriptions to be established | 587 | // Wait for Layer 3 subscriptions to be established |
| 665 | // After issue syncs, relay_b's SelfSubscriber needs time to: | ||
| 666 | // 1. Receive the synced issue via notify_event broadcast | ||
| 667 | // 2. Batch timer to tick (up to 200ms in tests) | ||
| 668 | // 3. Process batch and create Layer 3 filters | ||
| 669 | // 4. Subscribe to relay_a with Layer 3 filters | ||
| 670 | tokio::time::sleep(Duration::from_millis(500)).await; | 588 | tokio::time::sleep(Duration::from_millis(500)).await; |
| 671 | 589 | ||
| 672 | // 6. Create and send Layer 3 quote with 'q' tag (kind 1) | 590 | // 6. Create and send Layer 3 quote with 'q' tag (kind 1) |
| @@ -679,9 +597,6 @@ async fn test_layer3_sync_with_q_tag() { | |||
| 679 | quote_id, | 597 | quote_id, |
| 680 | quote.kind.as_u16() | 598 | quote.kind.as_u16() |
| 681 | ); | 599 | ); |
| 682 | for tag in quote.tags.iter() { | ||
| 683 | println!(" Tag: {:?}", tag.as_slice()); | ||
| 684 | } | ||
| 685 | 600 | ||
| 686 | client_a | 601 | client_a |
| 687 | .send_event("e) | 602 | .send_event("e) |
| @@ -690,7 +605,6 @@ async fn test_layer3_sync_with_q_tag() { | |||
| 690 | println!("Layer 3 quote {} sent to relay_a", quote_id); | 605 | println!("Layer 3 quote {} sent to relay_a", quote_id); |
| 691 | 606 | ||
| 692 | client_a.disconnect().await; | 607 | client_a.disconnect().await; |
| 693 | client_b.disconnect().await; | ||
| 694 | 608 | ||
| 695 | // 7. Wait and verify quote syncs to relay_b | 609 | // 7. Wait and verify quote syncs to relay_b |
| 696 | let quote_filter = Filter::new() | 610 | let quote_filter = Filter::new() |