diff options
| author | DanConwayDev <DanConwayDev@protonmail.com> | 2024-07-16 11:22:17 +0100 |
|---|---|---|
| committer | DanConwayDev <DanConwayDev@protonmail.com> | 2024-07-16 11:22:17 +0100 |
| commit | 7ebe4dfaec1832ff585c264c32d9f5987db69d7d (patch) | |
| tree | 9d0c273455f3ee1a80d0881a1886cffeb868c698 | |
| parent | 142fee58b0449b3fe3f436986339c318de66b33f (diff) | |
refactor(fetch): improve FetchRequest
im preparation for identifying new inbox relays
| -rw-r--r-- | src/client.rs | 143 |
1 files changed, 52 insertions, 91 deletions
diff --git a/src/client.rs b/src/client.rs index 4054e7c..7c81f70 100644 --- a/src/client.rs +++ b/src/client.rs | |||
| @@ -81,9 +81,7 @@ pub trait Connect { | |||
| 81 | async fn fetch_all_from_relay( | 81 | async fn fetch_all_from_relay( |
| 82 | &self, | 82 | &self, |
| 83 | git_repo_path: &Path, | 83 | git_repo_path: &Path, |
| 84 | relay_url: Url, | ||
| 85 | request: FetchRequest, | 84 | request: FetchRequest, |
| 86 | // progress_reporter: &MultiProgress, | ||
| 87 | pb: &Option<ProgressBar>, | 85 | pb: &Option<ProgressBar>, |
| 88 | ) -> Result<FetchReport>; | 86 | ) -> Result<FetchReport>; |
| 89 | } | 87 | } |
| @@ -284,7 +282,6 @@ impl Connect for Client { | |||
| 284 | Ok((relay_results, progress_reporter)) | 282 | Ok((relay_results, progress_reporter)) |
| 285 | } | 283 | } |
| 286 | 284 | ||
| 287 | #[allow(clippy::too_many_lines)] | ||
| 288 | async fn fetch_all( | 285 | async fn fetch_all( |
| 289 | &self, | 286 | &self, |
| 290 | git_repo_path: &Path, | 287 | git_repo_path: &Path, |
| @@ -297,11 +294,12 @@ impl Connect for Client { | |||
| 297 | fallback_relays.insert(url); | 294 | fallback_relays.insert(url); |
| 298 | } | 295 | } |
| 299 | } | 296 | } |
| 300 | let (relays, request) = | 297 | let request = |
| 301 | create_relays_request(git_repo_path, repo_coordinates, fallback_relays).await?; | 298 | create_relays_request(git_repo_path, repo_coordinates, fallback_relays).await?; |
| 299 | |||
| 302 | let progress_reporter = MultiProgress::new(); | 300 | let progress_reporter = MultiProgress::new(); |
| 303 | 301 | ||
| 304 | for relay in &relays { | 302 | for relay in &request.relays { |
| 305 | self.client | 303 | self.client |
| 306 | .add_relay(relay.as_str()) | 304 | .add_relay(relay.as_str()) |
| 307 | .await | 305 | .await |
| @@ -310,24 +308,27 @@ impl Connect for Client { | |||
| 310 | 308 | ||
| 311 | let dim = Style::new().color256(247); | 309 | let dim = Style::new().color256(247); |
| 312 | 310 | ||
| 313 | let futures: Vec<_> = relays | 311 | let futures: Vec<_> = request |
| 312 | .relays | ||
| 314 | .iter() | 313 | .iter() |
| 315 | // don't look for events on blaster | 314 | // don't look for events on blaster |
| 316 | .filter(|r| !r.as_str().contains("nostr.mutinywallet.com")) | 315 | .filter(|r| !r.as_str().contains("nostr.mutinywallet.com")) |
| 317 | .map(|r| (r.clone(), request.clone())) | 316 | .map(|r| FetchRequest { |
| 318 | .map(|(relay, request)| async { | 317 | selected_relay: Some(r.clone()), |
| 318 | ..request.clone() | ||
| 319 | }) | ||
| 320 | .map(|request| async { | ||
| 319 | let relay_column_width = request.relay_column_width; | 321 | let relay_column_width = request.relay_column_width; |
| 320 | 322 | ||
| 323 | let relay_url = request | ||
| 324 | .selected_relay | ||
| 325 | .clone() | ||
| 326 | .context("fetch_all_from_relay called without a relay")?; | ||
| 327 | |||
| 321 | let pb = if std::env::var("NGITTEST").is_err() { | 328 | let pb = if std::env::var("NGITTEST").is_err() { |
| 322 | let pb = progress_reporter.add( | 329 | let pb = progress_reporter.add( |
| 323 | ProgressBar::new(1) | 330 | ProgressBar::new(1) |
| 324 | .with_prefix( | 331 | .with_prefix(format!("{: <relay_column_width$} connecting", &relay_url)) |
| 325 | dim.apply_to(format!( | ||
| 326 | "{: <relay_column_width$}{}", | ||
| 327 | "connecting", &relay | ||
| 328 | )) | ||
| 329 | .to_string(), | ||
| 330 | ) | ||
| 331 | .with_style(pb_style()?), | 332 | .with_style(pb_style()?), |
| 332 | ); | 333 | ); |
| 333 | pb.enable_steady_tick(Duration::from_millis(300)); | 334 | pb.enable_steady_tick(Duration::from_millis(300)); |
| @@ -337,19 +338,13 @@ impl Connect for Client { | |||
| 337 | }; | 338 | }; |
| 338 | 339 | ||
| 339 | #[allow(clippy::large_futures)] | 340 | #[allow(clippy::large_futures)] |
| 340 | match self | 341 | match self.fetch_all_from_relay(git_repo_path, request, &pb).await { |
| 341 | .fetch_all_from_relay(git_repo_path, relay, request, &pb) | ||
| 342 | .await | ||
| 343 | { | ||
| 344 | Err(error) => { | 342 | Err(error) => { |
| 345 | if let Some(pb) = pb { | 343 | if let Some(pb) = pb { |
| 346 | pb.set_style(pb_after_style(false)); | 344 | pb.set_style(pb_after_style(false)); |
| 347 | pb.set_prefix( | 345 | pb.set_prefix( |
| 348 | dim.apply_to(format!( | 346 | dim.apply_to(format!("{: <relay_column_width$}", &relay_url)) |
| 349 | "{: <relay_column_width$}{}", | 347 | .to_string(), |
| 350 | "error", "&relay" | ||
| 351 | )) | ||
| 352 | .to_string(), | ||
| 353 | ); | 348 | ); |
| 354 | pb.finish_with_message( | 349 | pb.finish_with_message( |
| 355 | console::style( | 350 | console::style( |
| @@ -362,29 +357,7 @@ impl Connect for Client { | |||
| 362 | } | 357 | } |
| 363 | Err(error) | 358 | Err(error) |
| 364 | } | 359 | } |
| 365 | Ok(res) => { | 360 | Ok(res) => Ok(res), |
| 366 | if let Some(pb) = pb { | ||
| 367 | pb.set_style(pb_after_style(true)); | ||
| 368 | pb.set_prefix( | ||
| 369 | dim.apply_to(format!( | ||
| 370 | "{: <relay_column_width$}{}", | ||
| 371 | if let Some(relay) = &res.relay { | ||
| 372 | format!("{relay}") | ||
| 373 | } else { | ||
| 374 | String::new() | ||
| 375 | }, | ||
| 376 | if res.to_string().is_empty() { | ||
| 377 | "no updates".to_string() | ||
| 378 | } else { | ||
| 379 | format!("found {res}") | ||
| 380 | }, | ||
| 381 | )) | ||
| 382 | .to_string(), | ||
| 383 | ); | ||
| 384 | pb.finish_with_message(""); | ||
| 385 | } | ||
| 386 | Ok(res) | ||
| 387 | } | ||
| 388 | } | 361 | } |
| 389 | }) | 362 | }) |
| 390 | .collect(); | 363 | .collect(); |
| @@ -395,9 +368,9 @@ impl Connect for Client { | |||
| 395 | let report = consolidate_fetch_reports(relay_reports); | 368 | let report = consolidate_fetch_reports(relay_reports); |
| 396 | 369 | ||
| 397 | if report.to_string().is_empty() { | 370 | if report.to_string().is_empty() { |
| 398 | println!("no updates found"); | 371 | println!("no updates"); |
| 399 | } else { | 372 | } else { |
| 400 | println!("fetched updates: {report}"); | 373 | println!("updates: {report}"); |
| 401 | } | 374 | } |
| 402 | Ok(report) | 375 | Ok(report) |
| 403 | } | 376 | } |
| @@ -405,9 +378,7 @@ impl Connect for Client { | |||
| 405 | async fn fetch_all_from_relay( | 378 | async fn fetch_all_from_relay( |
| 406 | &self, | 379 | &self, |
| 407 | git_repo_path: &Path, | 380 | git_repo_path: &Path, |
| 408 | relay_url: Url, | ||
| 409 | request: FetchRequest, | 381 | request: FetchRequest, |
| 410 | // progress_reporter: &MultiProgress, | ||
| 411 | pb: &Option<ProgressBar>, | 382 | pb: &Option<ProgressBar>, |
| 412 | ) -> Result<FetchReport> { | 383 | ) -> Result<FetchReport> { |
| 413 | let mut fresh_coordinates: HashSet<Coordinate> = HashSet::new(); | 384 | let mut fresh_coordinates: HashSet<Coordinate> = HashSet::new(); |
| @@ -417,27 +388,17 @@ impl Connect for Client { | |||
| 417 | let mut fresh_proposal_roots = request.proposals.clone(); | 388 | let mut fresh_proposal_roots = request.proposals.clone(); |
| 418 | let mut fresh_authors = request.contributor_profiles.clone(); | 389 | let mut fresh_authors = request.contributor_profiles.clone(); |
| 419 | 390 | ||
| 420 | let mut report = FetchReport { | 391 | let mut report = FetchReport::default(); |
| 421 | relay: Some(relay_url.clone()), | ||
| 422 | ..Default::default() | ||
| 423 | }; | ||
| 424 | 392 | ||
| 425 | // let pb = if std::env::var("NGITTEST").is_err() { | 393 | let relay_url = request |
| 426 | // let pb = progress_reporter.add( | 394 | .selected_relay |
| 427 | // ProgressBar::new(1) | 395 | .clone() |
| 428 | // .with_prefix(format!("{: <11}{}", "connecting", relay_url)) | 396 | .context("fetch_all_from_relay called without a relay")?; |
| 429 | // .with_style(pb_style()?), | ||
| 430 | // ); | ||
| 431 | // pb.enable_steady_tick(Duration::from_millis(300)); | ||
| 432 | // Some(pb) | ||
| 433 | // } else { | ||
| 434 | // None | ||
| 435 | // }; | ||
| 436 | |||
| 437 | self.connect(&relay_url).await?; | ||
| 438 | 397 | ||
| 439 | let relay_column_width = request.relay_column_width; | 398 | let relay_column_width = request.relay_column_width; |
| 440 | 399 | ||
| 400 | self.connect(&relay_url).await?; | ||
| 401 | |||
| 441 | let dim = Style::new().color256(247); | 402 | let dim = Style::new().color256(247); |
| 442 | 403 | ||
| 443 | loop { | 404 | loop { |
| @@ -447,12 +408,12 @@ impl Connect for Client { | |||
| 447 | if let Some(pb) = &pb { | 408 | if let Some(pb) = &pb { |
| 448 | pb.set_prefix( | 409 | pb.set_prefix( |
| 449 | dim.apply_to(format!( | 410 | dim.apply_to(format!( |
| 450 | "{: <relay_column_width$}{}", | 411 | "{: <relay_column_width$} {}", |
| 451 | &relay_url, | 412 | &relay_url, |
| 452 | if report.to_string().is_empty() { | 413 | if report.to_string().is_empty() { |
| 453 | "fetching...".to_string() | 414 | "fetching".to_string() |
| 454 | } else { | 415 | } else { |
| 455 | format!("found {report}") | 416 | format!("fetching... found {report}") |
| 456 | }, | 417 | }, |
| 457 | )) | 418 | )) |
| 458 | .to_string(), | 419 | .to_string(), |
| @@ -468,7 +429,6 @@ impl Connect for Client { | |||
| 468 | // TODO: try reconcile | 429 | // TODO: try reconcile |
| 469 | 430 | ||
| 470 | for event in events { | 431 | for event in events { |
| 471 | // TODO existing_events or events in fresh | ||
| 472 | process_fetched_event( | 432 | process_fetched_event( |
| 473 | event, | 433 | event, |
| 474 | &request, | 434 | &request, |
| @@ -485,19 +445,20 @@ impl Connect for Client { | |||
| 485 | } | 445 | } |
| 486 | } | 446 | } |
| 487 | if let Some(pb) = pb { | 447 | if let Some(pb) = pb { |
| 488 | let report_display = format!("{report}"); | 448 | pb.set_style(pb_after_style(true)); |
| 489 | pb.set_prefix( | 449 | pb.set_prefix( |
| 490 | dim.apply_to(format!( | 450 | dim.apply_to(format!( |
| 491 | "{: <relay_column_width$}{}", | 451 | "{: <relay_column_width$} {}", |
| 492 | relay_url, | 452 | relay_url, |
| 493 | if report_display.is_empty() { | 453 | if report.to_string().is_empty() { |
| 494 | String::new() | 454 | "no updates".to_string() |
| 495 | } else { | 455 | } else { |
| 496 | format!("found {report_display}") | 456 | format!("found {report}") |
| 497 | }, | 457 | }, |
| 498 | )) | 458 | )) |
| 499 | .to_string(), | 459 | .to_string(), |
| 500 | ); | 460 | ); |
| 461 | pb.finish_with_message(""); | ||
| 501 | } | 462 | } |
| 502 | Ok(report) | 463 | Ok(report) |
| 503 | } | 464 | } |
| @@ -761,7 +722,7 @@ async fn create_relays_request( | |||
| 761 | git_repo_path: &Path, | 722 | git_repo_path: &Path, |
| 762 | repo_coordinates: &HashSet<Coordinate>, | 723 | repo_coordinates: &HashSet<Coordinate>, |
| 763 | fallback_relays: HashSet<Url>, | 724 | fallback_relays: HashSet<Url>, |
| 764 | ) -> Result<(HashSet<Url>, FetchRequest)> { | 725 | ) -> Result<FetchRequest> { |
| 765 | let repo_ref = get_repo_ref_from_cache(git_repo_path, repo_coordinates).await; | 726 | let repo_ref = get_repo_ref_from_cache(git_repo_path, repo_coordinates).await; |
| 766 | 727 | ||
| 767 | let relays = { | 728 | let relays = { |
| @@ -834,20 +795,19 @@ async fn create_relays_request( | |||
| 834 | } | 795 | } |
| 835 | existing_events | 796 | existing_events |
| 836 | }; | 797 | }; |
| 837 | Ok(( | 798 | Ok(FetchRequest { |
| 799 | selected_relay: None, | ||
| 838 | relays, | 800 | relays, |
| 839 | FetchRequest { | 801 | relay_column_width, |
| 840 | relay_column_width, | 802 | repo_coordinates: if let Ok(repo_ref) = repo_ref { |
| 841 | repo_coordinates: if let Ok(repo_ref) = repo_ref { | 803 | repo_ref.coordinates_with_timestamps() |
| 842 | repo_ref.coordinates_with_timestamps() | 804 | } else { |
| 843 | } else { | 805 | repo_coordinates.iter().map(|c| (c.clone(), None)).collect() |
| 844 | repo_coordinates.iter().map(|c| (c.clone(), None)).collect() | ||
| 845 | }, | ||
| 846 | proposals, | ||
| 847 | contributor_profiles, | ||
| 848 | existing_events, | ||
| 849 | }, | 806 | }, |
| 850 | )) | 807 | proposals, |
| 808 | contributor_profiles, | ||
| 809 | existing_events, | ||
| 810 | }) | ||
| 851 | } | 811 | } |
| 852 | 812 | ||
| 853 | async fn process_fetched_event( | 813 | async fn process_fetched_event( |
| @@ -1027,7 +987,6 @@ pub fn get_filter_repo_events(repo_coordinates: &HashSet<Coordinate>) -> nostr:: | |||
| 1027 | 987 | ||
| 1028 | #[derive(Default)] | 988 | #[derive(Default)] |
| 1029 | pub struct FetchReport { | 989 | pub struct FetchReport { |
| 1030 | relay: Option<Url>, | ||
| 1031 | repo_coordinates: Vec<Coordinate>, | 990 | repo_coordinates: Vec<Coordinate>, |
| 1032 | updated_repo_announcements: Vec<(Coordinate, Timestamp)>, | 991 | updated_repo_announcements: Vec<(Coordinate, Timestamp)>, |
| 1033 | proposals: HashSet<EventId>, | 992 | proposals: HashSet<EventId>, |
| @@ -1101,6 +1060,8 @@ impl Display for FetchReport { | |||
| 1101 | 1060 | ||
| 1102 | #[derive(Default, Clone)] | 1061 | #[derive(Default, Clone)] |
| 1103 | pub struct FetchRequest { | 1062 | pub struct FetchRequest { |
| 1063 | relays: HashSet<Url>, | ||
| 1064 | selected_relay: Option<Url>, | ||
| 1104 | relay_column_width: usize, | 1065 | relay_column_width: usize, |
| 1105 | repo_coordinates: Vec<(Coordinate, Option<Timestamp>)>, | 1066 | repo_coordinates: Vec<(Coordinate, Option<Timestamp>)>, |
| 1106 | proposals: HashSet<EventId>, | 1067 | proposals: HashSet<EventId>, |