diff options
Diffstat (limited to 'src/lib/client.rs')
| -rw-r--r-- | src/lib/client.rs | 494 |
1 files changed, 340 insertions, 154 deletions
diff --git a/src/lib/client.rs b/src/lib/client.rs index c08cd04..45bac7a 100644 --- a/src/lib/client.rs +++ b/src/lib/client.rs | |||
| @@ -15,7 +15,10 @@ use std::{ | |||
| 15 | fmt::{Display, Write}, | 15 | fmt::{Display, Write}, |
| 16 | fs::create_dir_all, | 16 | fs::create_dir_all, |
| 17 | path::Path, | 17 | path::Path, |
| 18 | sync::{Arc, RwLock}, | 18 | sync::{ |
| 19 | Arc, RwLock, | ||
| 20 | atomic::{AtomicU64, Ordering}, | ||
| 21 | }, | ||
| 19 | time::Duration, | 22 | time::Duration, |
| 20 | }; | 23 | }; |
| 21 | 24 | ||
| @@ -191,15 +194,11 @@ impl Connect for Client { | |||
| 191 | 194 | ||
| 192 | if !relay.is_connected() { | 195 | if !relay.is_connected() { |
| 193 | #[allow(clippy::large_futures)] | 196 | #[allow(clippy::large_futures)] |
| 194 | let _ = relay | 197 | relay |
| 195 | .try_connect(std::time::Duration::from_secs(CONNECTION_TIMEOUT)) | 198 | .try_connect(std::time::Duration::from_secs(LONG_TIMEOUT)) |
| 196 | .await; | 199 | .await?; |
| 197 | } | 200 | } |
| 198 | 201 | ||
| 199 | if !relay.is_connected() { | ||
| 200 | self.skip_relay_for_session(relay_url.clone(), "connection timeout".to_string()); | ||
| 201 | bail!("connection timeout"); | ||
| 202 | } | ||
| 203 | Ok(()) | 202 | Ok(()) |
| 204 | } | 203 | } |
| 205 | 204 | ||
| @@ -278,68 +277,74 @@ impl Connect for Client { | |||
| 278 | 277 | ||
| 279 | let relays_map = self.client.relays().await; | 278 | let relays_map = self.client.relays().await; |
| 280 | 279 | ||
| 280 | // Static timeout for get_events_per_relay (no adaptive timeout here) | ||
| 281 | let static_timeout = Arc::new(AtomicU64::new(LONG_TIMEOUT)); | ||
| 282 | |||
| 281 | let futures: Vec<_> = relays | 283 | let futures: Vec<_> = relays |
| 282 | .clone() | 284 | .clone() |
| 283 | .iter() | 285 | .iter() |
| 284 | // don't look for events on blaster | 286 | // don't look for events on blaster |
| 285 | .filter(|r| !r.as_str().contains("nostr.mutinywallet.com")) | 287 | .filter(|r| !r.as_str().contains("nostr.mutinywallet.com")) |
| 286 | .map(|r| (relays_map.get(r).unwrap(), filters.clone())) | 288 | .map(|r| (relays_map.get(r).unwrap(), filters.clone())) |
| 287 | .map(|(relay, filters)| async { | 289 | .map(|(relay, filters)| { |
| 288 | let pb = if std::env::var("NGITTEST").is_err() { | 290 | let static_timeout_clone = static_timeout.clone(); |
| 289 | let pb = progress_reporter.add( | 291 | let progress_reporter_clone = progress_reporter.clone(); |
| 290 | ProgressBar::new(1) | 292 | async move { |
| 291 | .with_prefix(format!("{: <11}{}", "connecting", relay.url())) | 293 | let pb = if std::env::var("NGITTEST").is_err() { |
| 292 | .with_style(pb_style()?), | 294 | let pb = progress_reporter_clone.add( |
| 293 | ); | 295 | ProgressBar::new(1) |
| 294 | pb.enable_steady_tick(Duration::from_millis(300)); | 296 | .with_prefix(format!("{: <11}{}", "connecting", relay.url())) |
| 295 | Some(pb) | 297 | .with_style(pb_style(static_timeout_clone)?), |
| 296 | } else { | ||
| 297 | None | ||
| 298 | }; | ||
| 299 | fn update_progress_bar_with_error( | ||
| 300 | relay_url: &RelayUrl, | ||
| 301 | pb: Option<ProgressBar>, | ||
| 302 | error: &anyhow::Error, | ||
| 303 | ) { | ||
| 304 | if let Some(pb) = pb { | ||
| 305 | pb.set_style(pb_after_style(false)); | ||
| 306 | pb.set_prefix(format!("{: <11}{}", "error", relay_url)); | ||
| 307 | pb.finish_with_message( | ||
| 308 | console::style( | ||
| 309 | error.to_string().replace("relay pool error:", "error:"), | ||
| 310 | ) | ||
| 311 | .for_stderr() | ||
| 312 | .red() | ||
| 313 | .to_string(), | ||
| 314 | ); | 298 | ); |
| 315 | } | 299 | pb.enable_steady_tick(Duration::from_millis(300)); |
| 316 | } | 300 | Some(pb) |
| 317 | if let Some(reason) = self.is_relay_skipped_for_session(relay.url()) { | 301 | } else { |
| 318 | update_progress_bar_with_error(relay.url(), pb, &anyhow!("{reason}")); | 302 | None |
| 319 | bail!("{reason}"); | 303 | }; |
| 320 | } | 304 | fn update_progress_bar_with_error( |
| 321 | #[allow(clippy::large_futures)] | 305 | relay_url: &RelayUrl, |
| 322 | match get_events_of(relay, filters, &pb).await { | 306 | pb: Option<ProgressBar>, |
| 323 | Err(error) => { | 307 | error: &anyhow::Error, |
| 324 | // Check error for timeout/connection issues and add to skip list | 308 | ) { |
| 325 | if error.to_string().contains("connection timeout") { | 309 | if let Some(pb) = pb { |
| 326 | // Simple check, refine as needed | 310 | pb.set_style(pb_after_style(false)); |
| 327 | self.skip_relay_for_session(relay.url().clone(), error.to_string()); | 311 | pb.set_prefix(format!("{: <11}{}", "error", relay_url)); |
| 312 | pb.finish_with_message( | ||
| 313 | console::style( | ||
| 314 | error.to_string().replace("relay pool error:", "error:"), | ||
| 315 | ) | ||
| 316 | .for_stderr() | ||
| 317 | .red() | ||
| 318 | .to_string(), | ||
| 319 | ); | ||
| 328 | } | 320 | } |
| 329 | update_progress_bar_with_error(relay.url(), pb, &error); | ||
| 330 | Err(error) | ||
| 331 | } | 321 | } |
| 332 | Ok(res) => { | 322 | if let Some(reason) = self.is_relay_skipped_for_session(relay.url()) { |
| 333 | if let Some(pb) = pb { | 323 | update_progress_bar_with_error(relay.url(), pb, &anyhow!("{reason}")); |
| 334 | pb.set_style(pb_after_style(true)); | 324 | bail!("{reason}"); |
| 335 | pb.set_prefix(format!( | 325 | } |
| 336 | "{: <11}{}", | 326 | #[allow(clippy::large_futures)] |
| 337 | format!("{} events", res.len()), | 327 | match get_events_of(relay, filters, &pb).await { |
| 338 | relay.url() | 328 | Err(error) => { |
| 339 | )); | 329 | // Check error for timeout/connection issues and add to skip list |
| 340 | pb.finish_with_message(""); | 330 | if error.to_string().contains("connection timeout") { |
| 331 | self.skip_relay_for_session(relay.url().clone(), error.to_string()); | ||
| 332 | } | ||
| 333 | update_progress_bar_with_error(relay.url(), pb, &error); | ||
| 334 | Err(error) | ||
| 335 | } | ||
| 336 | Ok(res) => { | ||
| 337 | if let Some(pb) = pb { | ||
| 338 | pb.set_style(pb_after_style(true)); | ||
| 339 | pb.set_prefix(format!( | ||
| 340 | "{: <11}{}", | ||
| 341 | format!("{} events", res.len()), | ||
| 342 | relay.url() | ||
| 343 | )); | ||
| 344 | pb.finish_with_message(""); | ||
| 345 | } | ||
| 346 | Ok(res) | ||
| 341 | } | 347 | } |
| 342 | Ok(res) | ||
| 343 | } | 348 | } |
| 344 | } | 349 | } |
| 345 | }) | 350 | }) |
| @@ -374,6 +379,14 @@ impl Connect for Client { | |||
| 374 | 379 | ||
| 375 | let progress_reporter = MultiProgress::new(); | 380 | let progress_reporter = MultiProgress::new(); |
| 376 | 381 | ||
| 382 | // Track successful relays for adaptive timeout (switch to SHORT when | ||
| 383 | // SUCCESS_THRESHOLD succeed) | ||
| 384 | let success_count = Arc::new(AtomicU64::new(0)); | ||
| 385 | |||
| 386 | // Track current timeout value for progress bar display (starts at LONG, | ||
| 387 | // switches to SHORT) | ||
| 388 | let current_timeout = Arc::new(AtomicU64::new(LONG_TIMEOUT)); | ||
| 389 | |||
| 377 | let mut processed_relays = HashSet::new(); | 390 | let mut processed_relays = HashSet::new(); |
| 378 | 391 | ||
| 379 | let mut relay_reports: Vec<Result<FetchReport>> = vec![]; | 392 | let mut relay_reports: Vec<Result<FetchReport>> = vec![]; |
| @@ -403,7 +416,9 @@ impl Connect for Client { | |||
| 403 | .context("failed to add relay")?; | 416 | .context("failed to add relay")?; |
| 404 | } | 417 | } |
| 405 | 418 | ||
| 406 | let dim = Style::new().color256(247); | 419 | let success_count_for_loop = success_count.clone(); |
| 420 | let current_timeout_for_loop = current_timeout.clone(); | ||
| 421 | let total_relays = relays.len() as u64; | ||
| 407 | 422 | ||
| 408 | let futures: Vec<_> = relays | 423 | let futures: Vec<_> = relays |
| 409 | .iter() | 424 | .iter() |
| @@ -434,83 +449,144 @@ impl Connect for Client { | |||
| 434 | } | 449 | } |
| 435 | } | 450 | } |
| 436 | }) | 451 | }) |
| 437 | .map(|request| async { | 452 | .map(|request| { |
| 438 | let relay_column_width = request.relay_column_width; | 453 | let success_count_clone = success_count_for_loop.clone(); |
| 439 | 454 | let current_timeout_clone = current_timeout_for_loop.clone(); | |
| 440 | let relay_url = request | 455 | let progress_reporter_clone = progress_reporter.clone(); |
| 441 | .selected_relay | 456 | let total_relays_clone = total_relays; |
| 442 | .clone() | 457 | async move { |
| 443 | .context("fetch_all_from_relay called without a relay")?; | 458 | let dim = Style::new().color256(247); |
| 444 | 459 | let relay_column_width = request.relay_column_width; | |
| 445 | let pb = if std::env::var("NGITTEST").is_err() { | 460 | |
| 446 | let pb = progress_reporter.add( | 461 | let relay_url = request |
| 447 | ProgressBar::new(1) | 462 | .selected_relay |
| 448 | .with_prefix( | 463 | .clone() |
| 449 | dim.apply_to(format!( | 464 | .context("fetch_all_from_relay called without a relay")?; |
| 450 | "{: <relay_column_width$} connecting", | 465 | |
| 451 | &relay_url | 466 | let pb = if std::env::var("NGITTEST").is_err() { |
| 452 | )) | 467 | let pb = progress_reporter_clone.add( |
| 453 | .to_string(), | 468 | ProgressBar::new(1) |
| 454 | ) | 469 | .with_prefix( |
| 455 | .with_style(pb_style()?), | 470 | dim.apply_to(format!( |
| 456 | ); | 471 | "{: <relay_column_width$} connecting", |
| 457 | pb.enable_steady_tick(Duration::from_millis(300)); | 472 | &relay_url |
| 458 | Some(pb) | 473 | )) |
| 459 | } else { | 474 | .to_string(), |
| 460 | None | 475 | ) |
| 461 | }; | 476 | .with_style(pb_style(current_timeout_clone.clone())?), |
| 462 | // do here | ||
| 463 | |||
| 464 | fn update_progress_bar_with_error( | ||
| 465 | relay_column_width: usize, | ||
| 466 | relay_url: &RelayUrl, | ||
| 467 | pb: Option<ProgressBar>, | ||
| 468 | error: &anyhow::Error, | ||
| 469 | ) { | ||
| 470 | if let Some(pb) = pb { | ||
| 471 | pb.set_style(pb_after_style(false)); | ||
| 472 | pb.set_prefix( | ||
| 473 | Style::new() | ||
| 474 | .color256(247) | ||
| 475 | .apply_to(format!("{: <relay_column_width$}", &relay_url)) | ||
| 476 | .to_string(), | ||
| 477 | ); | 477 | ); |
| 478 | pb.finish_with_message( | 478 | pb.enable_steady_tick(Duration::from_millis(300)); |
| 479 | console::style( | 479 | Some(pb) |
| 480 | error.to_string().replace("relay pool error:", "error:"), | 480 | } else { |
| 481 | ) | 481 | None |
| 482 | .for_stderr() | 482 | }; |
| 483 | .red() | 483 | |
| 484 | .to_string(), | 484 | fn update_progress_bar_with_error( |
| 485 | ); | 485 | relay_column_width: usize, |
| 486 | } | 486 | relay_url: &RelayUrl, |
| 487 | } | 487 | pb: Option<ProgressBar>, |
| 488 | if let Some(reason) = self.is_relay_skipped_for_session(&relay_url) { | 488 | error: &anyhow::Error, |
| 489 | update_progress_bar_with_error( | 489 | ) { |
| 490 | relay_column_width, | 490 | if let Some(pb) = pb { |
| 491 | &relay_url, | 491 | pb.set_style(pb_after_style(false)); |
| 492 | pb, | 492 | pb.set_prefix( |
| 493 | &anyhow!("{reason}"), | 493 | Style::new() |
| 494 | ); | 494 | .color256(247) |
| 495 | bail!("{reason}"); | 495 | .apply_to(format!("{: <relay_column_width$}", &relay_url)) |
| 496 | } | 496 | .to_string(), |
| 497 | #[allow(clippy::large_futures)] | 497 | ); |
| 498 | match self.fetch_all_from_relay(git_repo_path, request, &pb).await { | 498 | pb.finish_with_message( |
| 499 | Err(error) => { | 499 | console::style( |
| 500 | // Check error for timeout/connection issues and add to skip list | 500 | error.to_string().replace("relay pool error:", "error:"), |
| 501 | if error.to_string().contains("connection timeout") { | 501 | ) |
| 502 | // Simple check, refine as needed | 502 | .for_stderr() |
| 503 | self.skip_relay_for_session(relay_url.clone(), error.to_string()); | 503 | .red() |
| 504 | .to_string(), | ||
| 505 | ); | ||
| 504 | } | 506 | } |
| 507 | } | ||
| 508 | |||
| 509 | if let Some(reason) = self.is_relay_skipped_for_session(&relay_url) { | ||
| 505 | update_progress_bar_with_error( | 510 | update_progress_bar_with_error( |
| 506 | relay_column_width, | 511 | relay_column_width, |
| 507 | &relay_url, | 512 | &relay_url, |
| 508 | pb, | 513 | pb, |
| 509 | &error, | 514 | &anyhow!("{reason}"), |
| 510 | ); | 515 | ); |
| 511 | Err(error) | 516 | bail!("{reason}"); |
| 517 | } | ||
| 518 | |||
| 519 | // Adaptive timeout using tokio::select! | ||
| 520 | // Start the fetch operation once and race it against an adaptive timeout | ||
| 521 | let pb_clone = pb.clone(); | ||
| 522 | let fetch_future = self.fetch_all_from_relay(git_repo_path, request, &pb_clone); | ||
| 523 | tokio::pin!(fetch_future); | ||
| 524 | |||
| 525 | // Create an adaptive timeout that switches from long to short | ||
| 526 | // when SUCCESS_THRESHOLD of relays succeed | ||
| 527 | let timeout_future = async { | ||
| 528 | // Poll for timeout or SUCCESS_THRESHOLD success threshold | ||
| 529 | let check_interval = Duration::from_millis(100); | ||
| 530 | let long_timeout_end = tokio::time::Instant::now() + Duration::from_secs(LONG_TIMEOUT); | ||
| 531 | |||
| 532 | loop { | ||
| 533 | // Check if SUCCESS_THRESHOLD of relays have succeeded | ||
| 534 | let current_success_count = success_count_clone.load(Ordering::Relaxed); | ||
| 535 | let threshold = (total_relays_clone as f64 * SUCCESS_THRESHOLD).ceil() as u64; | ||
| 536 | |||
| 537 | if current_success_count >= threshold { | ||
| 538 | // SUCCESS_THRESHOLD reached, switch to short timeout | ||
| 539 | tokio::time::sleep(Duration::from_secs(short_timeout())).await; | ||
| 540 | return "short"; | ||
| 541 | } | ||
| 542 | |||
| 543 | // Check if long timeout has expired | ||
| 544 | if tokio::time::Instant::now() >= long_timeout_end { | ||
| 545 | return "long"; | ||
| 546 | } | ||
| 547 | |||
| 548 | // Sleep briefly before checking again | ||
| 549 | tokio::time::sleep(check_interval).await; | ||
| 550 | } | ||
| 551 | }; | ||
| 552 | |||
| 553 | #[allow(clippy::large_futures)] | ||
| 554 | let result = tokio::select! { | ||
| 555 | result = &mut fetch_future => { | ||
| 556 | if result.is_ok() { | ||
| 557 | // Increment success count | ||
| 558 | let new_count = success_count_clone.fetch_add(1, Ordering::Relaxed) + 1; | ||
| 559 | let threshold = (total_relays_clone as f64 * SUCCESS_THRESHOLD).ceil() as u64; | ||
| 560 | |||
| 561 | // If we've reached SUCCESS_THRESHOLD, update timeout display | ||
| 562 | if new_count >= threshold { | ||
| 563 | current_timeout_clone.store(short_timeout(), Ordering::Relaxed); | ||
| 564 | } | ||
| 565 | } | ||
| 566 | result | ||
| 567 | } | ||
| 568 | timeout_type = timeout_future => { | ||
| 569 | Err(anyhow!("timeout after {}s timeout", | ||
| 570 | if timeout_type == "long" { LONG_TIMEOUT } else { short_timeout() })) | ||
| 571 | } | ||
| 572 | }; | ||
| 573 | |||
| 574 | match result { | ||
| 575 | Err(error) => { | ||
| 576 | // Check error for timeout/connection issues and add to skip list | ||
| 577 | if error.to_string().contains("connection timeout") || error.to_string().contains("timeout after") { | ||
| 578 | self.skip_relay_for_session(relay_url.clone(), error.to_string()); | ||
| 579 | } | ||
| 580 | update_progress_bar_with_error( | ||
| 581 | relay_column_width, | ||
| 582 | &relay_url, | ||
| 583 | pb, | ||
| 584 | &error, | ||
| 585 | ); | ||
| 586 | Err(error) | ||
| 587 | } | ||
| 588 | Ok(res) => Ok(res), | ||
| 512 | } | 589 | } |
| 513 | Ok(res) => Ok(res), | ||
| 514 | } | 590 | } |
| 515 | }) | 591 | }) |
| 516 | .collect(); | 592 | .collect(); |
| @@ -586,7 +662,7 @@ impl Connect for Client { | |||
| 586 | 662 | ||
| 587 | let relay_column_width = request.relay_column_width; | 663 | let relay_column_width = request.relay_column_width; |
| 588 | 664 | ||
| 589 | self.connect(&relay_url).await?; | 665 | let _ = self.client.add_relay(&relay_url).await; |
| 590 | 666 | ||
| 591 | let dim = Style::new().color256(247); | 667 | let dim = Style::new().color256(247); |
| 592 | 668 | ||
| @@ -614,7 +690,7 @@ impl Connect for Client { | |||
| 614 | fresh_profiles = HashSet::new(); | 690 | fresh_profiles = HashSet::new(); |
| 615 | 691 | ||
| 616 | let relay = self.client.relay(&relay_url).await?; | 692 | let relay = self.client.relay(&relay_url).await?; |
| 617 | let events: Vec<nostr::Event> = get_events_of(&relay, filters.clone(), &None).await?; | 693 | let events: Vec<nostr::Event> = get_events_of(&relay, filters.clone(), pb).await?; |
| 618 | // TODO: try reconcile | 694 | // TODO: try reconcile |
| 619 | 695 | ||
| 620 | process_fetched_events( | 696 | process_fetched_events( |
| @@ -655,8 +731,16 @@ impl Connect for Client { | |||
| 655 | } | 731 | } |
| 656 | } | 732 | } |
| 657 | 733 | ||
| 658 | static CONNECTION_TIMEOUT: u64 = 3; | 734 | static LONG_TIMEOUT: u64 = 45; |
| 659 | static GET_EVENTS_TIMEOUT: u64 = 7; | 735 | static SUCCESS_THRESHOLD: f64 = 0.5; // 50% of relays must succeed to switch to short timeout |
| 736 | |||
| 737 | fn short_timeout() -> u64 { | ||
| 738 | if std::env::var("NGITTEST").is_ok() { | ||
| 739 | 3 | ||
| 740 | } else { | ||
| 741 | 7 | ||
| 742 | } | ||
| 743 | } | ||
| 660 | 744 | ||
| 661 | async fn get_events_of( | 745 | async fn get_events_of( |
| 662 | relay: &nostr_sdk::Relay, | 746 | relay: &nostr_sdk::Relay, |
| @@ -665,25 +749,123 @@ async fn get_events_of( | |||
| 665 | ) -> Result<Vec<Event>> { | 749 | ) -> Result<Vec<Event>> { |
| 666 | // relay.reconcile(filter, opts).await?; | 750 | // relay.reconcile(filter, opts).await?; |
| 667 | 751 | ||
| 668 | if !relay.is_connected() { | 752 | let mut retry_delay = Duration::from_secs(2); |
| 753 | let start_time = std::time::Instant::now(); | ||
| 754 | let max_timeout = Duration::from_secs(LONG_TIMEOUT); | ||
| 755 | let mut last_error = None; | ||
| 756 | let mut attempt_num = 0; | ||
| 757 | let dim = Style::new().color256(247); | ||
| 758 | |||
| 759 | if let Some(pb) = pb { | ||
| 760 | pb.set_prefix( | ||
| 761 | console::style(relay.url()) | ||
| 762 | .for_stderr() | ||
| 763 | .yellow() | ||
| 764 | .to_string(), | ||
| 765 | ); | ||
| 766 | pb.set_message("connecting"); | ||
| 767 | } | ||
| 768 | while !relay.is_connected() { | ||
| 769 | attempt_num += 1; | ||
| 669 | #[allow(clippy::large_futures)] | 770 | #[allow(clippy::large_futures)] |
| 670 | let _ = relay | 771 | match relay |
| 671 | .try_connect(std::time::Duration::from_secs(CONNECTION_TIMEOUT)) | 772 | .try_connect(Duration::from_secs(short_timeout())) |
| 672 | .await; | 773 | .await |
| 774 | { | ||
| 775 | Ok(_) => { | ||
| 776 | if relay.is_connected() { | ||
| 777 | break; | ||
| 778 | } | ||
| 779 | } | ||
| 780 | Err(e) => { | ||
| 781 | last_error = Some(e); | ||
| 782 | } | ||
| 783 | } | ||
| 784 | // Check if we have time for another retry | ||
| 785 | if start_time.elapsed() + retry_delay >= max_timeout { | ||
| 786 | break; | ||
| 787 | } | ||
| 788 | |||
| 789 | // For short delays (< 2s), just show a simple message and sleep | ||
| 790 | // For longer delays, show a countdown to provide feedback | ||
| 791 | if retry_delay < Duration::from_secs(2) { | ||
| 792 | if let Some(pb) = pb { | ||
| 793 | let retry_msg = if attempt_num > 1 { | ||
| 794 | format!("retrying (attempt {attempt_num})") | ||
| 795 | } else { | ||
| 796 | "retrying".to_string() | ||
| 797 | }; | ||
| 798 | pb.set_message(format!( | ||
| 799 | "{} {}", | ||
| 800 | console::style("connection failed").for_stderr().red(), | ||
| 801 | dim.apply_to(retry_msg) | ||
| 802 | )); | ||
| 803 | } | ||
| 804 | tokio::time::sleep(retry_delay).await; | ||
| 805 | } else { | ||
| 806 | // Countdown with dynamic updates for longer delays | ||
| 807 | let retry_start = std::time::Instant::now(); | ||
| 808 | let mut interval = tokio::time::interval(Duration::from_millis(100)); | ||
| 809 | interval.tick().await; // First tick completes immediately | ||
| 810 | |||
| 811 | loop { | ||
| 812 | let elapsed = retry_start.elapsed(); | ||
| 813 | let remaining = retry_delay.saturating_sub(elapsed); | ||
| 814 | |||
| 815 | if let Some(pb) = pb { | ||
| 816 | let retry_msg = if attempt_num > 1 { | ||
| 817 | format!( | ||
| 818 | "retrying in {:.0}s (attempt {attempt_num})", | ||
| 819 | remaining.as_secs_f64() | ||
| 820 | ) | ||
| 821 | } else { | ||
| 822 | format!("retrying in {:.0}s", remaining.as_secs_f64()) | ||
| 823 | }; | ||
| 824 | pb.set_message(format!( | ||
| 825 | "{} {}", | ||
| 826 | console::style("connection failed").for_stderr().red(), | ||
| 827 | dim.apply_to(retry_msg) | ||
| 828 | )); | ||
| 829 | } | ||
| 830 | |||
| 831 | if elapsed >= retry_delay { | ||
| 832 | break; | ||
| 833 | } | ||
| 834 | |||
| 835 | interval.tick().await; | ||
| 836 | } | ||
| 837 | } | ||
| 838 | |||
| 839 | // Check again after sleep | ||
| 840 | if start_time.elapsed() >= max_timeout { | ||
| 841 | break; | ||
| 842 | } | ||
| 843 | |||
| 844 | retry_delay = Duration::from_secs_f64(retry_delay.as_secs_f64() * 1.5); | ||
| 673 | } | 845 | } |
| 674 | 846 | ||
| 675 | if !relay.is_connected() { | 847 | if !relay.is_connected() { |
| 676 | bail!("connection timeout"); | 848 | if let Some(e) = last_error { |
| 849 | bail!("connection timeout: {}", e); | ||
| 850 | } else { | ||
| 851 | bail!("connection timeout here"); | ||
| 852 | } | ||
| 677 | } else if let Some(pb) = pb { | 853 | } else if let Some(pb) = pb { |
| 678 | pb.set_prefix(format!("connected {}", relay.url())); | 854 | pb.set_prefix( |
| 855 | console::style(relay.url()) | ||
| 856 | .for_stderr() | ||
| 857 | .yellow() | ||
| 858 | .to_string(), | ||
| 859 | ); | ||
| 860 | pb.set_message("connected"); | ||
| 679 | } | 861 | } |
| 680 | 862 | ||
| 681 | let events_res = join_all(filters.into_iter().map(|filter| async { | 863 | let events_res = join_all(filters.into_iter().map(|filter| async { |
| 682 | relay | 864 | relay |
| 683 | .fetch_events( | 865 | .fetch_events( |
| 684 | filter, | 866 | filter, |
| 685 | // 20 is nostr_sdk default | 867 | // Use a very long timeout; actual timeout is controlled by outer tokio::select! |
| 686 | std::time::Duration::from_secs(GET_EVENTS_TIMEOUT), | 868 | std::time::Duration::from_secs(LONG_TIMEOUT), |
| 687 | ReqExitPolicy::ExitOnEOSE, | 869 | ReqExitPolicy::ExitOnEOSE, |
| 688 | ) | 870 | ) |
| 689 | .await | 871 | .await |
| @@ -907,22 +1089,26 @@ pub async fn nip05_query(nip05_addr: &str) -> Result<Nip05Profile> { | |||
| 907 | )) | 1089 | )) |
| 908 | } | 1090 | } |
| 909 | 1091 | ||
| 910 | fn pb_style() -> Result<ProgressStyle> { | 1092 | fn pb_style(current_timeout: Arc<AtomicU64>) -> Result<ProgressStyle> { |
| 911 | Ok( | 1093 | Ok( |
| 912 | ProgressStyle::with_template(" {spinner} {prefix} {msg} {timeout_in}")?.with_key( | 1094 | ProgressStyle::with_template(" {spinner} {prefix} {msg} {timeout_in}")?.with_key( |
| 913 | "timeout_in", | 1095 | "timeout_in", |
| 914 | |state: &ProgressState, w: &mut dyn Write| { | 1096 | move |state: &ProgressState, w: &mut dyn Write| { |
| 915 | if state.elapsed().as_secs() > 3 && state.elapsed().as_secs() < GET_EVENTS_TIMEOUT { | 1097 | let elapsed = state.elapsed().as_secs(); |
| 1098 | // Adaptive timeout display: reads the actual current timeout value | ||
| 1099 | // which starts at LONG_TIMEOUT and switches to SHORT_TIMEOUT after | ||
| 1100 | // the first relay succeeds | ||
| 1101 | if elapsed > 3 { | ||
| 916 | let dim = Style::new().color256(247); | 1102 | let dim = Style::new().color256(247); |
| 917 | write!( | 1103 | let timeout = current_timeout.load(Ordering::Relaxed); |
| 918 | w, | 1104 | if elapsed < timeout { |
| 919 | "{}", | 1105 | write!( |
| 920 | dim.apply_to(format!( | 1106 | w, |
| 921 | "timeout in {:.1}s", | 1107 | "{}", |
| 922 | GET_EVENTS_TIMEOUT - state.elapsed().as_secs() | 1108 | dim.apply_to(format!("timeout in {:.1}s", timeout - elapsed)) |
| 923 | )) | 1109 | ) |
| 924 | ) | 1110 | .unwrap(); |
| 925 | .unwrap(); | 1111 | } |
| 926 | } | 1112 | } |
| 927 | }, | 1113 | }, |
| 928 | ), | 1114 | ), |