upleb.uk

Public git repos — served from a NIP-34 GRASP relay at git.upleb.uk

summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/lib/client.rs494
1 files changed, 340 insertions, 154 deletions
diff --git a/src/lib/client.rs b/src/lib/client.rs
index c08cd04..45bac7a 100644
--- a/src/lib/client.rs
+++ b/src/lib/client.rs
@@ -15,7 +15,10 @@ use std::{
15 fmt::{Display, Write}, 15 fmt::{Display, Write},
16 fs::create_dir_all, 16 fs::create_dir_all,
17 path::Path, 17 path::Path,
18 sync::{Arc, RwLock}, 18 sync::{
19 Arc, RwLock,
20 atomic::{AtomicU64, Ordering},
21 },
19 time::Duration, 22 time::Duration,
20}; 23};
21 24
@@ -191,15 +194,11 @@ impl Connect for Client {
191 194
192 if !relay.is_connected() { 195 if !relay.is_connected() {
193 #[allow(clippy::large_futures)] 196 #[allow(clippy::large_futures)]
194 let _ = relay 197 relay
195 .try_connect(std::time::Duration::from_secs(CONNECTION_TIMEOUT)) 198 .try_connect(std::time::Duration::from_secs(LONG_TIMEOUT))
196 .await; 199 .await?;
197 } 200 }
198 201
199 if !relay.is_connected() {
200 self.skip_relay_for_session(relay_url.clone(), "connection timeout".to_string());
201 bail!("connection timeout");
202 }
203 Ok(()) 202 Ok(())
204 } 203 }
205 204
@@ -278,68 +277,74 @@ impl Connect for Client {
278 277
279 let relays_map = self.client.relays().await; 278 let relays_map = self.client.relays().await;
280 279
280 // Static timeout for get_events_per_relay (no adaptive timeout here)
281 let static_timeout = Arc::new(AtomicU64::new(LONG_TIMEOUT));
282
281 let futures: Vec<_> = relays 283 let futures: Vec<_> = relays
282 .clone() 284 .clone()
283 .iter() 285 .iter()
284 // don't look for events on blaster 286 // don't look for events on blaster
285 .filter(|r| !r.as_str().contains("nostr.mutinywallet.com")) 287 .filter(|r| !r.as_str().contains("nostr.mutinywallet.com"))
286 .map(|r| (relays_map.get(r).unwrap(), filters.clone())) 288 .map(|r| (relays_map.get(r).unwrap(), filters.clone()))
287 .map(|(relay, filters)| async { 289 .map(|(relay, filters)| {
288 let pb = if std::env::var("NGITTEST").is_err() { 290 let static_timeout_clone = static_timeout.clone();
289 let pb = progress_reporter.add( 291 let progress_reporter_clone = progress_reporter.clone();
290 ProgressBar::new(1) 292 async move {
291 .with_prefix(format!("{: <11}{}", "connecting", relay.url())) 293 let pb = if std::env::var("NGITTEST").is_err() {
292 .with_style(pb_style()?), 294 let pb = progress_reporter_clone.add(
293 ); 295 ProgressBar::new(1)
294 pb.enable_steady_tick(Duration::from_millis(300)); 296 .with_prefix(format!("{: <11}{}", "connecting", relay.url()))
295 Some(pb) 297 .with_style(pb_style(static_timeout_clone)?),
296 } else {
297 None
298 };
299 fn update_progress_bar_with_error(
300 relay_url: &RelayUrl,
301 pb: Option<ProgressBar>,
302 error: &anyhow::Error,
303 ) {
304 if let Some(pb) = pb {
305 pb.set_style(pb_after_style(false));
306 pb.set_prefix(format!("{: <11}{}", "error", relay_url));
307 pb.finish_with_message(
308 console::style(
309 error.to_string().replace("relay pool error:", "error:"),
310 )
311 .for_stderr()
312 .red()
313 .to_string(),
314 ); 298 );
315 } 299 pb.enable_steady_tick(Duration::from_millis(300));
316 } 300 Some(pb)
317 if let Some(reason) = self.is_relay_skipped_for_session(relay.url()) { 301 } else {
318 update_progress_bar_with_error(relay.url(), pb, &anyhow!("{reason}")); 302 None
319 bail!("{reason}"); 303 };
320 } 304 fn update_progress_bar_with_error(
321 #[allow(clippy::large_futures)] 305 relay_url: &RelayUrl,
322 match get_events_of(relay, filters, &pb).await { 306 pb: Option<ProgressBar>,
323 Err(error) => { 307 error: &anyhow::Error,
324 // Check error for timeout/connection issues and add to skip list 308 ) {
325 if error.to_string().contains("connection timeout") { 309 if let Some(pb) = pb {
326 // Simple check, refine as needed 310 pb.set_style(pb_after_style(false));
327 self.skip_relay_for_session(relay.url().clone(), error.to_string()); 311 pb.set_prefix(format!("{: <11}{}", "error", relay_url));
312 pb.finish_with_message(
313 console::style(
314 error.to_string().replace("relay pool error:", "error:"),
315 )
316 .for_stderr()
317 .red()
318 .to_string(),
319 );
328 } 320 }
329 update_progress_bar_with_error(relay.url(), pb, &error);
330 Err(error)
331 } 321 }
332 Ok(res) => { 322 if let Some(reason) = self.is_relay_skipped_for_session(relay.url()) {
333 if let Some(pb) = pb { 323 update_progress_bar_with_error(relay.url(), pb, &anyhow!("{reason}"));
334 pb.set_style(pb_after_style(true)); 324 bail!("{reason}");
335 pb.set_prefix(format!( 325 }
336 "{: <11}{}", 326 #[allow(clippy::large_futures)]
337 format!("{} events", res.len()), 327 match get_events_of(relay, filters, &pb).await {
338 relay.url() 328 Err(error) => {
339 )); 329 // Check error for timeout/connection issues and add to skip list
340 pb.finish_with_message(""); 330 if error.to_string().contains("connection timeout") {
331 self.skip_relay_for_session(relay.url().clone(), error.to_string());
332 }
333 update_progress_bar_with_error(relay.url(), pb, &error);
334 Err(error)
335 }
336 Ok(res) => {
337 if let Some(pb) = pb {
338 pb.set_style(pb_after_style(true));
339 pb.set_prefix(format!(
340 "{: <11}{}",
341 format!("{} events", res.len()),
342 relay.url()
343 ));
344 pb.finish_with_message("");
345 }
346 Ok(res)
341 } 347 }
342 Ok(res)
343 } 348 }
344 } 349 }
345 }) 350 })
@@ -374,6 +379,14 @@ impl Connect for Client {
374 379
375 let progress_reporter = MultiProgress::new(); 380 let progress_reporter = MultiProgress::new();
376 381
382 // Track successful relays for adaptive timeout (switch to SHORT when
383 // SUCCESS_THRESHOLD succeed)
384 let success_count = Arc::new(AtomicU64::new(0));
385
386 // Track current timeout value for progress bar display (starts at LONG,
387 // switches to SHORT)
388 let current_timeout = Arc::new(AtomicU64::new(LONG_TIMEOUT));
389
377 let mut processed_relays = HashSet::new(); 390 let mut processed_relays = HashSet::new();
378 391
379 let mut relay_reports: Vec<Result<FetchReport>> = vec![]; 392 let mut relay_reports: Vec<Result<FetchReport>> = vec![];
@@ -403,7 +416,9 @@ impl Connect for Client {
403 .context("failed to add relay")?; 416 .context("failed to add relay")?;
404 } 417 }
405 418
406 let dim = Style::new().color256(247); 419 let success_count_for_loop = success_count.clone();
420 let current_timeout_for_loop = current_timeout.clone();
421 let total_relays = relays.len() as u64;
407 422
408 let futures: Vec<_> = relays 423 let futures: Vec<_> = relays
409 .iter() 424 .iter()
@@ -434,83 +449,144 @@ impl Connect for Client {
434 } 449 }
435 } 450 }
436 }) 451 })
437 .map(|request| async { 452 .map(|request| {
438 let relay_column_width = request.relay_column_width; 453 let success_count_clone = success_count_for_loop.clone();
439 454 let current_timeout_clone = current_timeout_for_loop.clone();
440 let relay_url = request 455 let progress_reporter_clone = progress_reporter.clone();
441 .selected_relay 456 let total_relays_clone = total_relays;
442 .clone() 457 async move {
443 .context("fetch_all_from_relay called without a relay")?; 458 let dim = Style::new().color256(247);
444 459 let relay_column_width = request.relay_column_width;
445 let pb = if std::env::var("NGITTEST").is_err() { 460
446 let pb = progress_reporter.add( 461 let relay_url = request
447 ProgressBar::new(1) 462 .selected_relay
448 .with_prefix( 463 .clone()
449 dim.apply_to(format!( 464 .context("fetch_all_from_relay called without a relay")?;
450 "{: <relay_column_width$} connecting", 465
451 &relay_url 466 let pb = if std::env::var("NGITTEST").is_err() {
452 )) 467 let pb = progress_reporter_clone.add(
453 .to_string(), 468 ProgressBar::new(1)
454 ) 469 .with_prefix(
455 .with_style(pb_style()?), 470 dim.apply_to(format!(
456 ); 471 "{: <relay_column_width$} connecting",
457 pb.enable_steady_tick(Duration::from_millis(300)); 472 &relay_url
458 Some(pb) 473 ))
459 } else { 474 .to_string(),
460 None 475 )
461 }; 476 .with_style(pb_style(current_timeout_clone.clone())?),
462 // do here
463
464 fn update_progress_bar_with_error(
465 relay_column_width: usize,
466 relay_url: &RelayUrl,
467 pb: Option<ProgressBar>,
468 error: &anyhow::Error,
469 ) {
470 if let Some(pb) = pb {
471 pb.set_style(pb_after_style(false));
472 pb.set_prefix(
473 Style::new()
474 .color256(247)
475 .apply_to(format!("{: <relay_column_width$}", &relay_url))
476 .to_string(),
477 ); 477 );
478 pb.finish_with_message( 478 pb.enable_steady_tick(Duration::from_millis(300));
479 console::style( 479 Some(pb)
480 error.to_string().replace("relay pool error:", "error:"), 480 } else {
481 ) 481 None
482 .for_stderr() 482 };
483 .red() 483
484 .to_string(), 484 fn update_progress_bar_with_error(
485 ); 485 relay_column_width: usize,
486 } 486 relay_url: &RelayUrl,
487 } 487 pb: Option<ProgressBar>,
488 if let Some(reason) = self.is_relay_skipped_for_session(&relay_url) { 488 error: &anyhow::Error,
489 update_progress_bar_with_error( 489 ) {
490 relay_column_width, 490 if let Some(pb) = pb {
491 &relay_url, 491 pb.set_style(pb_after_style(false));
492 pb, 492 pb.set_prefix(
493 &anyhow!("{reason}"), 493 Style::new()
494 ); 494 .color256(247)
495 bail!("{reason}"); 495 .apply_to(format!("{: <relay_column_width$}", &relay_url))
496 } 496 .to_string(),
497 #[allow(clippy::large_futures)] 497 );
498 match self.fetch_all_from_relay(git_repo_path, request, &pb).await { 498 pb.finish_with_message(
499 Err(error) => { 499 console::style(
500 // Check error for timeout/connection issues and add to skip list 500 error.to_string().replace("relay pool error:", "error:"),
501 if error.to_string().contains("connection timeout") { 501 )
502 // Simple check, refine as needed 502 .for_stderr()
503 self.skip_relay_for_session(relay_url.clone(), error.to_string()); 503 .red()
504 .to_string(),
505 );
504 } 506 }
507 }
508
509 if let Some(reason) = self.is_relay_skipped_for_session(&relay_url) {
505 update_progress_bar_with_error( 510 update_progress_bar_with_error(
506 relay_column_width, 511 relay_column_width,
507 &relay_url, 512 &relay_url,
508 pb, 513 pb,
509 &error, 514 &anyhow!("{reason}"),
510 ); 515 );
511 Err(error) 516 bail!("{reason}");
517 }
518
519 // Adaptive timeout using tokio::select!
520 // Start the fetch operation once and race it against an adaptive timeout
521 let pb_clone = pb.clone();
522 let fetch_future = self.fetch_all_from_relay(git_repo_path, request, &pb_clone);
523 tokio::pin!(fetch_future);
524
525 // Create an adaptive timeout that switches from long to short
526 // when SUCCESS_THRESHOLD of relays succeed
527 let timeout_future = async {
528 // Poll for timeout or SUCCESS_THRESHOLD success threshold
529 let check_interval = Duration::from_millis(100);
530 let long_timeout_end = tokio::time::Instant::now() + Duration::from_secs(LONG_TIMEOUT);
531
532 loop {
533 // Check if SUCCESS_THRESHOLD of relays have succeeded
534 let current_success_count = success_count_clone.load(Ordering::Relaxed);
535 let threshold = (total_relays_clone as f64 * SUCCESS_THRESHOLD).ceil() as u64;
536
537 if current_success_count >= threshold {
538 // SUCCESS_THRESHOLD reached, switch to short timeout
539 tokio::time::sleep(Duration::from_secs(short_timeout())).await;
540 return "short";
541 }
542
543 // Check if long timeout has expired
544 if tokio::time::Instant::now() >= long_timeout_end {
545 return "long";
546 }
547
548 // Sleep briefly before checking again
549 tokio::time::sleep(check_interval).await;
550 }
551 };
552
553 #[allow(clippy::large_futures)]
554 let result = tokio::select! {
555 result = &mut fetch_future => {
556 if result.is_ok() {
557 // Increment success count
558 let new_count = success_count_clone.fetch_add(1, Ordering::Relaxed) + 1;
559 let threshold = (total_relays_clone as f64 * SUCCESS_THRESHOLD).ceil() as u64;
560
561 // If we've reached SUCCESS_THRESHOLD, update timeout display
562 if new_count >= threshold {
563 current_timeout_clone.store(short_timeout(), Ordering::Relaxed);
564 }
565 }
566 result
567 }
568 timeout_type = timeout_future => {
569 Err(anyhow!("timeout after {}s timeout",
570 if timeout_type == "long" { LONG_TIMEOUT } else { short_timeout() }))
571 }
572 };
573
574 match result {
575 Err(error) => {
576 // Check error for timeout/connection issues and add to skip list
577 if error.to_string().contains("connection timeout") || error.to_string().contains("timeout after") {
578 self.skip_relay_for_session(relay_url.clone(), error.to_string());
579 }
580 update_progress_bar_with_error(
581 relay_column_width,
582 &relay_url,
583 pb,
584 &error,
585 );
586 Err(error)
587 }
588 Ok(res) => Ok(res),
512 } 589 }
513 Ok(res) => Ok(res),
514 } 590 }
515 }) 591 })
516 .collect(); 592 .collect();
@@ -586,7 +662,7 @@ impl Connect for Client {
586 662
587 let relay_column_width = request.relay_column_width; 663 let relay_column_width = request.relay_column_width;
588 664
589 self.connect(&relay_url).await?; 665 let _ = self.client.add_relay(&relay_url).await;
590 666
591 let dim = Style::new().color256(247); 667 let dim = Style::new().color256(247);
592 668
@@ -614,7 +690,7 @@ impl Connect for Client {
614 fresh_profiles = HashSet::new(); 690 fresh_profiles = HashSet::new();
615 691
616 let relay = self.client.relay(&relay_url).await?; 692 let relay = self.client.relay(&relay_url).await?;
617 let events: Vec<nostr::Event> = get_events_of(&relay, filters.clone(), &None).await?; 693 let events: Vec<nostr::Event> = get_events_of(&relay, filters.clone(), pb).await?;
618 // TODO: try reconcile 694 // TODO: try reconcile
619 695
620 process_fetched_events( 696 process_fetched_events(
@@ -655,8 +731,16 @@ impl Connect for Client {
655 } 731 }
656} 732}
657 733
658static CONNECTION_TIMEOUT: u64 = 3; 734static LONG_TIMEOUT: u64 = 45;
659static GET_EVENTS_TIMEOUT: u64 = 7; 735static SUCCESS_THRESHOLD: f64 = 0.5; // 50% of relays must succeed to switch to short timeout
736
737fn short_timeout() -> u64 {
738 if std::env::var("NGITTEST").is_ok() {
739 3
740 } else {
741 7
742 }
743}
660 744
661async fn get_events_of( 745async fn get_events_of(
662 relay: &nostr_sdk::Relay, 746 relay: &nostr_sdk::Relay,
@@ -665,25 +749,123 @@ async fn get_events_of(
665) -> Result<Vec<Event>> { 749) -> Result<Vec<Event>> {
666 // relay.reconcile(filter, opts).await?; 750 // relay.reconcile(filter, opts).await?;
667 751
668 if !relay.is_connected() { 752 let mut retry_delay = Duration::from_secs(2);
753 let start_time = std::time::Instant::now();
754 let max_timeout = Duration::from_secs(LONG_TIMEOUT);
755 let mut last_error = None;
756 let mut attempt_num = 0;
757 let dim = Style::new().color256(247);
758
759 if let Some(pb) = pb {
760 pb.set_prefix(
761 console::style(relay.url())
762 .for_stderr()
763 .yellow()
764 .to_string(),
765 );
766 pb.set_message("connecting");
767 }
768 while !relay.is_connected() {
769 attempt_num += 1;
669 #[allow(clippy::large_futures)] 770 #[allow(clippy::large_futures)]
670 let _ = relay 771 match relay
671 .try_connect(std::time::Duration::from_secs(CONNECTION_TIMEOUT)) 772 .try_connect(Duration::from_secs(short_timeout()))
672 .await; 773 .await
774 {
775 Ok(_) => {
776 if relay.is_connected() {
777 break;
778 }
779 }
780 Err(e) => {
781 last_error = Some(e);
782 }
783 }
784 // Check if we have time for another retry
785 if start_time.elapsed() + retry_delay >= max_timeout {
786 break;
787 }
788
789 // For short delays (< 2s), just show a simple message and sleep
790 // For longer delays, show a countdown to provide feedback
791 if retry_delay < Duration::from_secs(2) {
792 if let Some(pb) = pb {
793 let retry_msg = if attempt_num > 1 {
794 format!("retrying (attempt {attempt_num})")
795 } else {
796 "retrying".to_string()
797 };
798 pb.set_message(format!(
799 "{} {}",
800 console::style("connection failed").for_stderr().red(),
801 dim.apply_to(retry_msg)
802 ));
803 }
804 tokio::time::sleep(retry_delay).await;
805 } else {
806 // Countdown with dynamic updates for longer delays
807 let retry_start = std::time::Instant::now();
808 let mut interval = tokio::time::interval(Duration::from_millis(100));
809 interval.tick().await; // First tick completes immediately
810
811 loop {
812 let elapsed = retry_start.elapsed();
813 let remaining = retry_delay.saturating_sub(elapsed);
814
815 if let Some(pb) = pb {
816 let retry_msg = if attempt_num > 1 {
817 format!(
818 "retrying in {:.0}s (attempt {attempt_num})",
819 remaining.as_secs_f64()
820 )
821 } else {
822 format!("retrying in {:.0}s", remaining.as_secs_f64())
823 };
824 pb.set_message(format!(
825 "{} {}",
826 console::style("connection failed").for_stderr().red(),
827 dim.apply_to(retry_msg)
828 ));
829 }
830
831 if elapsed >= retry_delay {
832 break;
833 }
834
835 interval.tick().await;
836 }
837 }
838
839 // Check again after sleep
840 if start_time.elapsed() >= max_timeout {
841 break;
842 }
843
844 retry_delay = Duration::from_secs_f64(retry_delay.as_secs_f64() * 1.5);
673 } 845 }
674 846
675 if !relay.is_connected() { 847 if !relay.is_connected() {
676 bail!("connection timeout"); 848 if let Some(e) = last_error {
849 bail!("connection timeout: {}", e);
850 } else {
851 bail!("connection timeout here");
852 }
677 } else if let Some(pb) = pb { 853 } else if let Some(pb) = pb {
678 pb.set_prefix(format!("connected {}", relay.url())); 854 pb.set_prefix(
855 console::style(relay.url())
856 .for_stderr()
857 .yellow()
858 .to_string(),
859 );
860 pb.set_message("connected");
679 } 861 }
680 862
681 let events_res = join_all(filters.into_iter().map(|filter| async { 863 let events_res = join_all(filters.into_iter().map(|filter| async {
682 relay 864 relay
683 .fetch_events( 865 .fetch_events(
684 filter, 866 filter,
685 // 20 is nostr_sdk default 867 // Use a very long timeout; actual timeout is controlled by outer tokio::select!
686 std::time::Duration::from_secs(GET_EVENTS_TIMEOUT), 868 std::time::Duration::from_secs(LONG_TIMEOUT),
687 ReqExitPolicy::ExitOnEOSE, 869 ReqExitPolicy::ExitOnEOSE,
688 ) 870 )
689 .await 871 .await
@@ -907,22 +1089,26 @@ pub async fn nip05_query(nip05_addr: &str) -> Result<Nip05Profile> {
907 )) 1089 ))
908} 1090}
909 1091
910fn pb_style() -> Result<ProgressStyle> { 1092fn pb_style(current_timeout: Arc<AtomicU64>) -> Result<ProgressStyle> {
911 Ok( 1093 Ok(
912 ProgressStyle::with_template(" {spinner} {prefix} {msg} {timeout_in}")?.with_key( 1094 ProgressStyle::with_template(" {spinner} {prefix} {msg} {timeout_in}")?.with_key(
913 "timeout_in", 1095 "timeout_in",
914 |state: &ProgressState, w: &mut dyn Write| { 1096 move |state: &ProgressState, w: &mut dyn Write| {
915 if state.elapsed().as_secs() > 3 && state.elapsed().as_secs() < GET_EVENTS_TIMEOUT { 1097 let elapsed = state.elapsed().as_secs();
1098 // Adaptive timeout display: reads the actual current timeout value
1099 // which starts at LONG_TIMEOUT and switches to SHORT_TIMEOUT after
1100 // the first relay succeeds
1101 if elapsed > 3 {
916 let dim = Style::new().color256(247); 1102 let dim = Style::new().color256(247);
917 write!( 1103 let timeout = current_timeout.load(Ordering::Relaxed);
918 w, 1104 if elapsed < timeout {
919 "{}", 1105 write!(
920 dim.apply_to(format!( 1106 w,
921 "timeout in {:.1}s", 1107 "{}",
922 GET_EVENTS_TIMEOUT - state.elapsed().as_secs() 1108 dim.apply_to(format!("timeout in {:.1}s", timeout - elapsed))
923 )) 1109 )
924 ) 1110 .unwrap();
925 .unwrap(); 1111 }
926 } 1112 }
927 }, 1113 },
928 ), 1114 ),