From d11793cec610c6448e092a1ba55e7b79a5adeefb Mon Sep 17 00:00:00 2001 From: DanConwayDev Date: Tue, 16 Jul 2024 13:04:35 +0100 Subject: feat(fetch): search newly discovered inbox relays to increase the likelihood that all events are found --- src/client.rs | 161 +++++++++++++++++++++++++++++++++++----------------------- 1 file changed, 97 insertions(+), 64 deletions(-) (limited to 'src') diff --git a/src/client.rs b/src/client.rs index 7c81f70..d1001d0 100644 --- a/src/client.rs +++ b/src/client.rs @@ -282,89 +282,122 @@ impl Connect for Client { Ok((relay_results, progress_reporter)) } + #[allow(clippy::too_many_lines)] async fn fetch_all( &self, git_repo_path: &Path, repo_coordinates: &HashSet, ) -> Result { println!("fetching updates..."); - let mut fallback_relays = HashSet::new(); - for r in &self.fallback_relays { - if let Ok(url) = Url::parse(r) { - fallback_relays.insert(url); - } - } - let request = - create_relays_request(git_repo_path, repo_coordinates, fallback_relays).await?; + let fallback_relays = &self + .fallback_relays + .iter() + .filter_map(|r| Url::parse(r).ok()) + .collect::>(); - let progress_reporter = MultiProgress::new(); + let mut request = + create_relays_request(git_repo_path, repo_coordinates, fallback_relays.clone()).await?; - for relay in &request.relays { - self.client - .add_relay(relay.as_str()) - .await - .context("cannot add relay")?; - } + let progress_reporter = MultiProgress::new(); - let dim = Style::new().color256(247); + let mut processed_relays = HashSet::new(); - let futures: Vec<_> = request - .relays - .iter() - // don't look for events on blaster - .filter(|r| !r.as_str().contains("nostr.mutinywallet.com")) - .map(|r| FetchRequest { - selected_relay: Some(r.clone()), - ..request.clone() - }) - .map(|request| async { - let relay_column_width = request.relay_column_width; + let mut relay_reports: Vec> = vec![]; - let relay_url = request - .selected_relay - .clone() - .context("fetch_all_from_relay called without a relay")?; + loop { + for relay in &request.relays { + self.client + .add_relay(relay.as_str()) + .await + .context("cannot add relay")?; + } - let pb = if std::env::var("NGITTEST").is_err() { - let pb = progress_reporter.add( - ProgressBar::new(1) - .with_prefix(format!("{: { - if let Some(pb) = pb { - pb.set_style(pb_after_style(false)); - pb.set_prefix( - dim.apply_to(format!("{: = request + .relays + .iter() + // don't look for events on blaster + .filter(|r| !r.as_str().contains("nostr.mutinywallet.com")) + .map(|r| FetchRequest { + selected_relay: Some(r.clone()), + ..request.clone() + }) + .map(|request| async { + let relay_column_width = request.relay_column_width; + + let relay_url = request + .selected_relay + .clone() + .context("fetch_all_from_relay called without a relay")?; + + let pb = if std::env::var("NGITTEST").is_err() { + let pb = progress_reporter.add( + ProgressBar::new(1) + .with_prefix( + dim.apply_to(format!( + "{: { + if let Some(pb) = pb { + pb.set_style(pb_after_style(false)); + pb.set_prefix( + dim.apply_to(format!("{: Ok(res), } - Ok(res) => Ok(res), - } - }) - .collect(); + }) + .collect(); - let relay_reports: Vec> = - stream::iter(futures).buffer_unordered(15).collect().await; + for report in stream::iter(futures) + .buffer_unordered(15) + .collect::>>() + .await + { + relay_reports.push(report); + } + + for relay in &request.relays { + processed_relays.insert(relay.clone()); + } + if let Ok(repo_ref) = get_repo_ref_from_cache(git_repo_path, repo_coordinates).await { + request.relays = repo_ref + .relays + .iter() + .filter_map(|r| Url::parse(r).ok()) + .filter(|r| !processed_relays.contains(r)) + .collect(); + if request.relays.is_empty() { + break; + } + } + } let report = consolidate_fetch_reports(relay_reports); if report.to_string().is_empty() { -- cgit v1.2.3