From dcec4432587037300480aca15331237c50b2cc02 Mon Sep 17 00:00:00 2001 From: DanConwayDev Date: Mon, 10 Nov 2025 16:57:38 +0000 Subject: feat: adaptive relay timeout 45s or 7s based on success also auto-retry connection after 2s with x1.5 increment until timeout --- src/lib/client.rs | 494 +++++++++++++++++++++++++++++++++++++----------------- 1 file changed, 340 insertions(+), 154 deletions(-) (limited to 'src') diff --git a/src/lib/client.rs b/src/lib/client.rs index c08cd04..45bac7a 100644 --- a/src/lib/client.rs +++ b/src/lib/client.rs @@ -15,7 +15,10 @@ use std::{ fmt::{Display, Write}, fs::create_dir_all, path::Path, - sync::{Arc, RwLock}, + sync::{ + Arc, RwLock, + atomic::{AtomicU64, Ordering}, + }, time::Duration, }; @@ -191,15 +194,11 @@ impl Connect for Client { if !relay.is_connected() { #[allow(clippy::large_futures)] - let _ = relay - .try_connect(std::time::Duration::from_secs(CONNECTION_TIMEOUT)) - .await; + relay + .try_connect(std::time::Duration::from_secs(LONG_TIMEOUT)) + .await?; } - if !relay.is_connected() { - self.skip_relay_for_session(relay_url.clone(), "connection timeout".to_string()); - bail!("connection timeout"); - } Ok(()) } @@ -278,68 +277,74 @@ impl Connect for Client { let relays_map = self.client.relays().await; + // Static timeout for get_events_per_relay (no adaptive timeout here) + let static_timeout = Arc::new(AtomicU64::new(LONG_TIMEOUT)); + let futures: Vec<_> = relays .clone() .iter() // don't look for events on blaster .filter(|r| !r.as_str().contains("nostr.mutinywallet.com")) .map(|r| (relays_map.get(r).unwrap(), filters.clone())) - .map(|(relay, filters)| async { - let pb = if std::env::var("NGITTEST").is_err() { - let pb = progress_reporter.add( - ProgressBar::new(1) - .with_prefix(format!("{: <11}{}", "connecting", relay.url())) - .with_style(pb_style()?), - ); - pb.enable_steady_tick(Duration::from_millis(300)); - Some(pb) - } else { - None - }; - fn update_progress_bar_with_error( - relay_url: &RelayUrl, - pb: Option, - error: &anyhow::Error, - ) { - if let Some(pb) = pb { - pb.set_style(pb_after_style(false)); - pb.set_prefix(format!("{: <11}{}", "error", relay_url)); - pb.finish_with_message( - console::style( - error.to_string().replace("relay pool error:", "error:"), - ) - .for_stderr() - .red() - .to_string(), + .map(|(relay, filters)| { + let static_timeout_clone = static_timeout.clone(); + let progress_reporter_clone = progress_reporter.clone(); + async move { + let pb = if std::env::var("NGITTEST").is_err() { + let pb = progress_reporter_clone.add( + ProgressBar::new(1) + .with_prefix(format!("{: <11}{}", "connecting", relay.url())) + .with_style(pb_style(static_timeout_clone)?), ); - } - } - if let Some(reason) = self.is_relay_skipped_for_session(relay.url()) { - update_progress_bar_with_error(relay.url(), pb, &anyhow!("{reason}")); - bail!("{reason}"); - } - #[allow(clippy::large_futures)] - match get_events_of(relay, filters, &pb).await { - Err(error) => { - // Check error for timeout/connection issues and add to skip list - if error.to_string().contains("connection timeout") { - // Simple check, refine as needed - self.skip_relay_for_session(relay.url().clone(), error.to_string()); + pb.enable_steady_tick(Duration::from_millis(300)); + Some(pb) + } else { + None + }; + fn update_progress_bar_with_error( + relay_url: &RelayUrl, + pb: Option, + error: &anyhow::Error, + ) { + if let Some(pb) = pb { + pb.set_style(pb_after_style(false)); + pb.set_prefix(format!("{: <11}{}", "error", relay_url)); + pb.finish_with_message( + console::style( + error.to_string().replace("relay pool error:", "error:"), + ) + .for_stderr() + .red() + .to_string(), + ); } - update_progress_bar_with_error(relay.url(), pb, &error); - Err(error) } - Ok(res) => { - if let Some(pb) = pb { - pb.set_style(pb_after_style(true)); - pb.set_prefix(format!( - "{: <11}{}", - format!("{} events", res.len()), - relay.url() - )); - pb.finish_with_message(""); + if let Some(reason) = self.is_relay_skipped_for_session(relay.url()) { + update_progress_bar_with_error(relay.url(), pb, &anyhow!("{reason}")); + bail!("{reason}"); + } + #[allow(clippy::large_futures)] + match get_events_of(relay, filters, &pb).await { + Err(error) => { + // Check error for timeout/connection issues and add to skip list + if error.to_string().contains("connection timeout") { + self.skip_relay_for_session(relay.url().clone(), error.to_string()); + } + update_progress_bar_with_error(relay.url(), pb, &error); + Err(error) + } + Ok(res) => { + if let Some(pb) = pb { + pb.set_style(pb_after_style(true)); + pb.set_prefix(format!( + "{: <11}{}", + format!("{} events", res.len()), + relay.url() + )); + pb.finish_with_message(""); + } + Ok(res) } - Ok(res) } } }) @@ -374,6 +379,14 @@ impl Connect for Client { let progress_reporter = MultiProgress::new(); + // Track successful relays for adaptive timeout (switch to SHORT when + // SUCCESS_THRESHOLD succeed) + let success_count = Arc::new(AtomicU64::new(0)); + + // Track current timeout value for progress bar display (starts at LONG, + // switches to SHORT) + let current_timeout = Arc::new(AtomicU64::new(LONG_TIMEOUT)); + let mut processed_relays = HashSet::new(); let mut relay_reports: Vec> = vec![]; @@ -403,7 +416,9 @@ impl Connect for Client { .context("failed to add relay")?; } - let dim = Style::new().color256(247); + let success_count_for_loop = success_count.clone(); + let current_timeout_for_loop = current_timeout.clone(); + let total_relays = relays.len() as u64; let futures: Vec<_> = relays .iter() @@ -434,83 +449,144 @@ impl Connect for Client { } } }) - .map(|request| async { - let relay_column_width = request.relay_column_width; - - let relay_url = request - .selected_relay - .clone() - .context("fetch_all_from_relay called without a relay")?; - - let pb = if std::env::var("NGITTEST").is_err() { - let pb = progress_reporter.add( - ProgressBar::new(1) - .with_prefix( - dim.apply_to(format!( - "{: , - error: &anyhow::Error, - ) { - if let Some(pb) = pb { - pb.set_style(pb_after_style(false)); - pb.set_prefix( - Style::new() - .color256(247) - .apply_to(format!("{: { - // Check error for timeout/connection issues and add to skip list - if error.to_string().contains("connection timeout") { - // Simple check, refine as needed - self.skip_relay_for_session(relay_url.clone(), error.to_string()); + pb.enable_steady_tick(Duration::from_millis(300)); + Some(pb) + } else { + None + }; + + fn update_progress_bar_with_error( + relay_column_width: usize, + relay_url: &RelayUrl, + pb: Option, + error: &anyhow::Error, + ) { + if let Some(pb) = pb { + pb.set_style(pb_after_style(false)); + pb.set_prefix( + Style::new() + .color256(247) + .apply_to(format!("{: = threshold { + // SUCCESS_THRESHOLD reached, switch to short timeout + tokio::time::sleep(Duration::from_secs(short_timeout())).await; + return "short"; + } + + // Check if long timeout has expired + if tokio::time::Instant::now() >= long_timeout_end { + return "long"; + } + + // Sleep briefly before checking again + tokio::time::sleep(check_interval).await; + } + }; + + #[allow(clippy::large_futures)] + let result = tokio::select! { + result = &mut fetch_future => { + if result.is_ok() { + // Increment success count + let new_count = success_count_clone.fetch_add(1, Ordering::Relaxed) + 1; + let threshold = (total_relays_clone as f64 * SUCCESS_THRESHOLD).ceil() as u64; + + // If we've reached SUCCESS_THRESHOLD, update timeout display + if new_count >= threshold { + current_timeout_clone.store(short_timeout(), Ordering::Relaxed); + } + } + result + } + timeout_type = timeout_future => { + Err(anyhow!("timeout after {}s timeout", + if timeout_type == "long" { LONG_TIMEOUT } else { short_timeout() })) + } + }; + + match result { + Err(error) => { + // Check error for timeout/connection issues and add to skip list + if error.to_string().contains("connection timeout") || error.to_string().contains("timeout after") { + self.skip_relay_for_session(relay_url.clone(), error.to_string()); + } + update_progress_bar_with_error( + relay_column_width, + &relay_url, + pb, + &error, + ); + Err(error) + } + Ok(res) => Ok(res), } - Ok(res) => Ok(res), } }) .collect(); @@ -586,7 +662,7 @@ impl Connect for Client { let relay_column_width = request.relay_column_width; - self.connect(&relay_url).await?; + let _ = self.client.add_relay(&relay_url).await; let dim = Style::new().color256(247); @@ -614,7 +690,7 @@ impl Connect for Client { fresh_profiles = HashSet::new(); let relay = self.client.relay(&relay_url).await?; - let events: Vec = get_events_of(&relay, filters.clone(), &None).await?; + let events: Vec = get_events_of(&relay, filters.clone(), pb).await?; // TODO: try reconcile process_fetched_events( @@ -655,8 +731,16 @@ impl Connect for Client { } } -static CONNECTION_TIMEOUT: u64 = 3; -static GET_EVENTS_TIMEOUT: u64 = 7; +static LONG_TIMEOUT: u64 = 45; +static SUCCESS_THRESHOLD: f64 = 0.5; // 50% of relays must succeed to switch to short timeout + +fn short_timeout() -> u64 { + if std::env::var("NGITTEST").is_ok() { + 3 + } else { + 7 + } +} async fn get_events_of( relay: &nostr_sdk::Relay, @@ -665,25 +749,123 @@ async fn get_events_of( ) -> Result> { // relay.reconcile(filter, opts).await?; - if !relay.is_connected() { + let mut retry_delay = Duration::from_secs(2); + let start_time = std::time::Instant::now(); + let max_timeout = Duration::from_secs(LONG_TIMEOUT); + let mut last_error = None; + let mut attempt_num = 0; + let dim = Style::new().color256(247); + + if let Some(pb) = pb { + pb.set_prefix( + console::style(relay.url()) + .for_stderr() + .yellow() + .to_string(), + ); + pb.set_message("connecting"); + } + while !relay.is_connected() { + attempt_num += 1; #[allow(clippy::large_futures)] - let _ = relay - .try_connect(std::time::Duration::from_secs(CONNECTION_TIMEOUT)) - .await; + match relay + .try_connect(Duration::from_secs(short_timeout())) + .await + { + Ok(_) => { + if relay.is_connected() { + break; + } + } + Err(e) => { + last_error = Some(e); + } + } + // Check if we have time for another retry + if start_time.elapsed() + retry_delay >= max_timeout { + break; + } + + // For short delays (< 2s), just show a simple message and sleep + // For longer delays, show a countdown to provide feedback + if retry_delay < Duration::from_secs(2) { + if let Some(pb) = pb { + let retry_msg = if attempt_num > 1 { + format!("retrying (attempt {attempt_num})") + } else { + "retrying".to_string() + }; + pb.set_message(format!( + "{} {}", + console::style("connection failed").for_stderr().red(), + dim.apply_to(retry_msg) + )); + } + tokio::time::sleep(retry_delay).await; + } else { + // Countdown with dynamic updates for longer delays + let retry_start = std::time::Instant::now(); + let mut interval = tokio::time::interval(Duration::from_millis(100)); + interval.tick().await; // First tick completes immediately + + loop { + let elapsed = retry_start.elapsed(); + let remaining = retry_delay.saturating_sub(elapsed); + + if let Some(pb) = pb { + let retry_msg = if attempt_num > 1 { + format!( + "retrying in {:.0}s (attempt {attempt_num})", + remaining.as_secs_f64() + ) + } else { + format!("retrying in {:.0}s", remaining.as_secs_f64()) + }; + pb.set_message(format!( + "{} {}", + console::style("connection failed").for_stderr().red(), + dim.apply_to(retry_msg) + )); + } + + if elapsed >= retry_delay { + break; + } + + interval.tick().await; + } + } + + // Check again after sleep + if start_time.elapsed() >= max_timeout { + break; + } + + retry_delay = Duration::from_secs_f64(retry_delay.as_secs_f64() * 1.5); } if !relay.is_connected() { - bail!("connection timeout"); + if let Some(e) = last_error { + bail!("connection timeout: {}", e); + } else { + bail!("connection timeout here"); + } } else if let Some(pb) = pb { - pb.set_prefix(format!("connected {}", relay.url())); + pb.set_prefix( + console::style(relay.url()) + .for_stderr() + .yellow() + .to_string(), + ); + pb.set_message("connected"); } let events_res = join_all(filters.into_iter().map(|filter| async { relay .fetch_events( filter, - // 20 is nostr_sdk default - std::time::Duration::from_secs(GET_EVENTS_TIMEOUT), + // Use a very long timeout; actual timeout is controlled by outer tokio::select! + std::time::Duration::from_secs(LONG_TIMEOUT), ReqExitPolicy::ExitOnEOSE, ) .await @@ -907,22 +1089,26 @@ pub async fn nip05_query(nip05_addr: &str) -> Result { )) } -fn pb_style() -> Result { +fn pb_style(current_timeout: Arc) -> Result { Ok( ProgressStyle::with_template(" {spinner} {prefix} {msg} {timeout_in}")?.with_key( "timeout_in", - |state: &ProgressState, w: &mut dyn Write| { - if state.elapsed().as_secs() > 3 && state.elapsed().as_secs() < GET_EVENTS_TIMEOUT { + move |state: &ProgressState, w: &mut dyn Write| { + let elapsed = state.elapsed().as_secs(); + // Adaptive timeout display: reads the actual current timeout value + // which starts at LONG_TIMEOUT and switches to SHORT_TIMEOUT after + // the first relay succeeds + if elapsed > 3 { let dim = Style::new().color256(247); - write!( - w, - "{}", - dim.apply_to(format!( - "timeout in {:.1}s", - GET_EVENTS_TIMEOUT - state.elapsed().as_secs() - )) - ) - .unwrap(); + let timeout = current_timeout.load(Ordering::Relaxed); + if elapsed < timeout { + write!( + w, + "{}", + dim.apply_to(format!("timeout in {:.1}s", timeout - elapsed)) + ) + .unwrap(); + } } }, ), -- cgit v1.2.3