diff options
Diffstat (limited to 'src')
| -rw-r--r-- | src/client.rs | 161 |
1 files changed, 97 insertions, 64 deletions
diff --git a/src/client.rs b/src/client.rs index 7c81f70..d1001d0 100644 --- a/src/client.rs +++ b/src/client.rs | |||
| @@ -282,89 +282,122 @@ impl Connect for Client { | |||
| 282 | Ok((relay_results, progress_reporter)) | 282 | Ok((relay_results, progress_reporter)) |
| 283 | } | 283 | } |
| 284 | 284 | ||
| 285 | #[allow(clippy::too_many_lines)] | ||
| 285 | async fn fetch_all( | 286 | async fn fetch_all( |
| 286 | &self, | 287 | &self, |
| 287 | git_repo_path: &Path, | 288 | git_repo_path: &Path, |
| 288 | repo_coordinates: &HashSet<Coordinate>, | 289 | repo_coordinates: &HashSet<Coordinate>, |
| 289 | ) -> Result<FetchReport> { | 290 | ) -> Result<FetchReport> { |
| 290 | println!("fetching updates..."); | 291 | println!("fetching updates..."); |
| 291 | let mut fallback_relays = HashSet::new(); | 292 | let fallback_relays = &self |
| 292 | for r in &self.fallback_relays { | 293 | .fallback_relays |
| 293 | if let Ok(url) = Url::parse(r) { | 294 | .iter() |
| 294 | fallback_relays.insert(url); | 295 | .filter_map(|r| Url::parse(r).ok()) |
| 295 | } | 296 | .collect::<HashSet<Url>>(); |
| 296 | } | ||
| 297 | let request = | ||
| 298 | create_relays_request(git_repo_path, repo_coordinates, fallback_relays).await?; | ||
| 299 | 297 | ||
| 300 | let progress_reporter = MultiProgress::new(); | 298 | let mut request = |
| 299 | create_relays_request(git_repo_path, repo_coordinates, fallback_relays.clone()).await?; | ||
| 301 | 300 | ||
| 302 | for relay in &request.relays { | 301 | let progress_reporter = MultiProgress::new(); |
| 303 | self.client | ||
| 304 | .add_relay(relay.as_str()) | ||
| 305 | .await | ||
| 306 | .context("cannot add relay")?; | ||
| 307 | } | ||
| 308 | 302 | ||
| 309 | let dim = Style::new().color256(247); | 303 | let mut processed_relays = HashSet::new(); |
| 310 | 304 | ||
| 311 | let futures: Vec<_> = request | 305 | let mut relay_reports: Vec<Result<FetchReport>> = vec![]; |
| 312 | .relays | ||
| 313 | .iter() | ||
| 314 | // don't look for events on blaster | ||
| 315 | .filter(|r| !r.as_str().contains("nostr.mutinywallet.com")) | ||
| 316 | .map(|r| FetchRequest { | ||
| 317 | selected_relay: Some(r.clone()), | ||
| 318 | ..request.clone() | ||
| 319 | }) | ||
| 320 | .map(|request| async { | ||
| 321 | let relay_column_width = request.relay_column_width; | ||
| 322 | 306 | ||
| 323 | let relay_url = request | 307 | loop { |
| 324 | .selected_relay | 308 | for relay in &request.relays { |
| 325 | .clone() | 309 | self.client |
| 326 | .context("fetch_all_from_relay called without a relay")?; | 310 | .add_relay(relay.as_str()) |
| 311 | .await | ||
| 312 | .context("cannot add relay")?; | ||
| 313 | } | ||
| 327 | 314 | ||
| 328 | let pb = if std::env::var("NGITTEST").is_err() { | 315 | let dim = Style::new().color256(247); |
| 329 | let pb = progress_reporter.add( | ||
| 330 | ProgressBar::new(1) | ||
| 331 | .with_prefix(format!("{: <relay_column_width$} connecting", &relay_url)) | ||
| 332 | .with_style(pb_style()?), | ||
| 333 | ); | ||
| 334 | pb.enable_steady_tick(Duration::from_millis(300)); | ||
| 335 | Some(pb) | ||
| 336 | } else { | ||
| 337 | None | ||
| 338 | }; | ||
| 339 | 316 | ||
| 340 | #[allow(clippy::large_futures)] | 317 | let futures: Vec<_> = request |
| 341 | match self.fetch_all_from_relay(git_repo_path, request, &pb).await { | 318 | .relays |
| 342 | Err(error) => { | 319 | .iter() |
| 343 | if let Some(pb) = pb { | 320 | // don't look for events on blaster |
| 344 | pb.set_style(pb_after_style(false)); | 321 | .filter(|r| !r.as_str().contains("nostr.mutinywallet.com")) |
| 345 | pb.set_prefix( | 322 | .map(|r| FetchRequest { |
| 346 | dim.apply_to(format!("{: <relay_column_width$}", &relay_url)) | 323 | selected_relay: Some(r.clone()), |
| 324 | ..request.clone() | ||
| 325 | }) | ||
| 326 | .map(|request| async { | ||
| 327 | let relay_column_width = request.relay_column_width; | ||
| 328 | |||
| 329 | let relay_url = request | ||
| 330 | .selected_relay | ||
| 331 | .clone() | ||
| 332 | .context("fetch_all_from_relay called without a relay")?; | ||
| 333 | |||
| 334 | let pb = if std::env::var("NGITTEST").is_err() { | ||
| 335 | let pb = progress_reporter.add( | ||
| 336 | ProgressBar::new(1) | ||
| 337 | .with_prefix( | ||
| 338 | dim.apply_to(format!( | ||
| 339 | "{: <relay_column_width$} connecting", | ||
| 340 | &relay_url | ||
| 341 | )) | ||
| 347 | .to_string(), | 342 | .to_string(), |
| 348 | ); | ||
| 349 | pb.finish_with_message( | ||
| 350 | console::style( | ||
| 351 | error.to_string().replace("relay pool error:", "error:"), | ||
| 352 | ) | 343 | ) |
| 353 | .for_stderr() | 344 | .with_style(pb_style()?), |
| 354 | .red() | 345 | ); |
| 355 | .to_string(), | 346 | pb.enable_steady_tick(Duration::from_millis(300)); |
| 356 | ); | 347 | Some(pb) |
| 348 | } else { | ||
| 349 | None | ||
| 350 | }; | ||
| 351 | |||
| 352 | #[allow(clippy::large_futures)] | ||
| 353 | match self.fetch_all_from_relay(git_repo_path, request, &pb).await { | ||
| 354 | Err(error) => { | ||
| 355 | if let Some(pb) = pb { | ||
| 356 | pb.set_style(pb_after_style(false)); | ||
| 357 | pb.set_prefix( | ||
| 358 | dim.apply_to(format!("{: <relay_column_width$}", &relay_url)) | ||
| 359 | .to_string(), | ||
| 360 | ); | ||
| 361 | pb.finish_with_message( | ||
| 362 | console::style( | ||
| 363 | error.to_string().replace("relay pool error:", "error:"), | ||
| 364 | ) | ||
| 365 | .for_stderr() | ||
| 366 | .red() | ||
| 367 | .to_string(), | ||
| 368 | ); | ||
| 369 | } | ||
| 370 | Err(error) | ||
| 357 | } | 371 | } |
| 358 | Err(error) | 372 | Ok(res) => Ok(res), |
| 359 | } | 373 | } |
| 360 | Ok(res) => Ok(res), | 374 | }) |
| 361 | } | 375 | .collect(); |
| 362 | }) | ||
| 363 | .collect(); | ||
| 364 | 376 | ||
| 365 | let relay_reports: Vec<Result<FetchReport>> = | 377 | for report in stream::iter(futures) |
| 366 | stream::iter(futures).buffer_unordered(15).collect().await; | 378 | .buffer_unordered(15) |
| 379 | .collect::<Vec<Result<FetchReport>>>() | ||
| 380 | .await | ||
| 381 | { | ||
| 382 | relay_reports.push(report); | ||
| 383 | } | ||
| 384 | |||
| 385 | for relay in &request.relays { | ||
| 386 | processed_relays.insert(relay.clone()); | ||
| 387 | } | ||
| 367 | 388 | ||
| 389 | if let Ok(repo_ref) = get_repo_ref_from_cache(git_repo_path, repo_coordinates).await { | ||
| 390 | request.relays = repo_ref | ||
| 391 | .relays | ||
| 392 | .iter() | ||
| 393 | .filter_map(|r| Url::parse(r).ok()) | ||
| 394 | .filter(|r| !processed_relays.contains(r)) | ||
| 395 | .collect(); | ||
| 396 | if request.relays.is_empty() { | ||
| 397 | break; | ||
| 398 | } | ||
| 399 | } | ||
| 400 | } | ||
| 368 | let report = consolidate_fetch_reports(relay_reports); | 401 | let report = consolidate_fetch_reports(relay_reports); |
| 369 | 402 | ||
| 370 | if report.to_string().is_empty() { | 403 | if report.to_string().is_empty() { |