From 949c6459aa7683453a7160423b689ceadb08954b Mon Sep 17 00:00:00 2001 From: DanConwayDev Date: Wed, 4 Sep 2024 08:04:48 +0100 Subject: refactor: organise into lib and bin structure the make the code more readable this commit just moves the files, the next commit should fix the imports --- src/client.rs | 1480 --------------------------------------------------------- 1 file changed, 1480 deletions(-) delete mode 100644 src/client.rs (limited to 'src/client.rs') diff --git a/src/client.rs b/src/client.rs deleted file mode 100644 index abde217..0000000 --- a/src/client.rs +++ /dev/null @@ -1,1480 +0,0 @@ -// have you considered - -// TO USE ASYNC - -// in traits (required for mocking unit tests) -// https://rust-lang.github.io/async-book/07_workarounds/05_async_in_traits.html -// https://github.com/dtolnay/async-trait -// see https://blog.rust-lang.org/inside-rust/2022/11/17/async-fn-in-trait-nightly.html -// I think we can use the async-trait crate and switch to the native feature -// which is currently in nightly. alternatively we can use nightly as it looks -// certain that the implementation is going to make it to stable but we don't -// want to inadvertlty use other features of nightly that might be removed. -use std::{ - collections::{HashMap, HashSet}, - fmt::{Display, Write}, - fs::create_dir_all, - path::Path, - time::Duration, -}; - -use anyhow::{bail, Context, Result}; -use async_trait::async_trait; -use console::Style; -use futures::stream::{self, StreamExt}; -use indicatif::{MultiProgress, ProgressBar, ProgressState, ProgressStyle}; -#[cfg(test)] -use mockall::*; -use nostr::{nips::nip01::Coordinate, Event}; -use nostr_database::{NostrDatabase, Order}; -use nostr_sdk::{ - prelude::RelayLimits, EventBuilder, EventId, Kind, NostrSigner, Options, PublicKey, - SingleLetterTag, Timestamp, Url, -}; -use nostr_sqlite::SQLiteDatabase; - -use crate::{ - config::get_dirs, - login::{get_logged_in_user, get_user_ref_from_cache}, - repo_ref::RepoRef, - repo_state::RepoState, - sub_commands::{ - list::status_kinds, - send::{event_is_patch_set_root, event_is_revision_root}, - }, -}; - -#[allow(clippy::struct_field_names)] -pub struct Client { - client: nostr_sdk::Client, - fallback_relays: Vec, - more_fallback_relays: Vec, - blaster_relays: Vec, -} - -#[cfg_attr(test, automock)] -#[async_trait] -pub trait Connect { - fn default() -> Self; - fn new(opts: Params) -> Self; - async fn set_signer(&mut self, signer: NostrSigner); - async fn connect(&self, relay_url: &Url) -> Result<()>; - async fn disconnect(&self) -> Result<()>; - fn get_fallback_relays(&self) -> &Vec; - fn get_more_fallback_relays(&self) -> &Vec; - fn get_blaster_relays(&self) -> &Vec; - async fn send_event_to( - &self, - git_repo_path: &Path, - url: &str, - event: nostr::event::Event, - ) -> Result; - async fn get_events( - &self, - relays: Vec, - filters: Vec, - ) -> Result>; - async fn get_events_per_relay( - &self, - relays: Vec, - filters: Vec, - progress_reporter: MultiProgress, - ) -> Result<(Vec>>, MultiProgress)>; - async fn fetch_all( - &self, - git_repo_path: &Path, - repo_coordinates: &HashSet, - user_profiles: &HashSet, - ) -> Result<(Vec>, MultiProgress)>; - async fn fetch_all_from_relay( - &self, - git_repo_path: &Path, - request: FetchRequest, - pb: &Option, - ) -> Result; -} - -#[async_trait] -impl Connect for Client { - fn default() -> Self { - let fallback_relays: Vec = if std::env::var("NGITTEST").is_ok() { - vec![ - "ws://localhost:8051".to_string(), - "ws://localhost:8052".to_string(), - ] - } else { - vec![ - "wss://relay.damus.io".to_string(), /* free, good reliability, have been known - * to delete all messages */ - "wss://nos.lol".to_string(), - "wss://relay.nostr.band".to_string(), - ] - }; - - let more_fallback_relays: Vec = if std::env::var("NGITTEST").is_ok() { - vec![ - "ws://localhost:8055".to_string(), - "ws://localhost:8056".to_string(), - ] - } else { - vec![ - "wss://purplerelay.com".to_string(), // free but reliability not tested - "wss://purplepages.es".to_string(), // for profile events but unreliable - "wss://relayable.org".to_string(), // free but not always reliable - ] - }; - - let blaster_relays: Vec = if std::env::var("NGITTEST").is_ok() { - vec!["ws://localhost:8057".to_string()] - } else { - vec!["wss://nostr.mutinywallet.com".to_string()] - }; - Client { - client: nostr_sdk::ClientBuilder::new() - .opts(Options::new().relay_limits(RelayLimits::disable())) - .build(), - fallback_relays, - more_fallback_relays, - blaster_relays, - } - } - fn new(opts: Params) -> Self { - Client { - client: nostr_sdk::ClientBuilder::new() - .opts(Options::new().relay_limits(RelayLimits::disable())) - .signer(&opts.keys.unwrap_or(nostr::Keys::generate())) - // .database( - // SQLiteDatabase::open(get_dirs()?.cache_dir().join("nostr-cache.sqlite")). - // await?, ) - .build(), - fallback_relays: opts.fallback_relays, - more_fallback_relays: opts.more_fallback_relays, - blaster_relays: opts.blaster_relays, - } - } - - async fn set_signer(&mut self, signer: NostrSigner) { - self.client.set_signer(Some(signer)).await; - } - - async fn connect(&self, relay_url: &Url) -> Result<()> { - self.client - .add_relay(relay_url) - .await - .context("cannot add relay")?; - - let relay = self.client.relay(relay_url).await?; - - if !relay.is_connected().await { - #[allow(clippy::large_futures)] - relay - .connect(Some(std::time::Duration::from_secs(CONNECTION_TIMEOUT))) - .await; - } - - if !relay.is_connected().await { - bail!("connection timeout"); - } - Ok(()) - } - - async fn disconnect(&self) -> Result<()> { - self.client.disconnect().await?; - Ok(()) - } - - fn get_fallback_relays(&self) -> &Vec { - &self.fallback_relays - } - - fn get_more_fallback_relays(&self) -> &Vec { - &self.more_fallback_relays - } - - fn get_blaster_relays(&self) -> &Vec { - &self.blaster_relays - } - - async fn send_event_to( - &self, - git_repo_path: &Path, - url: &str, - event: Event, - ) -> Result { - self.client.add_relay(url).await?; - #[allow(clippy::large_futures)] - self.client.connect_relay(url).await?; - let res = self.client.send_event_to(vec![url], event.clone()).await?; - if let Some(err) = res.failed.get(&Url::parse(url)?) { - bail!(if let Some(err) = err { - err.to_string() - } else { - "error: unknown".to_string() - }); - } - save_event_in_cache(git_repo_path, &event).await?; - if event.kind().eq(&Kind::GitRepoAnnouncement) { - save_event_in_global_cache(git_repo_path, &event).await?; - } - Ok(event.id()) - } - - async fn get_events( - &self, - relays: Vec, - filters: Vec, - ) -> Result> { - let (relay_results, _) = self - .get_events_per_relay( - relays.iter().map(|r| Url::parse(r).unwrap()).collect(), - filters, - MultiProgress::new(), - ) - .await?; - Ok(get_dedup_events(relay_results)) - } - - async fn get_events_per_relay( - &self, - relays: Vec, - filters: Vec, - progress_reporter: MultiProgress, - ) -> Result<(Vec>>, MultiProgress)> { - // add relays - for relay in &relays { - self.client - .add_relay(relay.as_str()) - .await - .context("cannot add relay")?; - } - - let relays_map = self.client.relays().await; - - let futures: Vec<_> = relays - .clone() - .iter() - // don't look for events on blaster - .filter(|r| !r.as_str().contains("nostr.mutinywallet.com")) - .map(|r| (relays_map.get(r).unwrap(), filters.clone())) - .map(|(relay, filters)| async { - let pb = if std::env::var("NGITTEST").is_err() { - let pb = progress_reporter.add( - ProgressBar::new(1) - .with_prefix(format!("{: <11}{}", "connecting", relay.url())) - .with_style(pb_style()?), - ); - pb.enable_steady_tick(Duration::from_millis(300)); - Some(pb) - } else { - None - }; - #[allow(clippy::large_futures)] - match get_events_of(relay, filters, &pb).await { - Err(error) => { - if let Some(pb) = pb { - pb.set_style(pb_after_style(false)); - pb.set_prefix(format!("{: <11}{}", "error", relay.url())); - pb.finish_with_message( - console::style( - error.to_string().replace("relay pool error:", "error:"), - ) - .for_stderr() - .red() - .to_string(), - ); - } - Err(error) - } - Ok(res) => { - if let Some(pb) = pb { - pb.set_style(pb_after_style(true)); - pb.set_prefix(format!( - "{: <11}{}", - format!("{} events", res.len()), - relay.url() - )); - pb.finish_with_message(""); - } - Ok(res) - } - } - }) - .collect(); - - let relay_results: Vec>> = - stream::iter(futures).buffer_unordered(15).collect().await; - - Ok((relay_results, progress_reporter)) - } - - #[allow(clippy::too_many_lines)] - async fn fetch_all( - &self, - git_repo_path: &Path, - repo_coordinates: &HashSet, - user_profiles: &HashSet, - ) -> Result<(Vec>, MultiProgress)> { - let fallback_relays = &self - .fallback_relays - .iter() - .filter_map(|r| Url::parse(r).ok()) - .collect::>(); - - let mut request = create_relays_request( - git_repo_path, - repo_coordinates, - user_profiles, - fallback_relays.clone(), - ) - .await?; - - let progress_reporter = MultiProgress::new(); - - let mut processed_relays = HashSet::new(); - - let mut relay_reports: Vec> = vec![]; - - loop { - let relays = request - .repo_relays - .union(&request.user_relays_for_profiles) - // don't look for events on blaster - .filter(|&r| !r.as_str().contains("nostr.mutinywallet.com")) - .cloned() - .collect::>() - .difference(&processed_relays) - .cloned() - .collect::>(); - if relays.is_empty() { - break; - } - let profile_relays_only = request - .user_relays_for_profiles - .difference(&request.repo_relays) - .collect::>(); - for relay in &request.repo_relays { - self.client - .add_relay(relay.as_str()) - .await - .context("cannot add relay")?; - } - - let dim = Style::new().color256(247); - - let futures: Vec<_> = relays - .iter() - .map(|r| { - if profile_relays_only.contains(r) { - // if relay isn't a repo relay, just filter for user profile - FetchRequest { - selected_relay: Some(r.to_owned()), - repo_coordinates_without_relays: vec![], - proposals: HashSet::new(), - missing_contributor_profiles: request - .missing_contributor_profiles - .union( - &request - .profiles_to_fetch_from_user_relays - .clone() - .into_keys() - .collect(), - ) - .copied() - .collect(), - ..request.clone() - } - } else { - FetchRequest { - selected_relay: Some(r.to_owned()), - ..request.clone() - } - } - }) - .map(|request| async { - let relay_column_width = request.relay_column_width; - - let relay_url = request - .selected_relay - .clone() - .context("fetch_all_from_relay called without a relay")?; - - let pb = if std::env::var("NGITTEST").is_err() { - let pb = progress_reporter.add( - ProgressBar::new(1) - .with_prefix( - dim.apply_to(format!( - "{: { - if let Some(pb) = pb { - pb.set_style(pb_after_style(false)); - pb.set_prefix( - dim.apply_to(format!("{: Ok(res), - } - }) - .collect(); - - for report in stream::iter(futures) - .buffer_unordered(15) - .collect::>>() - .await - { - relay_reports.push(report); - } - processed_relays.extend(relays.clone()); - - if let Ok(repo_ref) = get_repo_ref_from_cache(git_repo_path, repo_coordinates).await { - request.repo_relays = repo_ref - .relays - .iter() - .filter_map(|r| Url::parse(r).ok()) - .collect(); - } - - request.user_relays_for_profiles = { - let mut set = HashSet::new(); - for user in &request - .profiles_to_fetch_from_user_relays - .clone() - .into_keys() - .collect::>() - { - if let Ok(user_ref) = get_user_ref_from_cache(git_repo_path, user).await { - for r in user_ref.relays.write() { - if let Ok(url) = Url::parse(&r) { - set.insert(url); - } - } - } - } - set - }; - } - Ok((relay_reports, progress_reporter)) - } - - async fn fetch_all_from_relay( - &self, - git_repo_path: &Path, - request: FetchRequest, - pb: &Option, - ) -> Result { - let mut fresh_coordinates: HashSet = HashSet::new(); - for (c, _) in request.repo_coordinates_without_relays.clone() { - fresh_coordinates.insert(c); - } - let mut fresh_proposal_roots = request.proposals.clone(); - let mut fresh_profiles: HashSet = request - .missing_contributor_profiles - .union( - &request - .profiles_to_fetch_from_user_relays - .clone() - .into_keys() - .collect(), - ) - .copied() - .collect(); - - let mut report = FetchReport::default(); - - let relay_url = request - .selected_relay - .clone() - .context("fetch_all_from_relay called without a relay")?; - - let relay_column_width = request.relay_column_width; - - self.connect(&relay_url).await?; - - let dim = Style::new().color256(247); - - loop { - let filters = - get_fetch_filters(&fresh_coordinates, &fresh_proposal_roots, &fresh_profiles); - - if let Some(pb) = &pb { - pb.set_prefix( - dim.apply_to(format!( - "{: = get_events_of(&relay, filters.clone(), &None) - .await? - .iter() - // don't process events that don't match filters - .filter(|e| filters.iter().any(|f| f.match_event(e))) - .cloned() - .collect(); - // TODO: try reconcile - - process_fetched_events( - events, - &request, - git_repo_path, - &mut fresh_coordinates, - &mut fresh_proposal_roots, - &mut fresh_profiles, - &mut report, - ) - .await?; - - if fresh_coordinates.is_empty() - && fresh_proposal_roots.is_empty() - && fresh_profiles.is_empty() - { - break; - } - } - if let Some(pb) = pb { - pb.set_style(pb_after_style(true)); - pb.set_prefix( - dim.apply_to(format!( - "{: , - pb: &Option, -) -> Result> { - // relay.reconcile(filter, opts).await?; - - if !relay.is_connected().await { - #[allow(clippy::large_futures)] - relay - .connect(Some(std::time::Duration::from_secs(CONNECTION_TIMEOUT))) - .await; - } - - if !relay.is_connected().await { - bail!("connection timeout"); - } else if let Some(pb) = pb { - pb.set_prefix(format!("connected {}", relay.url())); - } - let events = relay - .get_events_of( - filters, - // 20 is nostr_sdk default - std::time::Duration::from_secs(GET_EVENTS_TIMEOUT), - nostr_sdk::FilterOptions::ExitOnEOSE, - ) - .await?; - Ok(events) -} - -#[derive(Default)] -pub struct Params { - pub keys: Option, - pub fallback_relays: Vec, - pub more_fallback_relays: Vec, - pub blaster_relays: Vec, -} - -fn get_dedup_events(relay_results: Vec>>) -> Vec { - let mut dedup_events: Vec = vec![]; - for events in relay_results.into_iter().flatten() { - for event in events { - if !dedup_events.iter().any(|e| event.id.eq(&e.id)) { - dedup_events.push(event); - } - } - } - dedup_events -} - -pub async fn sign_event(event_builder: EventBuilder, signer: &NostrSigner) -> Result { - if signer.r#type().eq(&nostr_signer::NostrSignerType::NIP46) { - let term = console::Term::stderr(); - term.write_line("signing event with remote signer...")?; - let event = signer - .sign_event_builder(event_builder) - .await - .context("failed to sign event")?; - term.clear_last_lines(1)?; - Ok(event) - } else { - signer - .sign_event_builder(event_builder) - .await - .context("failed to sign event") - } -} - -pub async fn fetch_public_key(signer: &NostrSigner) -> Result { - let term = console::Term::stderr(); - term.write_line("fetching npub from remote signer...")?; - let public_key = signer - .public_key() - .await - .context("failed to get npub from remote signer")?; - term.clear_last_lines(1)?; - Ok(public_key) -} - -fn pb_style() -> Result { - Ok( - ProgressStyle::with_template(" {spinner} {prefix} {msg} {timeout_in}")?.with_key( - "timeout_in", - |state: &ProgressState, w: &mut dyn Write| { - if state.elapsed().as_secs() > 3 && state.elapsed().as_secs() < GET_EVENTS_TIMEOUT { - let dim = Style::new().color256(247); - write!( - w, - "{}", - dim.apply_to(format!( - "timeout in {:.1}s", - GET_EVENTS_TIMEOUT - state.elapsed().as_secs() - )) - ) - .unwrap(); - } - }, - ), - ) -} - -fn pb_after_style(succeed: bool) -> indicatif::ProgressStyle { - ProgressStyle::with_template( - format!( - " {} {}", - if succeed { - console::style("✔".to_string()) - .for_stderr() - .green() - .to_string() - } else { - console::style("✘".to_string()) - .for_stderr() - .red() - .to_string() - }, - "{prefix} {msg}", - ) - .as_str(), - ) - .unwrap() -} - -async fn get_local_cache_database(git_repo_path: &Path) -> Result { - SQLiteDatabase::open(git_repo_path.join(".git/nostr-cache.sqlite")) - .await - .context("cannot open or create nostr cache database at .git/nostr-cache.sqlite") -} - -async fn get_global_cache_database(git_repo_path: &Path) -> Result { - SQLiteDatabase::open(if std::env::var("NGITTEST").is_err() { - create_dir_all(get_dirs()?.cache_dir()).context(format!( - "cannot create cache directory in: {:?}", - get_dirs()?.cache_dir() - ))?; - get_dirs()?.cache_dir().join("nostr-cache.sqlite") - } else { - git_repo_path.join(".git/test-global-cache.sqlite") - }) - .await - .context("cannot open ngit global nostr cache database") -} - -pub async fn get_events_from_cache( - git_repo_path: &Path, - filters: Vec, -) -> Result> { - get_local_cache_database(git_repo_path) - .await? - .query(filters.clone(), Order::Asc) - .await - .context( - "cannot execute query on opened git repo nostr cache database .git/nostr-cache.sqlite", - ) -} - -pub async fn get_event_from_global_cache( - git_repo_path: &Path, - filters: Vec, -) -> Result> { - get_global_cache_database(git_repo_path) - .await? - .query(filters.clone(), Order::Asc) - .await - .context("cannot execute query on opened ngit nostr cache database") -} - -pub async fn save_event_in_cache(git_repo_path: &Path, event: &nostr::Event) -> Result { - get_local_cache_database(git_repo_path) - .await? - .save_event(event) - .await - .context("cannot save event in local cache") -} - -pub async fn save_event_in_global_cache( - git_repo_path: &Path, - event: &nostr::Event, -) -> Result { - get_global_cache_database(git_repo_path) - .await? - .save_event(event) - .await - .context("cannot save event in local cache") -} - -pub async fn get_repo_ref_from_cache( - git_repo_path: &Path, - repo_coordinates: &HashSet, -) -> Result { - let mut maintainers = HashSet::new(); - let mut new_coordinate: bool; - - for c in repo_coordinates { - maintainers.insert(c.public_key); - } - let mut repo_events = vec![]; - loop { - new_coordinate = false; - let repo_events_filter = get_filter_repo_events(repo_coordinates); - - let events = [ - get_event_from_global_cache(git_repo_path, vec![repo_events_filter.clone()]).await?, - get_events_from_cache(git_repo_path, vec![repo_events_filter]).await?, - ] - .concat(); - for e in events { - if let Ok(repo_ref) = RepoRef::try_from(e.clone()) { - for m in repo_ref.maintainers { - if maintainers.insert(m) { - new_coordinate = true; - } - } - repo_events.push(e); - } - } - if !new_coordinate { - break; - } - } - repo_events.sort_by_key(|e| e.created_at); - let repo_ref = RepoRef::try_from( - repo_events - .first() - .context("no repo events at specified coordinates")? - .clone(), - )?; - - let mut events: HashMap = HashMap::new(); - for m in &maintainers { - if let Some(e) = repo_events.iter().find(|e| e.author().eq(m)) { - events.insert( - Coordinate { - kind: e.kind, - identifier: e.identifier().unwrap().to_string(), - public_key: e.author(), - relays: vec![], - }, - e.clone(), - ); - } - } - - Ok(RepoRef { - // use all maintainers from all events found, not just maintainers in the most - // recent event - maintainers: maintainers.iter().copied().collect::>(), - events, - ..repo_ref - }) -} - -pub async fn get_state_from_cache(git_repo_path: &Path, repo_ref: &RepoRef) -> Result { - RepoState::try_from( - get_events_from_cache( - git_repo_path, - vec![get_filter_state_events(&repo_ref.coordinates())], - ) - .await?, - ) -} - -#[allow(clippy::too_many_lines)] -async fn create_relays_request( - git_repo_path: &Path, - repo_coordinates: &HashSet, - user_profiles: &HashSet, - fallback_relays: HashSet, -) -> Result { - let repo_ref = get_repo_ref_from_cache(git_repo_path, repo_coordinates).await; - - let repo_coordinates = { - // add coordinates of users listed in maintainers to explicitly specified - // coodinates - let mut repo_coordinates = repo_coordinates.clone(); - if let Ok(repo_ref) = &repo_ref { - for c in repo_ref.coordinates() { - if !repo_coordinates - .iter() - .any(|e| e.identifier.eq(&c.identifier) && e.public_key.eq(&c.public_key)) - { - repo_coordinates.insert(c); - } - } - } - repo_coordinates - }; - - let repo_coordinates_without_relays = { - let mut set = HashSet::new(); - for c in &repo_coordinates { - set.insert(Coordinate { - kind: c.kind, - identifier: c.identifier.clone(), - public_key: c.public_key, - relays: vec![], - }); - } - set - }; - - let mut proposals: HashSet = HashSet::new(); - let mut missing_contributor_profiles: HashSet = HashSet::new(); - let mut contributors: HashSet = HashSet::new(); - - if !repo_coordinates_without_relays.is_empty() { - if let Ok(repo_ref) = &repo_ref { - for m in &repo_ref.maintainers { - contributors.insert(m.to_owned()); - } - } - - for event in &get_events_from_cache( - git_repo_path, - vec![ - nostr::Filter::default() - .kinds(vec![Kind::GitPatch]) - .custom_tag( - SingleLetterTag::lowercase(nostr_sdk::Alphabet::A), - repo_coordinates_without_relays - .iter() - .map(std::string::ToString::to_string) - .collect::>(), - ), - ], - ) - .await? - { - if event_is_patch_set_root(event) || event_is_revision_root(event) { - proposals.insert(event.id()); - contributors.insert(event.author()); - } - } - - let profile_events = get_event_from_global_cache( - git_repo_path, - vec![get_filter_contributor_profiles(contributors.clone())], - ) - .await?; - for c in &contributors { - if let Some(event) = profile_events - .iter() - .find(|e| e.kind() == Kind::Metadata && e.author().eq(c)) - { - save_event_in_cache(git_repo_path, event).await?; - } else { - missing_contributor_profiles.insert(c.to_owned()); - } - } - } - - let profiles_to_fetch_from_user_relays = { - let mut user_profiles = user_profiles.clone(); - if let Ok(Some(current_user)) = get_logged_in_user(git_repo_path).await { - user_profiles.insert(current_user); - } - let mut map: HashMap = HashMap::new(); - for public_key in &user_profiles { - if let Ok(user_ref) = get_user_ref_from_cache(git_repo_path, public_key).await { - map.insert( - public_key.to_owned(), - (user_ref.metadata.created_at, user_ref.relays.created_at), - ); - } else { - map.insert( - public_key.to_owned(), - (Timestamp::from(0), Timestamp::from(0)), - ); - } - } - map - }; - - let user_relays_for_profiles = { - let mut set = HashSet::new(); - for user in &profiles_to_fetch_from_user_relays - .clone() - .into_keys() - .collect::>() - { - if let Ok(user_ref) = get_user_ref_from_cache(git_repo_path, user).await { - for r in user_ref.relays.write() { - if let Ok(url) = Url::parse(&r) { - set.insert(url); - } - } - } else { - missing_contributor_profiles.insert(user.to_owned()); - } - } - set - }; - - let existing_events: HashSet = { - let mut existing_events: HashSet = HashSet::new(); - for filter in get_fetch_filters( - &repo_coordinates_without_relays, - &proposals, - &missing_contributor_profiles - .union( - &profiles_to_fetch_from_user_relays - .clone() - .into_keys() - .collect(), - ) - .copied() - .collect(), - ) { - for (id, _) in get_local_cache_database(git_repo_path) - .await? - .negentropy_items(filter) - .await? - { - existing_events.insert(id); - } - } - existing_events - }; - - let relays = { - let mut relays = fallback_relays; - if let Ok(repo_ref) = &repo_ref { - for r in &repo_ref.relays { - if let Ok(url) = Url::parse(r) { - relays.insert(url); - } - } - } - for c in repo_coordinates { - for r in &c.relays { - if let Ok(url) = Url::parse(r) { - relays.insert(url); - } - } - } - relays - }; - - let relay_column_width = relays - .union(&user_relays_for_profiles) - .reduce(|a, r| { - if r.to_string() - .chars() - .count() - .gt(&a.to_string().chars().count()) - { - r - } else { - a - } - }) - .unwrap() - .to_string() - .chars() - .count() - + 2; - - Ok(FetchRequest { - selected_relay: None, - repo_relays: relays, - relay_column_width, - repo_coordinates_without_relays: if let Ok(repo_ref) = &repo_ref { - repo_ref.coordinates_with_timestamps() - } else { - repo_coordinates_without_relays - .iter() - .map(|c| (c.clone(), None)) - .collect() - }, - state: if let Ok(repo_ref) = &repo_ref { - if let Ok(existing_state) = get_state_from_cache(git_repo_path, repo_ref).await { - Some((existing_state.event.created_at, existing_state.event.id)) - } else { - None - } - } else { - None - }, - proposals, - contributors, - missing_contributor_profiles, - existing_events, - profiles_to_fetch_from_user_relays, - user_relays_for_profiles, - }) -} - -#[allow(clippy::too_many_lines)] -async fn process_fetched_events( - events: Vec, - request: &FetchRequest, - git_repo_path: &Path, - fresh_coordinates: &mut HashSet, - fresh_proposal_roots: &mut HashSet, - fresh_profiles: &mut HashSet, - report: &mut FetchReport, -) -> Result<()> { - for event in &events { - if !request.existing_events.contains(&event.id) { - save_event_in_cache(git_repo_path, event).await?; - if event.kind().eq(&Kind::GitRepoAnnouncement) { - save_event_in_global_cache(git_repo_path, event).await?; - let new_coordinate = !request - .repo_coordinates_without_relays - .iter() - .map(|(c, _)| c.clone()) - .any(|c| { - c.identifier.eq(event.identifier().unwrap()) - && c.public_key.eq(&event.pubkey) - }); - let update_to_existing = !new_coordinate - && request - .repo_coordinates_without_relays - .iter() - .any(|(c, t)| { - c.identifier.eq(event.identifier().unwrap()) - && c.public_key.eq(&event.pubkey) - && if let Some(t) = t { - event.created_at.gt(t) - } else { - true - } - }); - if update_to_existing { - report.updated_repo_announcements.push(( - Coordinate { - kind: event.kind(), - public_key: event.author(), - identifier: event.identifier().unwrap().to_owned(), - relays: vec![], - }, - event.created_at, - )); - } - // if contains new maintainer - if let Ok(repo_ref) = &RepoRef::try_from(event.clone()) { - for m in &repo_ref.maintainers { - if !request - .repo_coordinates_without_relays // prexisting maintainers - .iter() - .map(|(c, _)| c.clone()) - .collect::>() - .union(&report.repo_coordinates_without_relays) // already added maintainers - .any(|c| c.identifier.eq(&repo_ref.identifier) && m.eq(&c.public_key)) - { - let c = Coordinate { - kind: event.kind(), - public_key: *m, - identifier: repo_ref.identifier.clone(), - relays: vec![], - }; - fresh_coordinates.insert(c.clone()); - report.repo_coordinates_without_relays.insert(c); - - if !request.contributors.contains(m) - && !request - .profiles_to_fetch_from_user_relays - .clone() - .into_keys() - .collect::>() - .contains(m) - && !fresh_profiles.contains(m) - { - fresh_profiles.insert(m.to_owned()); - } - } - } - } - } else if event.kind().eq(&STATE_KIND) { - let existing_state = if report.updated_state.is_some() { - report.updated_state - } else { - request.state - }; - if let Some((timestamp, id)) = existing_state { - if event.created_at.gt(×tamp) - || (event.created_at.eq(×tamp) && event.id.gt(&id)) - { - report.updated_state = Some((event.created_at, event.id)); - } - } - } else if event_is_patch_set_root(event) { - fresh_proposal_roots.insert(event.id); - report.proposals.insert(event.id); - if !request.contributors.contains(&event.author()) - && !fresh_profiles.contains(&event.author()) - { - fresh_profiles.insert(event.author()); - } - } else if [Kind::RelayList, Kind::Metadata].contains(&event.kind()) { - if request - .missing_contributor_profiles - .contains(&event.author()) - { - report.contributor_profiles.insert(event.author()); - } else if let Some((_, (metadata_timestamp, relay_list_timestamp))) = request - .profiles_to_fetch_from_user_relays - .get_key_value(&event.author()) - { - if (Kind::Metadata.eq(&event.kind()) - && event.created_at().gt(metadata_timestamp)) - || (Kind::RelayList.eq(&event.kind()) - && event.created_at().gt(relay_list_timestamp)) - { - report.profile_updates.insert(event.author()); - } - } - save_event_in_global_cache(git_repo_path, event).await?; - } - } - } - for event in &events { - if !request.existing_events.contains(&event.id) - && !event.event_ids().any(|id| report.proposals.contains(id)) - { - if event.kind().eq(&Kind::GitPatch) && !event_is_patch_set_root(event) { - report.commits.insert(event.id); - } else if status_kinds().contains(&event.kind()) { - report.statuses.insert(event.id); - } - } - } - Ok(()) -} - -pub fn consolidate_fetch_reports(reports: Vec>) -> FetchReport { - let mut report = FetchReport::default(); - for relay_report in reports.into_iter().flatten() { - for c in relay_report.repo_coordinates_without_relays { - if !report - .repo_coordinates_without_relays - .iter() - .any(|e| e.eq(&c)) - { - report.repo_coordinates_without_relays.insert(c); - } - } - for (r, t) in relay_report.updated_repo_announcements { - if let Some(i) = report - .updated_repo_announcements - .iter() - .position(|(e, _)| e.eq(&r)) - { - let (_, existing_t) = &report.updated_repo_announcements[i]; - if t.gt(existing_t) { - report.updated_repo_announcements[i] = (r, t); - } - } else { - report.updated_repo_announcements.push((r, t)); - } - } - if let Some((timestamp, id)) = relay_report.updated_state { - if let Some((existing_timestamp, existing_id)) = report.updated_state { - if timestamp.gt(&existing_timestamp) - || (timestamp.eq(&existing_timestamp) && id.gt(&existing_id)) - { - report.updated_state = Some((timestamp, id)); - } - } else { - report.updated_state = Some((timestamp, id)); - } - } - for c in relay_report.proposals { - report.proposals.insert(c); - } - for c in relay_report.commits { - report.commits.insert(c); - } - for c in relay_report.statuses { - report.statuses.insert(c); - } - for c in relay_report.contributor_profiles { - report.contributor_profiles.insert(c); - } - for c in relay_report.profile_updates { - report.profile_updates.insert(c); - } - } - report -} -pub fn get_fetch_filters( - repo_coordinates: &HashSet, - proposal_ids: &HashSet, - required_profiles: &HashSet, -) -> Vec { - [ - if repo_coordinates.is_empty() { - vec![] - } else { - vec![ - get_filter_state_events(repo_coordinates), - get_filter_repo_events(repo_coordinates), - nostr::Filter::default() - .kinds(vec![Kind::GitPatch, Kind::EventDeletion]) - .custom_tag( - SingleLetterTag::lowercase(nostr_sdk::Alphabet::A), - repo_coordinates - .iter() - .map(std::string::ToString::to_string) - .collect::>(), - ), - ] - }, - if proposal_ids.is_empty() { - vec![] - } else { - vec![ - nostr::Filter::default() - .events(proposal_ids.clone()) - .kinds([vec![Kind::GitPatch, Kind::EventDeletion], status_kinds()].concat()), - ] - }, - if required_profiles.is_empty() { - vec![] - } else { - vec![get_filter_contributor_profiles(required_profiles.clone())] - }, - ] - .concat() -} - -pub fn get_filter_repo_events(repo_coordinates: &HashSet) -> nostr::Filter { - nostr::Filter::default() - .kind(Kind::GitRepoAnnouncement) - .identifiers( - repo_coordinates - .iter() - .map(|c| c.identifier.clone()) - .collect::>(), - ) - .authors( - repo_coordinates - .iter() - .map(|c| c.public_key) - .collect::>(), - ) -} - -pub static STATE_KIND: nostr::Kind = Kind::Custom(30618); -pub fn get_filter_state_events(repo_coordinates: &HashSet) -> nostr::Filter { - nostr::Filter::default() - .kind(STATE_KIND) - .identifiers( - repo_coordinates - .iter() - .map(|c| c.identifier.clone()) - .collect::>(), - ) - .authors( - repo_coordinates - .iter() - .map(|c| c.public_key) - .collect::>(), - ) -} - -pub fn get_filter_contributor_profiles(contributors: HashSet) -> nostr::Filter { - nostr::Filter::default() - .kinds(vec![Kind::Metadata, Kind::RelayList]) - .authors(contributors) -} - -#[derive(Default)] -pub struct FetchReport { - repo_coordinates_without_relays: HashSet, - updated_repo_announcements: Vec<(Coordinate, Timestamp)>, - updated_state: Option<(Timestamp, EventId)>, - proposals: HashSet, - /// commits against existing propoals - commits: HashSet, - statuses: HashSet, - contributor_profiles: HashSet, - profile_updates: HashSet, -} - -impl Display for FetchReport { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - // report: "1 new maintainer, 1 announcement, 1 proposal, 3 commits, 2 statuses" - let mut display_items: Vec = vec![]; - if !self.repo_coordinates_without_relays.is_empty() { - display_items.push(format!( - "{} new maintainer{}", - self.repo_coordinates_without_relays.len(), - if self.repo_coordinates_without_relays.len() > 1 { - "s" - } else { - "" - }, - )); - } - if !self.updated_repo_announcements.is_empty() { - display_items.push(format!( - "{} announcement update{}", - self.updated_repo_announcements.len(), - if self.updated_repo_announcements.len() > 1 { - "s" - } else { - "" - }, - )); - } - if self.updated_state.is_some() { - display_items.push("new state".to_string()); - } - if !self.proposals.is_empty() { - display_items.push(format!( - "{} proposal{}", - self.proposals.len(), - if self.proposals.len() > 1 { "s" } else { "" }, - )); - } - if !self.commits.is_empty() { - display_items.push(format!( - "{} commit{}", - self.commits.len(), - if self.commits.len() > 1 { "s" } else { "" }, - )); - } - if !self.statuses.is_empty() { - display_items.push(format!( - "{} status{}", - self.statuses.len(), - if self.statuses.len() > 1 { "es" } else { "" }, - )); - } - if !self.contributor_profiles.is_empty() { - display_items.push(format!( - "{} user profile{}", - self.contributor_profiles.len(), - if self.contributor_profiles.len() > 1 { - "s" - } else { - "" - }, - )); - } - if !self.profile_updates.is_empty() { - display_items.push(format!( - "{} profile update{}", - self.profile_updates.len(), - if self.profile_updates.len() > 1 { - "s" - } else { - "" - }, - )); - } - write!(f, "{}", display_items.join(", ")) - } -} - -#[derive(Default, Clone)] -pub struct FetchRequest { - repo_relays: HashSet, - selected_relay: Option, - relay_column_width: usize, - repo_coordinates_without_relays: Vec<(Coordinate, Option)>, - state: Option<(Timestamp, EventId)>, - proposals: HashSet, - contributors: HashSet, - missing_contributor_profiles: HashSet, - existing_events: HashSet, - profiles_to_fetch_from_user_relays: HashMap, - user_relays_for_profiles: HashSet, -} - -pub async fn fetching_with_report( - git_repo_path: &Path, - #[cfg(test)] client: &crate::client::MockConnect, - #[cfg(not(test))] client: &Client, - repo_coordinates: &HashSet, -) -> Result { - let term = console::Term::stderr(); - term.write_line("fetching updates...")?; - let (relay_reports, progress_reporter) = client - .fetch_all(git_repo_path, repo_coordinates, &HashSet::new()) - .await?; - if !relay_reports.iter().any(std::result::Result::is_err) { - let _ = progress_reporter.clear(); - } - let report = consolidate_fetch_reports(relay_reports); - if report.to_string().is_empty() { - println!("no updates"); - } else { - println!("updates: {report}"); - } - Ok(report) -} -- cgit v1.2.3