upleb.uk

Public git repos — served from a NIP-34 GRASP relay at git.upleb.uk

summaryrefslogtreecommitdiff
path: root/src/client.rs
diff options
context:
space:
mode:
Diffstat (limited to 'src/client.rs')
-rw-r--r--src/client.rs161
1 files changed, 97 insertions, 64 deletions
diff --git a/src/client.rs b/src/client.rs
index 7c81f70..d1001d0 100644
--- a/src/client.rs
+++ b/src/client.rs
@@ -282,89 +282,122 @@ impl Connect for Client {
282 Ok((relay_results, progress_reporter)) 282 Ok((relay_results, progress_reporter))
283 } 283 }
284 284
285 #[allow(clippy::too_many_lines)]
285 async fn fetch_all( 286 async fn fetch_all(
286 &self, 287 &self,
287 git_repo_path: &Path, 288 git_repo_path: &Path,
288 repo_coordinates: &HashSet<Coordinate>, 289 repo_coordinates: &HashSet<Coordinate>,
289 ) -> Result<FetchReport> { 290 ) -> Result<FetchReport> {
290 println!("fetching updates..."); 291 println!("fetching updates...");
291 let mut fallback_relays = HashSet::new(); 292 let fallback_relays = &self
292 for r in &self.fallback_relays { 293 .fallback_relays
293 if let Ok(url) = Url::parse(r) { 294 .iter()
294 fallback_relays.insert(url); 295 .filter_map(|r| Url::parse(r).ok())
295 } 296 .collect::<HashSet<Url>>();
296 }
297 let request =
298 create_relays_request(git_repo_path, repo_coordinates, fallback_relays).await?;
299 297
300 let progress_reporter = MultiProgress::new(); 298 let mut request =
299 create_relays_request(git_repo_path, repo_coordinates, fallback_relays.clone()).await?;
301 300
302 for relay in &request.relays { 301 let progress_reporter = MultiProgress::new();
303 self.client
304 .add_relay(relay.as_str())
305 .await
306 .context("cannot add relay")?;
307 }
308 302
309 let dim = Style::new().color256(247); 303 let mut processed_relays = HashSet::new();
310 304
311 let futures: Vec<_> = request 305 let mut relay_reports: Vec<Result<FetchReport>> = vec![];
312 .relays
313 .iter()
314 // don't look for events on blaster
315 .filter(|r| !r.as_str().contains("nostr.mutinywallet.com"))
316 .map(|r| FetchRequest {
317 selected_relay: Some(r.clone()),
318 ..request.clone()
319 })
320 .map(|request| async {
321 let relay_column_width = request.relay_column_width;
322 306
323 let relay_url = request 307 loop {
324 .selected_relay 308 for relay in &request.relays {
325 .clone() 309 self.client
326 .context("fetch_all_from_relay called without a relay")?; 310 .add_relay(relay.as_str())
311 .await
312 .context("cannot add relay")?;
313 }
327 314
328 let pb = if std::env::var("NGITTEST").is_err() { 315 let dim = Style::new().color256(247);
329 let pb = progress_reporter.add(
330 ProgressBar::new(1)
331 .with_prefix(format!("{: <relay_column_width$} connecting", &relay_url))
332 .with_style(pb_style()?),
333 );
334 pb.enable_steady_tick(Duration::from_millis(300));
335 Some(pb)
336 } else {
337 None
338 };
339 316
340 #[allow(clippy::large_futures)] 317 let futures: Vec<_> = request
341 match self.fetch_all_from_relay(git_repo_path, request, &pb).await { 318 .relays
342 Err(error) => { 319 .iter()
343 if let Some(pb) = pb { 320 // don't look for events on blaster
344 pb.set_style(pb_after_style(false)); 321 .filter(|r| !r.as_str().contains("nostr.mutinywallet.com"))
345 pb.set_prefix( 322 .map(|r| FetchRequest {
346 dim.apply_to(format!("{: <relay_column_width$}", &relay_url)) 323 selected_relay: Some(r.clone()),
324 ..request.clone()
325 })
326 .map(|request| async {
327 let relay_column_width = request.relay_column_width;
328
329 let relay_url = request
330 .selected_relay
331 .clone()
332 .context("fetch_all_from_relay called without a relay")?;
333
334 let pb = if std::env::var("NGITTEST").is_err() {
335 let pb = progress_reporter.add(
336 ProgressBar::new(1)
337 .with_prefix(
338 dim.apply_to(format!(
339 "{: <relay_column_width$} connecting",
340 &relay_url
341 ))
347 .to_string(), 342 .to_string(),
348 );
349 pb.finish_with_message(
350 console::style(
351 error.to_string().replace("relay pool error:", "error:"),
352 ) 343 )
353 .for_stderr() 344 .with_style(pb_style()?),
354 .red() 345 );
355 .to_string(), 346 pb.enable_steady_tick(Duration::from_millis(300));
356 ); 347 Some(pb)
348 } else {
349 None
350 };
351
352 #[allow(clippy::large_futures)]
353 match self.fetch_all_from_relay(git_repo_path, request, &pb).await {
354 Err(error) => {
355 if let Some(pb) = pb {
356 pb.set_style(pb_after_style(false));
357 pb.set_prefix(
358 dim.apply_to(format!("{: <relay_column_width$}", &relay_url))
359 .to_string(),
360 );
361 pb.finish_with_message(
362 console::style(
363 error.to_string().replace("relay pool error:", "error:"),
364 )
365 .for_stderr()
366 .red()
367 .to_string(),
368 );
369 }
370 Err(error)
357 } 371 }
358 Err(error) 372 Ok(res) => Ok(res),
359 } 373 }
360 Ok(res) => Ok(res), 374 })
361 } 375 .collect();
362 })
363 .collect();
364 376
365 let relay_reports: Vec<Result<FetchReport>> = 377 for report in stream::iter(futures)
366 stream::iter(futures).buffer_unordered(15).collect().await; 378 .buffer_unordered(15)
379 .collect::<Vec<Result<FetchReport>>>()
380 .await
381 {
382 relay_reports.push(report);
383 }
384
385 for relay in &request.relays {
386 processed_relays.insert(relay.clone());
387 }
367 388
389 if let Ok(repo_ref) = get_repo_ref_from_cache(git_repo_path, repo_coordinates).await {
390 request.relays = repo_ref
391 .relays
392 .iter()
393 .filter_map(|r| Url::parse(r).ok())
394 .filter(|r| !processed_relays.contains(r))
395 .collect();
396 if request.relays.is_empty() {
397 break;
398 }
399 }
400 }
368 let report = consolidate_fetch_reports(relay_reports); 401 let report = consolidate_fetch_reports(relay_reports);
369 402
370 if report.to_string().is_empty() { 403 if report.to_string().is_empty() {