From 949c6459aa7683453a7160423b689ceadb08954b Mon Sep 17 00:00:00 2001 From: DanConwayDev Date: Wed, 4 Sep 2024 08:04:48 +0100 Subject: refactor: organise into lib and bin structure the make the code more readable this commit just moves the files, the next commit should fix the imports --- src/lib/client.rs | 1480 +++++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 1480 insertions(+) create mode 100644 src/lib/client.rs (limited to 'src/lib/client.rs') diff --git a/src/lib/client.rs b/src/lib/client.rs new file mode 100644 index 0000000..abde217 --- /dev/null +++ b/src/lib/client.rs @@ -0,0 +1,1480 @@ +// have you considered + +// TO USE ASYNC + +// in traits (required for mocking unit tests) +// https://rust-lang.github.io/async-book/07_workarounds/05_async_in_traits.html +// https://github.com/dtolnay/async-trait +// see https://blog.rust-lang.org/inside-rust/2022/11/17/async-fn-in-trait-nightly.html +// I think we can use the async-trait crate and switch to the native feature +// which is currently in nightly. alternatively we can use nightly as it looks +// certain that the implementation is going to make it to stable but we don't +// want to inadvertlty use other features of nightly that might be removed. +use std::{ + collections::{HashMap, HashSet}, + fmt::{Display, Write}, + fs::create_dir_all, + path::Path, + time::Duration, +}; + +use anyhow::{bail, Context, Result}; +use async_trait::async_trait; +use console::Style; +use futures::stream::{self, StreamExt}; +use indicatif::{MultiProgress, ProgressBar, ProgressState, ProgressStyle}; +#[cfg(test)] +use mockall::*; +use nostr::{nips::nip01::Coordinate, Event}; +use nostr_database::{NostrDatabase, Order}; +use nostr_sdk::{ + prelude::RelayLimits, EventBuilder, EventId, Kind, NostrSigner, Options, PublicKey, + SingleLetterTag, Timestamp, Url, +}; +use nostr_sqlite::SQLiteDatabase; + +use crate::{ + config::get_dirs, + login::{get_logged_in_user, get_user_ref_from_cache}, + repo_ref::RepoRef, + repo_state::RepoState, + sub_commands::{ + list::status_kinds, + send::{event_is_patch_set_root, event_is_revision_root}, + }, +}; + +#[allow(clippy::struct_field_names)] +pub struct Client { + client: nostr_sdk::Client, + fallback_relays: Vec, + more_fallback_relays: Vec, + blaster_relays: Vec, +} + +#[cfg_attr(test, automock)] +#[async_trait] +pub trait Connect { + fn default() -> Self; + fn new(opts: Params) -> Self; + async fn set_signer(&mut self, signer: NostrSigner); + async fn connect(&self, relay_url: &Url) -> Result<()>; + async fn disconnect(&self) -> Result<()>; + fn get_fallback_relays(&self) -> &Vec; + fn get_more_fallback_relays(&self) -> &Vec; + fn get_blaster_relays(&self) -> &Vec; + async fn send_event_to( + &self, + git_repo_path: &Path, + url: &str, + event: nostr::event::Event, + ) -> Result; + async fn get_events( + &self, + relays: Vec, + filters: Vec, + ) -> Result>; + async fn get_events_per_relay( + &self, + relays: Vec, + filters: Vec, + progress_reporter: MultiProgress, + ) -> Result<(Vec>>, MultiProgress)>; + async fn fetch_all( + &self, + git_repo_path: &Path, + repo_coordinates: &HashSet, + user_profiles: &HashSet, + ) -> Result<(Vec>, MultiProgress)>; + async fn fetch_all_from_relay( + &self, + git_repo_path: &Path, + request: FetchRequest, + pb: &Option, + ) -> Result; +} + +#[async_trait] +impl Connect for Client { + fn default() -> Self { + let fallback_relays: Vec = if std::env::var("NGITTEST").is_ok() { + vec![ + "ws://localhost:8051".to_string(), + "ws://localhost:8052".to_string(), + ] + } else { + vec![ + "wss://relay.damus.io".to_string(), /* free, good reliability, have been known + * to delete all messages */ + "wss://nos.lol".to_string(), + "wss://relay.nostr.band".to_string(), + ] + }; + + let more_fallback_relays: Vec = if std::env::var("NGITTEST").is_ok() { + vec![ + "ws://localhost:8055".to_string(), + "ws://localhost:8056".to_string(), + ] + } else { + vec![ + "wss://purplerelay.com".to_string(), // free but reliability not tested + "wss://purplepages.es".to_string(), // for profile events but unreliable + "wss://relayable.org".to_string(), // free but not always reliable + ] + }; + + let blaster_relays: Vec = if std::env::var("NGITTEST").is_ok() { + vec!["ws://localhost:8057".to_string()] + } else { + vec!["wss://nostr.mutinywallet.com".to_string()] + }; + Client { + client: nostr_sdk::ClientBuilder::new() + .opts(Options::new().relay_limits(RelayLimits::disable())) + .build(), + fallback_relays, + more_fallback_relays, + blaster_relays, + } + } + fn new(opts: Params) -> Self { + Client { + client: nostr_sdk::ClientBuilder::new() + .opts(Options::new().relay_limits(RelayLimits::disable())) + .signer(&opts.keys.unwrap_or(nostr::Keys::generate())) + // .database( + // SQLiteDatabase::open(get_dirs()?.cache_dir().join("nostr-cache.sqlite")). + // await?, ) + .build(), + fallback_relays: opts.fallback_relays, + more_fallback_relays: opts.more_fallback_relays, + blaster_relays: opts.blaster_relays, + } + } + + async fn set_signer(&mut self, signer: NostrSigner) { + self.client.set_signer(Some(signer)).await; + } + + async fn connect(&self, relay_url: &Url) -> Result<()> { + self.client + .add_relay(relay_url) + .await + .context("cannot add relay")?; + + let relay = self.client.relay(relay_url).await?; + + if !relay.is_connected().await { + #[allow(clippy::large_futures)] + relay + .connect(Some(std::time::Duration::from_secs(CONNECTION_TIMEOUT))) + .await; + } + + if !relay.is_connected().await { + bail!("connection timeout"); + } + Ok(()) + } + + async fn disconnect(&self) -> Result<()> { + self.client.disconnect().await?; + Ok(()) + } + + fn get_fallback_relays(&self) -> &Vec { + &self.fallback_relays + } + + fn get_more_fallback_relays(&self) -> &Vec { + &self.more_fallback_relays + } + + fn get_blaster_relays(&self) -> &Vec { + &self.blaster_relays + } + + async fn send_event_to( + &self, + git_repo_path: &Path, + url: &str, + event: Event, + ) -> Result { + self.client.add_relay(url).await?; + #[allow(clippy::large_futures)] + self.client.connect_relay(url).await?; + let res = self.client.send_event_to(vec![url], event.clone()).await?; + if let Some(err) = res.failed.get(&Url::parse(url)?) { + bail!(if let Some(err) = err { + err.to_string() + } else { + "error: unknown".to_string() + }); + } + save_event_in_cache(git_repo_path, &event).await?; + if event.kind().eq(&Kind::GitRepoAnnouncement) { + save_event_in_global_cache(git_repo_path, &event).await?; + } + Ok(event.id()) + } + + async fn get_events( + &self, + relays: Vec, + filters: Vec, + ) -> Result> { + let (relay_results, _) = self + .get_events_per_relay( + relays.iter().map(|r| Url::parse(r).unwrap()).collect(), + filters, + MultiProgress::new(), + ) + .await?; + Ok(get_dedup_events(relay_results)) + } + + async fn get_events_per_relay( + &self, + relays: Vec, + filters: Vec, + progress_reporter: MultiProgress, + ) -> Result<(Vec>>, MultiProgress)> { + // add relays + for relay in &relays { + self.client + .add_relay(relay.as_str()) + .await + .context("cannot add relay")?; + } + + let relays_map = self.client.relays().await; + + let futures: Vec<_> = relays + .clone() + .iter() + // don't look for events on blaster + .filter(|r| !r.as_str().contains("nostr.mutinywallet.com")) + .map(|r| (relays_map.get(r).unwrap(), filters.clone())) + .map(|(relay, filters)| async { + let pb = if std::env::var("NGITTEST").is_err() { + let pb = progress_reporter.add( + ProgressBar::new(1) + .with_prefix(format!("{: <11}{}", "connecting", relay.url())) + .with_style(pb_style()?), + ); + pb.enable_steady_tick(Duration::from_millis(300)); + Some(pb) + } else { + None + }; + #[allow(clippy::large_futures)] + match get_events_of(relay, filters, &pb).await { + Err(error) => { + if let Some(pb) = pb { + pb.set_style(pb_after_style(false)); + pb.set_prefix(format!("{: <11}{}", "error", relay.url())); + pb.finish_with_message( + console::style( + error.to_string().replace("relay pool error:", "error:"), + ) + .for_stderr() + .red() + .to_string(), + ); + } + Err(error) + } + Ok(res) => { + if let Some(pb) = pb { + pb.set_style(pb_after_style(true)); + pb.set_prefix(format!( + "{: <11}{}", + format!("{} events", res.len()), + relay.url() + )); + pb.finish_with_message(""); + } + Ok(res) + } + } + }) + .collect(); + + let relay_results: Vec>> = + stream::iter(futures).buffer_unordered(15).collect().await; + + Ok((relay_results, progress_reporter)) + } + + #[allow(clippy::too_many_lines)] + async fn fetch_all( + &self, + git_repo_path: &Path, + repo_coordinates: &HashSet, + user_profiles: &HashSet, + ) -> Result<(Vec>, MultiProgress)> { + let fallback_relays = &self + .fallback_relays + .iter() + .filter_map(|r| Url::parse(r).ok()) + .collect::>(); + + let mut request = create_relays_request( + git_repo_path, + repo_coordinates, + user_profiles, + fallback_relays.clone(), + ) + .await?; + + let progress_reporter = MultiProgress::new(); + + let mut processed_relays = HashSet::new(); + + let mut relay_reports: Vec> = vec![]; + + loop { + let relays = request + .repo_relays + .union(&request.user_relays_for_profiles) + // don't look for events on blaster + .filter(|&r| !r.as_str().contains("nostr.mutinywallet.com")) + .cloned() + .collect::>() + .difference(&processed_relays) + .cloned() + .collect::>(); + if relays.is_empty() { + break; + } + let profile_relays_only = request + .user_relays_for_profiles + .difference(&request.repo_relays) + .collect::>(); + for relay in &request.repo_relays { + self.client + .add_relay(relay.as_str()) + .await + .context("cannot add relay")?; + } + + let dim = Style::new().color256(247); + + let futures: Vec<_> = relays + .iter() + .map(|r| { + if profile_relays_only.contains(r) { + // if relay isn't a repo relay, just filter for user profile + FetchRequest { + selected_relay: Some(r.to_owned()), + repo_coordinates_without_relays: vec![], + proposals: HashSet::new(), + missing_contributor_profiles: request + .missing_contributor_profiles + .union( + &request + .profiles_to_fetch_from_user_relays + .clone() + .into_keys() + .collect(), + ) + .copied() + .collect(), + ..request.clone() + } + } else { + FetchRequest { + selected_relay: Some(r.to_owned()), + ..request.clone() + } + } + }) + .map(|request| async { + let relay_column_width = request.relay_column_width; + + let relay_url = request + .selected_relay + .clone() + .context("fetch_all_from_relay called without a relay")?; + + let pb = if std::env::var("NGITTEST").is_err() { + let pb = progress_reporter.add( + ProgressBar::new(1) + .with_prefix( + dim.apply_to(format!( + "{: { + if let Some(pb) = pb { + pb.set_style(pb_after_style(false)); + pb.set_prefix( + dim.apply_to(format!("{: Ok(res), + } + }) + .collect(); + + for report in stream::iter(futures) + .buffer_unordered(15) + .collect::>>() + .await + { + relay_reports.push(report); + } + processed_relays.extend(relays.clone()); + + if let Ok(repo_ref) = get_repo_ref_from_cache(git_repo_path, repo_coordinates).await { + request.repo_relays = repo_ref + .relays + .iter() + .filter_map(|r| Url::parse(r).ok()) + .collect(); + } + + request.user_relays_for_profiles = { + let mut set = HashSet::new(); + for user in &request + .profiles_to_fetch_from_user_relays + .clone() + .into_keys() + .collect::>() + { + if let Ok(user_ref) = get_user_ref_from_cache(git_repo_path, user).await { + for r in user_ref.relays.write() { + if let Ok(url) = Url::parse(&r) { + set.insert(url); + } + } + } + } + set + }; + } + Ok((relay_reports, progress_reporter)) + } + + async fn fetch_all_from_relay( + &self, + git_repo_path: &Path, + request: FetchRequest, + pb: &Option, + ) -> Result { + let mut fresh_coordinates: HashSet = HashSet::new(); + for (c, _) in request.repo_coordinates_without_relays.clone() { + fresh_coordinates.insert(c); + } + let mut fresh_proposal_roots = request.proposals.clone(); + let mut fresh_profiles: HashSet = request + .missing_contributor_profiles + .union( + &request + .profiles_to_fetch_from_user_relays + .clone() + .into_keys() + .collect(), + ) + .copied() + .collect(); + + let mut report = FetchReport::default(); + + let relay_url = request + .selected_relay + .clone() + .context("fetch_all_from_relay called without a relay")?; + + let relay_column_width = request.relay_column_width; + + self.connect(&relay_url).await?; + + let dim = Style::new().color256(247); + + loop { + let filters = + get_fetch_filters(&fresh_coordinates, &fresh_proposal_roots, &fresh_profiles); + + if let Some(pb) = &pb { + pb.set_prefix( + dim.apply_to(format!( + "{: = get_events_of(&relay, filters.clone(), &None) + .await? + .iter() + // don't process events that don't match filters + .filter(|e| filters.iter().any(|f| f.match_event(e))) + .cloned() + .collect(); + // TODO: try reconcile + + process_fetched_events( + events, + &request, + git_repo_path, + &mut fresh_coordinates, + &mut fresh_proposal_roots, + &mut fresh_profiles, + &mut report, + ) + .await?; + + if fresh_coordinates.is_empty() + && fresh_proposal_roots.is_empty() + && fresh_profiles.is_empty() + { + break; + } + } + if let Some(pb) = pb { + pb.set_style(pb_after_style(true)); + pb.set_prefix( + dim.apply_to(format!( + "{: , + pb: &Option, +) -> Result> { + // relay.reconcile(filter, opts).await?; + + if !relay.is_connected().await { + #[allow(clippy::large_futures)] + relay + .connect(Some(std::time::Duration::from_secs(CONNECTION_TIMEOUT))) + .await; + } + + if !relay.is_connected().await { + bail!("connection timeout"); + } else if let Some(pb) = pb { + pb.set_prefix(format!("connected {}", relay.url())); + } + let events = relay + .get_events_of( + filters, + // 20 is nostr_sdk default + std::time::Duration::from_secs(GET_EVENTS_TIMEOUT), + nostr_sdk::FilterOptions::ExitOnEOSE, + ) + .await?; + Ok(events) +} + +#[derive(Default)] +pub struct Params { + pub keys: Option, + pub fallback_relays: Vec, + pub more_fallback_relays: Vec, + pub blaster_relays: Vec, +} + +fn get_dedup_events(relay_results: Vec>>) -> Vec { + let mut dedup_events: Vec = vec![]; + for events in relay_results.into_iter().flatten() { + for event in events { + if !dedup_events.iter().any(|e| event.id.eq(&e.id)) { + dedup_events.push(event); + } + } + } + dedup_events +} + +pub async fn sign_event(event_builder: EventBuilder, signer: &NostrSigner) -> Result { + if signer.r#type().eq(&nostr_signer::NostrSignerType::NIP46) { + let term = console::Term::stderr(); + term.write_line("signing event with remote signer...")?; + let event = signer + .sign_event_builder(event_builder) + .await + .context("failed to sign event")?; + term.clear_last_lines(1)?; + Ok(event) + } else { + signer + .sign_event_builder(event_builder) + .await + .context("failed to sign event") + } +} + +pub async fn fetch_public_key(signer: &NostrSigner) -> Result { + let term = console::Term::stderr(); + term.write_line("fetching npub from remote signer...")?; + let public_key = signer + .public_key() + .await + .context("failed to get npub from remote signer")?; + term.clear_last_lines(1)?; + Ok(public_key) +} + +fn pb_style() -> Result { + Ok( + ProgressStyle::with_template(" {spinner} {prefix} {msg} {timeout_in}")?.with_key( + "timeout_in", + |state: &ProgressState, w: &mut dyn Write| { + if state.elapsed().as_secs() > 3 && state.elapsed().as_secs() < GET_EVENTS_TIMEOUT { + let dim = Style::new().color256(247); + write!( + w, + "{}", + dim.apply_to(format!( + "timeout in {:.1}s", + GET_EVENTS_TIMEOUT - state.elapsed().as_secs() + )) + ) + .unwrap(); + } + }, + ), + ) +} + +fn pb_after_style(succeed: bool) -> indicatif::ProgressStyle { + ProgressStyle::with_template( + format!( + " {} {}", + if succeed { + console::style("✔".to_string()) + .for_stderr() + .green() + .to_string() + } else { + console::style("✘".to_string()) + .for_stderr() + .red() + .to_string() + }, + "{prefix} {msg}", + ) + .as_str(), + ) + .unwrap() +} + +async fn get_local_cache_database(git_repo_path: &Path) -> Result { + SQLiteDatabase::open(git_repo_path.join(".git/nostr-cache.sqlite")) + .await + .context("cannot open or create nostr cache database at .git/nostr-cache.sqlite") +} + +async fn get_global_cache_database(git_repo_path: &Path) -> Result { + SQLiteDatabase::open(if std::env::var("NGITTEST").is_err() { + create_dir_all(get_dirs()?.cache_dir()).context(format!( + "cannot create cache directory in: {:?}", + get_dirs()?.cache_dir() + ))?; + get_dirs()?.cache_dir().join("nostr-cache.sqlite") + } else { + git_repo_path.join(".git/test-global-cache.sqlite") + }) + .await + .context("cannot open ngit global nostr cache database") +} + +pub async fn get_events_from_cache( + git_repo_path: &Path, + filters: Vec, +) -> Result> { + get_local_cache_database(git_repo_path) + .await? + .query(filters.clone(), Order::Asc) + .await + .context( + "cannot execute query on opened git repo nostr cache database .git/nostr-cache.sqlite", + ) +} + +pub async fn get_event_from_global_cache( + git_repo_path: &Path, + filters: Vec, +) -> Result> { + get_global_cache_database(git_repo_path) + .await? + .query(filters.clone(), Order::Asc) + .await + .context("cannot execute query on opened ngit nostr cache database") +} + +pub async fn save_event_in_cache(git_repo_path: &Path, event: &nostr::Event) -> Result { + get_local_cache_database(git_repo_path) + .await? + .save_event(event) + .await + .context("cannot save event in local cache") +} + +pub async fn save_event_in_global_cache( + git_repo_path: &Path, + event: &nostr::Event, +) -> Result { + get_global_cache_database(git_repo_path) + .await? + .save_event(event) + .await + .context("cannot save event in local cache") +} + +pub async fn get_repo_ref_from_cache( + git_repo_path: &Path, + repo_coordinates: &HashSet, +) -> Result { + let mut maintainers = HashSet::new(); + let mut new_coordinate: bool; + + for c in repo_coordinates { + maintainers.insert(c.public_key); + } + let mut repo_events = vec![]; + loop { + new_coordinate = false; + let repo_events_filter = get_filter_repo_events(repo_coordinates); + + let events = [ + get_event_from_global_cache(git_repo_path, vec![repo_events_filter.clone()]).await?, + get_events_from_cache(git_repo_path, vec![repo_events_filter]).await?, + ] + .concat(); + for e in events { + if let Ok(repo_ref) = RepoRef::try_from(e.clone()) { + for m in repo_ref.maintainers { + if maintainers.insert(m) { + new_coordinate = true; + } + } + repo_events.push(e); + } + } + if !new_coordinate { + break; + } + } + repo_events.sort_by_key(|e| e.created_at); + let repo_ref = RepoRef::try_from( + repo_events + .first() + .context("no repo events at specified coordinates")? + .clone(), + )?; + + let mut events: HashMap = HashMap::new(); + for m in &maintainers { + if let Some(e) = repo_events.iter().find(|e| e.author().eq(m)) { + events.insert( + Coordinate { + kind: e.kind, + identifier: e.identifier().unwrap().to_string(), + public_key: e.author(), + relays: vec![], + }, + e.clone(), + ); + } + } + + Ok(RepoRef { + // use all maintainers from all events found, not just maintainers in the most + // recent event + maintainers: maintainers.iter().copied().collect::>(), + events, + ..repo_ref + }) +} + +pub async fn get_state_from_cache(git_repo_path: &Path, repo_ref: &RepoRef) -> Result { + RepoState::try_from( + get_events_from_cache( + git_repo_path, + vec![get_filter_state_events(&repo_ref.coordinates())], + ) + .await?, + ) +} + +#[allow(clippy::too_many_lines)] +async fn create_relays_request( + git_repo_path: &Path, + repo_coordinates: &HashSet, + user_profiles: &HashSet, + fallback_relays: HashSet, +) -> Result { + let repo_ref = get_repo_ref_from_cache(git_repo_path, repo_coordinates).await; + + let repo_coordinates = { + // add coordinates of users listed in maintainers to explicitly specified + // coodinates + let mut repo_coordinates = repo_coordinates.clone(); + if let Ok(repo_ref) = &repo_ref { + for c in repo_ref.coordinates() { + if !repo_coordinates + .iter() + .any(|e| e.identifier.eq(&c.identifier) && e.public_key.eq(&c.public_key)) + { + repo_coordinates.insert(c); + } + } + } + repo_coordinates + }; + + let repo_coordinates_without_relays = { + let mut set = HashSet::new(); + for c in &repo_coordinates { + set.insert(Coordinate { + kind: c.kind, + identifier: c.identifier.clone(), + public_key: c.public_key, + relays: vec![], + }); + } + set + }; + + let mut proposals: HashSet = HashSet::new(); + let mut missing_contributor_profiles: HashSet = HashSet::new(); + let mut contributors: HashSet = HashSet::new(); + + if !repo_coordinates_without_relays.is_empty() { + if let Ok(repo_ref) = &repo_ref { + for m in &repo_ref.maintainers { + contributors.insert(m.to_owned()); + } + } + + for event in &get_events_from_cache( + git_repo_path, + vec![ + nostr::Filter::default() + .kinds(vec![Kind::GitPatch]) + .custom_tag( + SingleLetterTag::lowercase(nostr_sdk::Alphabet::A), + repo_coordinates_without_relays + .iter() + .map(std::string::ToString::to_string) + .collect::>(), + ), + ], + ) + .await? + { + if event_is_patch_set_root(event) || event_is_revision_root(event) { + proposals.insert(event.id()); + contributors.insert(event.author()); + } + } + + let profile_events = get_event_from_global_cache( + git_repo_path, + vec![get_filter_contributor_profiles(contributors.clone())], + ) + .await?; + for c in &contributors { + if let Some(event) = profile_events + .iter() + .find(|e| e.kind() == Kind::Metadata && e.author().eq(c)) + { + save_event_in_cache(git_repo_path, event).await?; + } else { + missing_contributor_profiles.insert(c.to_owned()); + } + } + } + + let profiles_to_fetch_from_user_relays = { + let mut user_profiles = user_profiles.clone(); + if let Ok(Some(current_user)) = get_logged_in_user(git_repo_path).await { + user_profiles.insert(current_user); + } + let mut map: HashMap = HashMap::new(); + for public_key in &user_profiles { + if let Ok(user_ref) = get_user_ref_from_cache(git_repo_path, public_key).await { + map.insert( + public_key.to_owned(), + (user_ref.metadata.created_at, user_ref.relays.created_at), + ); + } else { + map.insert( + public_key.to_owned(), + (Timestamp::from(0), Timestamp::from(0)), + ); + } + } + map + }; + + let user_relays_for_profiles = { + let mut set = HashSet::new(); + for user in &profiles_to_fetch_from_user_relays + .clone() + .into_keys() + .collect::>() + { + if let Ok(user_ref) = get_user_ref_from_cache(git_repo_path, user).await { + for r in user_ref.relays.write() { + if let Ok(url) = Url::parse(&r) { + set.insert(url); + } + } + } else { + missing_contributor_profiles.insert(user.to_owned()); + } + } + set + }; + + let existing_events: HashSet = { + let mut existing_events: HashSet = HashSet::new(); + for filter in get_fetch_filters( + &repo_coordinates_without_relays, + &proposals, + &missing_contributor_profiles + .union( + &profiles_to_fetch_from_user_relays + .clone() + .into_keys() + .collect(), + ) + .copied() + .collect(), + ) { + for (id, _) in get_local_cache_database(git_repo_path) + .await? + .negentropy_items(filter) + .await? + { + existing_events.insert(id); + } + } + existing_events + }; + + let relays = { + let mut relays = fallback_relays; + if let Ok(repo_ref) = &repo_ref { + for r in &repo_ref.relays { + if let Ok(url) = Url::parse(r) { + relays.insert(url); + } + } + } + for c in repo_coordinates { + for r in &c.relays { + if let Ok(url) = Url::parse(r) { + relays.insert(url); + } + } + } + relays + }; + + let relay_column_width = relays + .union(&user_relays_for_profiles) + .reduce(|a, r| { + if r.to_string() + .chars() + .count() + .gt(&a.to_string().chars().count()) + { + r + } else { + a + } + }) + .unwrap() + .to_string() + .chars() + .count() + + 2; + + Ok(FetchRequest { + selected_relay: None, + repo_relays: relays, + relay_column_width, + repo_coordinates_without_relays: if let Ok(repo_ref) = &repo_ref { + repo_ref.coordinates_with_timestamps() + } else { + repo_coordinates_without_relays + .iter() + .map(|c| (c.clone(), None)) + .collect() + }, + state: if let Ok(repo_ref) = &repo_ref { + if let Ok(existing_state) = get_state_from_cache(git_repo_path, repo_ref).await { + Some((existing_state.event.created_at, existing_state.event.id)) + } else { + None + } + } else { + None + }, + proposals, + contributors, + missing_contributor_profiles, + existing_events, + profiles_to_fetch_from_user_relays, + user_relays_for_profiles, + }) +} + +#[allow(clippy::too_many_lines)] +async fn process_fetched_events( + events: Vec, + request: &FetchRequest, + git_repo_path: &Path, + fresh_coordinates: &mut HashSet, + fresh_proposal_roots: &mut HashSet, + fresh_profiles: &mut HashSet, + report: &mut FetchReport, +) -> Result<()> { + for event in &events { + if !request.existing_events.contains(&event.id) { + save_event_in_cache(git_repo_path, event).await?; + if event.kind().eq(&Kind::GitRepoAnnouncement) { + save_event_in_global_cache(git_repo_path, event).await?; + let new_coordinate = !request + .repo_coordinates_without_relays + .iter() + .map(|(c, _)| c.clone()) + .any(|c| { + c.identifier.eq(event.identifier().unwrap()) + && c.public_key.eq(&event.pubkey) + }); + let update_to_existing = !new_coordinate + && request + .repo_coordinates_without_relays + .iter() + .any(|(c, t)| { + c.identifier.eq(event.identifier().unwrap()) + && c.public_key.eq(&event.pubkey) + && if let Some(t) = t { + event.created_at.gt(t) + } else { + true + } + }); + if update_to_existing { + report.updated_repo_announcements.push(( + Coordinate { + kind: event.kind(), + public_key: event.author(), + identifier: event.identifier().unwrap().to_owned(), + relays: vec![], + }, + event.created_at, + )); + } + // if contains new maintainer + if let Ok(repo_ref) = &RepoRef::try_from(event.clone()) { + for m in &repo_ref.maintainers { + if !request + .repo_coordinates_without_relays // prexisting maintainers + .iter() + .map(|(c, _)| c.clone()) + .collect::>() + .union(&report.repo_coordinates_without_relays) // already added maintainers + .any(|c| c.identifier.eq(&repo_ref.identifier) && m.eq(&c.public_key)) + { + let c = Coordinate { + kind: event.kind(), + public_key: *m, + identifier: repo_ref.identifier.clone(), + relays: vec![], + }; + fresh_coordinates.insert(c.clone()); + report.repo_coordinates_without_relays.insert(c); + + if !request.contributors.contains(m) + && !request + .profiles_to_fetch_from_user_relays + .clone() + .into_keys() + .collect::>() + .contains(m) + && !fresh_profiles.contains(m) + { + fresh_profiles.insert(m.to_owned()); + } + } + } + } + } else if event.kind().eq(&STATE_KIND) { + let existing_state = if report.updated_state.is_some() { + report.updated_state + } else { + request.state + }; + if let Some((timestamp, id)) = existing_state { + if event.created_at.gt(×tamp) + || (event.created_at.eq(×tamp) && event.id.gt(&id)) + { + report.updated_state = Some((event.created_at, event.id)); + } + } + } else if event_is_patch_set_root(event) { + fresh_proposal_roots.insert(event.id); + report.proposals.insert(event.id); + if !request.contributors.contains(&event.author()) + && !fresh_profiles.contains(&event.author()) + { + fresh_profiles.insert(event.author()); + } + } else if [Kind::RelayList, Kind::Metadata].contains(&event.kind()) { + if request + .missing_contributor_profiles + .contains(&event.author()) + { + report.contributor_profiles.insert(event.author()); + } else if let Some((_, (metadata_timestamp, relay_list_timestamp))) = request + .profiles_to_fetch_from_user_relays + .get_key_value(&event.author()) + { + if (Kind::Metadata.eq(&event.kind()) + && event.created_at().gt(metadata_timestamp)) + || (Kind::RelayList.eq(&event.kind()) + && event.created_at().gt(relay_list_timestamp)) + { + report.profile_updates.insert(event.author()); + } + } + save_event_in_global_cache(git_repo_path, event).await?; + } + } + } + for event in &events { + if !request.existing_events.contains(&event.id) + && !event.event_ids().any(|id| report.proposals.contains(id)) + { + if event.kind().eq(&Kind::GitPatch) && !event_is_patch_set_root(event) { + report.commits.insert(event.id); + } else if status_kinds().contains(&event.kind()) { + report.statuses.insert(event.id); + } + } + } + Ok(()) +} + +pub fn consolidate_fetch_reports(reports: Vec>) -> FetchReport { + let mut report = FetchReport::default(); + for relay_report in reports.into_iter().flatten() { + for c in relay_report.repo_coordinates_without_relays { + if !report + .repo_coordinates_without_relays + .iter() + .any(|e| e.eq(&c)) + { + report.repo_coordinates_without_relays.insert(c); + } + } + for (r, t) in relay_report.updated_repo_announcements { + if let Some(i) = report + .updated_repo_announcements + .iter() + .position(|(e, _)| e.eq(&r)) + { + let (_, existing_t) = &report.updated_repo_announcements[i]; + if t.gt(existing_t) { + report.updated_repo_announcements[i] = (r, t); + } + } else { + report.updated_repo_announcements.push((r, t)); + } + } + if let Some((timestamp, id)) = relay_report.updated_state { + if let Some((existing_timestamp, existing_id)) = report.updated_state { + if timestamp.gt(&existing_timestamp) + || (timestamp.eq(&existing_timestamp) && id.gt(&existing_id)) + { + report.updated_state = Some((timestamp, id)); + } + } else { + report.updated_state = Some((timestamp, id)); + } + } + for c in relay_report.proposals { + report.proposals.insert(c); + } + for c in relay_report.commits { + report.commits.insert(c); + } + for c in relay_report.statuses { + report.statuses.insert(c); + } + for c in relay_report.contributor_profiles { + report.contributor_profiles.insert(c); + } + for c in relay_report.profile_updates { + report.profile_updates.insert(c); + } + } + report +} +pub fn get_fetch_filters( + repo_coordinates: &HashSet, + proposal_ids: &HashSet, + required_profiles: &HashSet, +) -> Vec { + [ + if repo_coordinates.is_empty() { + vec![] + } else { + vec![ + get_filter_state_events(repo_coordinates), + get_filter_repo_events(repo_coordinates), + nostr::Filter::default() + .kinds(vec![Kind::GitPatch, Kind::EventDeletion]) + .custom_tag( + SingleLetterTag::lowercase(nostr_sdk::Alphabet::A), + repo_coordinates + .iter() + .map(std::string::ToString::to_string) + .collect::>(), + ), + ] + }, + if proposal_ids.is_empty() { + vec![] + } else { + vec![ + nostr::Filter::default() + .events(proposal_ids.clone()) + .kinds([vec![Kind::GitPatch, Kind::EventDeletion], status_kinds()].concat()), + ] + }, + if required_profiles.is_empty() { + vec![] + } else { + vec![get_filter_contributor_profiles(required_profiles.clone())] + }, + ] + .concat() +} + +pub fn get_filter_repo_events(repo_coordinates: &HashSet) -> nostr::Filter { + nostr::Filter::default() + .kind(Kind::GitRepoAnnouncement) + .identifiers( + repo_coordinates + .iter() + .map(|c| c.identifier.clone()) + .collect::>(), + ) + .authors( + repo_coordinates + .iter() + .map(|c| c.public_key) + .collect::>(), + ) +} + +pub static STATE_KIND: nostr::Kind = Kind::Custom(30618); +pub fn get_filter_state_events(repo_coordinates: &HashSet) -> nostr::Filter { + nostr::Filter::default() + .kind(STATE_KIND) + .identifiers( + repo_coordinates + .iter() + .map(|c| c.identifier.clone()) + .collect::>(), + ) + .authors( + repo_coordinates + .iter() + .map(|c| c.public_key) + .collect::>(), + ) +} + +pub fn get_filter_contributor_profiles(contributors: HashSet) -> nostr::Filter { + nostr::Filter::default() + .kinds(vec![Kind::Metadata, Kind::RelayList]) + .authors(contributors) +} + +#[derive(Default)] +pub struct FetchReport { + repo_coordinates_without_relays: HashSet, + updated_repo_announcements: Vec<(Coordinate, Timestamp)>, + updated_state: Option<(Timestamp, EventId)>, + proposals: HashSet, + /// commits against existing propoals + commits: HashSet, + statuses: HashSet, + contributor_profiles: HashSet, + profile_updates: HashSet, +} + +impl Display for FetchReport { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + // report: "1 new maintainer, 1 announcement, 1 proposal, 3 commits, 2 statuses" + let mut display_items: Vec = vec![]; + if !self.repo_coordinates_without_relays.is_empty() { + display_items.push(format!( + "{} new maintainer{}", + self.repo_coordinates_without_relays.len(), + if self.repo_coordinates_without_relays.len() > 1 { + "s" + } else { + "" + }, + )); + } + if !self.updated_repo_announcements.is_empty() { + display_items.push(format!( + "{} announcement update{}", + self.updated_repo_announcements.len(), + if self.updated_repo_announcements.len() > 1 { + "s" + } else { + "" + }, + )); + } + if self.updated_state.is_some() { + display_items.push("new state".to_string()); + } + if !self.proposals.is_empty() { + display_items.push(format!( + "{} proposal{}", + self.proposals.len(), + if self.proposals.len() > 1 { "s" } else { "" }, + )); + } + if !self.commits.is_empty() { + display_items.push(format!( + "{} commit{}", + self.commits.len(), + if self.commits.len() > 1 { "s" } else { "" }, + )); + } + if !self.statuses.is_empty() { + display_items.push(format!( + "{} status{}", + self.statuses.len(), + if self.statuses.len() > 1 { "es" } else { "" }, + )); + } + if !self.contributor_profiles.is_empty() { + display_items.push(format!( + "{} user profile{}", + self.contributor_profiles.len(), + if self.contributor_profiles.len() > 1 { + "s" + } else { + "" + }, + )); + } + if !self.profile_updates.is_empty() { + display_items.push(format!( + "{} profile update{}", + self.profile_updates.len(), + if self.profile_updates.len() > 1 { + "s" + } else { + "" + }, + )); + } + write!(f, "{}", display_items.join(", ")) + } +} + +#[derive(Default, Clone)] +pub struct FetchRequest { + repo_relays: HashSet, + selected_relay: Option, + relay_column_width: usize, + repo_coordinates_without_relays: Vec<(Coordinate, Option)>, + state: Option<(Timestamp, EventId)>, + proposals: HashSet, + contributors: HashSet, + missing_contributor_profiles: HashSet, + existing_events: HashSet, + profiles_to_fetch_from_user_relays: HashMap, + user_relays_for_profiles: HashSet, +} + +pub async fn fetching_with_report( + git_repo_path: &Path, + #[cfg(test)] client: &crate::client::MockConnect, + #[cfg(not(test))] client: &Client, + repo_coordinates: &HashSet, +) -> Result { + let term = console::Term::stderr(); + term.write_line("fetching updates...")?; + let (relay_reports, progress_reporter) = client + .fetch_all(git_repo_path, repo_coordinates, &HashSet::new()) + .await?; + if !relay_reports.iter().any(std::result::Result::is_err) { + let _ = progress_reporter.clear(); + } + let report = consolidate_fetch_reports(relay_reports); + if report.to_string().is_empty() { + println!("no updates"); + } else { + println!("updates: {report}"); + } + Ok(report) +} -- cgit v1.2.3