// have you considered // TO USE ASYNC // in traits (required for mocking unit tests) // https://rust-lang.github.io/async-book/07_workarounds/05_async_in_traits.html // https://github.com/dtolnay/async-trait // see https://blog.rust-lang.org/inside-rust/2022/11/17/async-fn-in-trait-nightly.html // I think we can use the async-trait crate and switch to the native feature // which is currently in nightly. alternatively we can use nightly as it looks // certain that the implementation is going to make it to stable but we don't // want to inadvertlty use other features of nightly that might be removed. use std::{ collections::{HashMap, HashSet}, fmt::{Display, Write}, fs::create_dir_all, path::Path, sync::{Arc, RwLock}, time::Duration, }; use anyhow::{Context, Result, anyhow, bail}; use async_trait::async_trait; use console::Style; use futures::{ future::join_all, stream::{self, StreamExt}, }; use indicatif::{MultiProgress, ProgressBar, ProgressDrawTarget, ProgressState, ProgressStyle}; #[cfg(test)] use mockall::*; use nostr::{ Event, event::UnsignedEvent, filter::Alphabet, nips::{ nip01::Coordinate, nip05::{Nip05Address, Nip05Profile}, nip19::Nip19Coordinate, }, signer::SignerBackend, }; use nostr_database::{NostrDatabase, SaveEventStatus}; use nostr_lmdb::NostrLMDB; use nostr_relay_pool::relay::ReqExitPolicy; use nostr_sdk::{ ClientOptions, EventBuilder, EventId, Kind, NostrSigner, PublicKey, RelayUrl, SingleLetterTag, Timestamp, Url, prelude::RelayLimits, }; use serde_json::Value; use crate::{ get_dirs, git::{Repo, RepoActions, get_git_config_item}, git_events::{ KIND_PULL_REQUEST, KIND_PULL_REQUEST_UPDATE, KIND_USER_GRASP_LIST, event_is_cover_letter, event_is_patch_set_root, event_is_revision_root, event_is_valid_pr_or_pr_update, status_kinds, }, login::{get_likely_logged_in_user, user::get_user_ref_from_cache}, repo_ref::{RepoRef, normalize_grasp_server_url}, repo_state::RepoState, }; #[allow(clippy::struct_field_names)] pub struct Client { client: 
nostr_sdk::Client, relay_default_set: Vec, more_fallback_relays: Vec, blaster_relays: Vec, fallback_signer_relays: Vec, grasp_default_set: Vec, relays_not_to_retry: Arc>>, } impl Client { /// Marks a relay as skipped for the current session with a given reason. /// This method encapsulates the write lock for the relays_not_to_retry map. fn skip_relay_for_session(&self, relay_url: RelayUrl, reason: String) { self.relays_not_to_retry .write() .unwrap() .insert(relay_url, reason); } /// Checks if a relay should be skipped for the current session and returns /// the reason if it is. This method encapsulates the read lock for the /// relays_not_to_retry map. fn is_relay_skipped_for_session(&self, relay_url: &RelayUrl) -> Option { self.relays_not_to_retry .read() .unwrap() .get(relay_url) .cloned() } } #[cfg_attr(test, automock)] #[async_trait] pub trait Connect { fn default() -> Self; fn new(opts: Params) -> Self; async fn set_signer(&mut self, signer: Arc); async fn connect(&self, relay_url: &RelayUrl) -> Result<()>; async fn disconnect(&self) -> Result<()>; fn get_relay_default_set(&self) -> &Vec; fn get_more_fallback_relays(&self) -> &Vec; fn get_blaster_relays(&self) -> &Vec; fn get_fallback_signer_relays(&self) -> &Vec; fn get_grasp_default_set(&self) -> &Vec; async fn send_event_to<'a>( &self, git_repo_path: Option<&'a Path>, url: &str, event: nostr::event::Event, ) -> Result; async fn get_events( &self, relays: Vec, filters: Vec, ) -> Result>; async fn get_events_per_relay( &self, relays: Vec, filters: Vec, progress_reporter: MultiProgress, ) -> Result<(Vec>>, MultiProgress)>; async fn fetch_all<'a>( &self, git_repo_path: Option<&'a Path>, repo_coordinates: Option<&'a Nip19Coordinate>, user_profiles: &HashSet, ) -> Result<(Vec>, MultiProgress)>; async fn fetch_all_from_relay<'a>( &self, git_repo_path: Option<&'a Path>, request: FetchRequest, pb: &Option, ) -> Result; } #[async_trait] impl Connect for Client { fn default() -> Self { Self::new(Params::default()) } 
/// Builds a client from `opts`; a signer is attached only when keys are given.
fn new(opts: Params) -> Self {
    // the options are identical with and without a signer, so build them once
    let builder = nostr_sdk::ClientBuilder::new().opts(
        ClientOptions::new()
            .relay_limits(RelayLimits::disable())
            .verify_subscriptions(true),
    );
    let client = match opts.keys {
        Some(keys) => builder.signer(keys).build(),
        None => builder.build(),
    };
    Client {
        client,
        relay_default_set: opts.relay_default_set,
        more_fallback_relays: opts.more_fallback_relays,
        blaster_relays: opts.blaster_relays,
        fallback_signer_relays: opts.fallback_signer_relays,
        grasp_default_set: opts.grasp_default_set,
        relays_not_to_retry: Arc::new(RwLock::new(HashMap::new())),
    }
}

async fn set_signer(&mut self, signer: Arc<dyn NostrSigner>) {
    self.client.set_signer(signer).await;
}

/// Adds `relay_url` to the pool and makes sure it is connected.
///
/// # Errors
/// Fails with the recorded reason if the relay was already skipped this
/// session, or with "connection timeout" (and records the relay as skipped)
/// if it is still not connected after the connection attempt.
async fn connect(&self, relay_url: &RelayUrl) -> Result<()> {
    if let Some(reason) = self.is_relay_skipped_for_session(relay_url) {
        bail!("{reason}");
    }
    self.client
        .add_relay(relay_url)
        .await
        .context("failed to add relay")?;
    let relay = self.client.relay(relay_url).await?;
    if !relay.is_connected() {
        // the outcome is checked via `is_connected` below
        #[allow(clippy::large_futures)]
        let _ = relay
            .try_connect(std::time::Duration::from_secs(CONNECTION_TIMEOUT))
            .await;
    }
    if !relay.is_connected() {
        self.skip_relay_for_session(relay_url.clone(), "connection timeout".to_string());
        bail!("connection timeout");
    }
    Ok(())
}

async fn disconnect(&self) -> Result<()> {
    self.client.disconnect().await;
    Ok(())
}

fn get_relay_default_set(&self) -> &Vec<String> {
    &self.relay_default_set
}

fn get_more_fallback_relays(&self) -> &Vec<String> {
    &self.more_fallback_relays
}

fn get_blaster_relays(&self) -> &Vec<String> {
    &self.blaster_relays
}

fn get_fallback_signer_relays(&self) -> &Vec<String> {
    &self.fallback_signer_relays
}

fn get_grasp_default_set(&self) -> &Vec<String> {
    &self.grasp_default_set
}

/// Sends `event` to the relay at `url`, then stores it in the local cache
/// (when a repo path is given) and, for repo announcements and user grasp
/// lists, in the global cache. Returns the event id.
async fn send_event_to<'a>(
    &self,
    git_repo_path: Option<&'a Path>,
    url: &str,
    event: Event,
) -> Result<nostr::EventId> {
    self.client.add_relay(url).await?;
    #[allow(clippy::large_futures)]
    self.client.connect_relay(url).await?;
    self.client.relay(url).await?.send_event(&event).await?;
    if let Some(git_repo_path) = git_repo_path {
        save_event_in_local_cache(git_repo_path, &event).await?;
    }
    if [Kind::GitRepoAnnouncement, KIND_USER_GRASP_LIST].contains(&event.kind) {
        save_event_in_global_cache(git_repo_path, &event).await?;
    }
    Ok(event.id)
}

/// Fetches events matching `filters` from `relays` and de-duplicates them.
///
/// # Errors
/// Returns an error (instead of panicking) when a relay string is not a
/// valid relay url.
async fn get_events(
    &self,
    relays: Vec<String>,
    filters: Vec<nostr::Filter>,
) -> Result<Vec<nostr::Event>> {
    let relay_urls = relays
        .iter()
        .map(|r| RelayUrl::parse(r).with_context(|| format!("invalid relay url: {r}")))
        .collect::<Result<Vec<RelayUrl>>>()?;
    let (relay_results, _) = self
        .get_events_per_relay(relay_urls, filters, MultiProgress::new())
        .await?;
    Ok(get_dedup_events(relay_results))
}

/// Queries every relay (at most 15 concurrently) and returns one result per
/// relay together with the progress reporter that was passed in.
async fn get_events_per_relay(
    &self,
    relays: Vec<RelayUrl>,
    filters: Vec<nostr::Filter>,
    progress_reporter: MultiProgress,
) -> Result<(Vec<Result<Vec<nostr::Event>>>, MultiProgress)> {
    // add relays
    for relay in &relays {
        self.client
            .add_relay(relay.as_str())
            .await
            .context("failed to add relay")?;
    }

    let relays_map = self.client.relays().await;

    let futures: Vec<_> = relays
        .iter()
        // don't look for events on blaster
        .filter(|r| !r.as_str().contains("nostr.mutinywallet.com"))
        .map(|r| {
            (
                relays_map
                    .get(r)
                    .expect("relay was added to the pool above"),
                filters.clone(),
            )
        })
        .map(|(relay, filters)| async {
            // progress bars are suppressed when NGITTEST is set
            let pb = if std::env::var("NGITTEST").is_err() {
                let pb = progress_reporter.add(
                    ProgressBar::new(1)
                        .with_prefix(format!("{: <11}{}", "connecting", relay.url()))
                        .with_style(pb_style()?),
                );
                pb.enable_steady_tick(Duration::from_millis(300));
                Some(pb)
            } else {
                None
            };

            fn update_progress_bar_with_error(
                relay_url: &RelayUrl,
                pb: Option<ProgressBar>,
                error: &anyhow::Error,
            ) {
                if let Some(pb) = pb {
                    pb.set_style(pb_after_style(false));
                    pb.set_prefix(format!("{: <11}{}", "error", relay_url));
                    pb.finish_with_message(
                        console::style(
                            error.to_string().replace("relay pool error:", "error:"),
                        )
                        .for_stderr()
                        .red()
                        .to_string(),
                    );
                }
            }

            if let Some(reason) = self.is_relay_skipped_for_session(relay.url()) {
                update_progress_bar_with_error(relay.url(), pb, &anyhow!("{reason}"));
                bail!("{reason}");
            }

            #[allow(clippy::large_futures)]
            match get_events_of(relay, filters, &pb).await {
                Err(error) => {
                    // Check error for timeout/connection issues and add to skip list
                    if error.to_string().contains("connection timeout") {
                        // Simple check, refine as needed
                        self.skip_relay_for_session(relay.url().clone(), error.to_string());
                    }
                    update_progress_bar_with_error(relay.url(), pb, &error);
                    Err(error)
                }
                Ok(res) => {
                    if let Some(pb) = pb {
                        pb.set_style(pb_after_style(true));
                        pb.set_prefix(format!(
                            "{: <11}{}",
                            format!("{} events", res.len()),
                            relay.url()
                        ));
                        pb.finish_with_message("");
                    }
                    Ok(res)
                }
            }
        })
        .collect();

    let relay_results: Vec<Result<Vec<nostr::Event>>> =
        stream::iter(futures).buffer_unordered(15).collect().await;

    Ok((relay_results, progress_reporter))
}

#[allow(clippy::too_many_lines)]
async fn fetch_all<'a>(
    &self,
    git_repo_path: Option<&'a Path>,
    trusted_maintainer_coordinate: Option<&'a Nip19Coordinate>,
    user_profiles: &HashSet<PublicKey>,
) -> Result<(Vec<Result<FetchReport>>, MultiProgress)> {
    let relay_default_set = &self
        .relay_default_set
        .iter()
        .filter_map(|r| RelayUrl::parse(r).ok())
        .collect::<HashSet<RelayUrl>>();

    let mut request = create_relays_request(
        git_repo_path,
        trusted_maintainer_coordinate,
        user_profiles,
        relay_default_set.clone(),
    )
    .await?;

    let progress_reporter = MultiProgress::new();
    let mut processed_relays = HashSet::new();
    let mut relay_reports: Vec<Result<FetchReport>> = vec![];

    loop {
        let relays = request
            .repo_relays
            .union(&request.user_relays_for_profiles)
            // don't look for events on blaster
            .filter(|&r| !r.as_str().contains("nostr.mutinywallet.com"))
            .cloned()
            .collect::<HashSet<RelayUrl>>()
            .difference(&processed_relays)
            .cloned()
            .collect::<HashSet<RelayUrl>>();
        if relays.is_empty() {
            break;
        }
        let profile_relays_only = request
            .user_relays_for_profiles
            .difference(&request.repo_relays)
            .collect::<HashSet<_>>();
        for relay in &request.repo_relays {
            self.client
                .add_relay(relay.as_str())
                .await
                .context("failed to add relay")?;
        }

        let dim = Style::new().color256(247);

        let futures: Vec<_> = relays
            .iter()
            .map(|r| {
                if profile_relays_only.contains(r) {
                    // if relay isn't a repo relay, just filter for user profile
                    FetchRequest {
                        selected_relay: Some(r.to_owned()),
repo_coordinates_without_relays: vec![], proposals: HashSet::new(), missing_contributor_profiles: request .missing_contributor_profiles .union( &request .profiles_to_fetch_from_user_relays .clone() .into_keys() .collect(), ) .copied() .collect(), ..request.clone() } } else { FetchRequest { selected_relay: Some(r.to_owned()), ..request.clone() } } }) .map(|request| async { let relay_column_width = request.relay_column_width; let relay_url = request .selected_relay .clone() .context("fetch_all_from_relay called without a relay")?; let pb = if std::env::var("NGITTEST").is_err() { let pb = progress_reporter.add( ProgressBar::new(1) .with_prefix( dim.apply_to(format!( "{: , error: &anyhow::Error, ) { if let Some(pb) = pb { pb.set_style(pb_after_style(false)); pb.set_prefix( Style::new() .color256(247) .apply_to(format!("{: { // Check error for timeout/connection issues and add to skip list if error.to_string().contains("connection timeout") { // Simple check, refine as needed self.skip_relay_for_session(relay_url.clone(), error.to_string()); } update_progress_bar_with_error( relay_column_width, &relay_url, pb, &error, ); Err(error) } Ok(res) => Ok(res), } }) .collect(); for report in stream::iter(futures) .buffer_unordered(15) .collect::>>() .await { relay_reports.push(report); } processed_relays.extend(relays.clone()); if let Some(trusted_maintainer_coordinate) = trusted_maintainer_coordinate { if let Ok(repo_ref) = get_repo_ref_from_cache(git_repo_path, trusted_maintainer_coordinate).await { request.repo_relays = repo_ref.relays.iter().cloned().collect(); } } request.user_relays_for_profiles = { let mut set = HashSet::new(); for user in &request .profiles_to_fetch_from_user_relays .clone() .into_keys() .collect::>() { if let Ok(user_ref) = get_user_ref_from_cache(git_repo_path, user).await { for r in user_ref.relays.write() { if let Ok(url) = RelayUrl::parse(&r) { set.insert(url); } } } } set }; } Ok((relay_reports, progress_reporter)) } async fn 
fetch_all_from_relay<'a>( &self, git_repo_path: Option<&'a Path>, request: FetchRequest, pb: &Option, ) -> Result { let mut fresh_coordinates: HashSet = HashSet::new(); for (c, _) in request.repo_coordinates_without_relays.clone() { fresh_coordinates.insert(c); } let mut fresh_proposal_roots = request.proposals.clone(); let mut fresh_profiles: HashSet = request .missing_contributor_profiles .union( &request .profiles_to_fetch_from_user_relays .clone() .into_keys() .collect(), ) .copied() .collect(); let mut report = FetchReport::default(); let relay_url = request .selected_relay .clone() .context("fetch_all_from_relay called without a relay")?; let relay_column_width = request.relay_column_width; self.connect(&relay_url).await?; let dim = Style::new().color256(247); loop { let filters = get_fetch_filters(&fresh_coordinates, &fresh_proposal_roots, &fresh_profiles); if let Some(pb) = &pb { pb.set_prefix( dim.apply_to(format!( "{: = get_events_of(&relay, filters.clone(), &None).await?; // TODO: try reconcile process_fetched_events( events, &request, git_repo_path, &mut fresh_coordinates, &mut fresh_proposal_roots, &mut fresh_profiles, &mut report, ) .await?; if fresh_coordinates.is_empty() && fresh_proposal_roots.is_empty() && fresh_profiles.is_empty() { break; } } if let Some(pb) = pb { pb.set_style(pb_after_style(true)); pb.set_prefix( dim.apply_to(format!( "{: , pb: &Option, ) -> Result> { // relay.reconcile(filter, opts).await?; if !relay.is_connected() { #[allow(clippy::large_futures)] let _ = relay .try_connect(std::time::Duration::from_secs(CONNECTION_TIMEOUT)) .await; } if !relay.is_connected() { bail!("connection timeout"); } else if let Some(pb) = pb { pb.set_prefix(format!("connected {}", relay.url())); } let events_res = join_all(filters.into_iter().map(|filter| async { relay .fetch_events( filter, // 20 is nostr_sdk default std::time::Duration::from_secs(GET_EVENTS_TIMEOUT), ReqExitPolicy::ExitOnEOSE, ) .await })) .await; // no Event is being mutated, 
// just new items added to the set
    #[allow(clippy::mutable_key_type)]
    let mut events: HashSet<nostr::Event> = HashSet::new();
    for res in events_res {
        events.extend(res?);
    }
    Ok(events.into_iter().collect())
}

/// Construction parameters for [`Client`]: optional signing keys plus the
/// relay / grasp server sets the client is configured with.
pub struct Params {
    pub keys: Option<nostr::Keys>,
    pub relay_default_set: Vec<String>,
    pub more_fallback_relays: Vec<String>,
    pub blaster_relays: Vec<String>,
    pub fallback_signer_relays: Vec<String>,
    pub grasp_default_set: Vec<String>,
}

impl Default for Params {
    /// Built-in defaults; localhost relays are used when `NGITTEST` is set.
    fn default() -> Self {
        let under_test = std::env::var("NGITTEST").is_ok();
        Params {
            keys: None,
            relay_default_set: if under_test {
                vec![
                    "ws://localhost:8051".to_string(),
                    "ws://localhost:8052".to_string(),
                ]
            } else {
                vec![
                    // free, good reliability, have been known to delete all messages
                    "wss://relay.damus.io".to_string(),
                    "wss://nos.lol".to_string(),
                    "wss://relay.nostr.band".to_string(),
                ]
            },
            more_fallback_relays: if under_test {
                vec![
                    "ws://localhost:8055".to_string(),
                    "ws://localhost:8056".to_string(),
                ]
            } else {
                vec![
                    "wss://purplerelay.com".to_string(), // free but reliability not tested
                    "wss://purplepages.es".to_string(),  // for profile events but unreliable
                    "wss://relayable.org".to_string(),   // free but not always reliable
                ]
            },
            blaster_relays: if under_test {
                vec!["ws://localhost:8057".to_string()]
            } else {
                vec![]
            },
            fallback_signer_relays: if under_test {
                vec!["ws://localhost:8051".to_string()]
            } else {
                vec!["wss://relay.nsec.app".to_string()]
            },
            grasp_default_set: if under_test {
                vec![]
            } else {
                vec!["relay.ngit.dev".to_string(), "gitnostr.com".to_string()]
            },
        }
    }
}

impl Params {
    /// Returns the defaults, overridden by the `nostr.*` git config items
    /// (`;`-separated lists). Git config is ignored when `NGITTEST` is set.
    pub fn with_git_config_relay_defaults(git_repo: &Option<&Repo>) -> Self {
        let mut params = Params::default();
        // ignore git config settings under test
        if std::env::var("NGITTEST").is_ok() {
            return params;
        }

        // entries that fail to parse as a relay url are dropped; the rest are
        // converted back to their string form
        let parse_relay_list = |list: &str| -> Vec<String> {
            list.split(';')
                .filter_map(|url| RelayUrl::parse(url).ok())
                .map(|relay_url| relay_url.to_string())
                .collect()
        };

        if let Ok(Some(relay_defaults)) = get_git_config_item(git_repo, "nostr.relay-default-set")
        {
            let new_default_relays = parse_relay_list(&relay_defaults);
            // elsewhere it is assumed this isn't empty
            if !new_default_relays.is_empty() {
                params.relay_default_set = new_default_relays;
            }
        }
        if let Ok(Some(relay_blasters)) = get_git_config_item(git_repo, "nostr.relay-blaster-set")
        {
            params.blaster_relays = parse_relay_list(&relay_blasters);
        }
        if let Ok(Some(relay_signer)) =
            get_git_config_item(git_repo, "nostr.relay-signer-fallback-set")
        {
            params.fallback_signer_relays = parse_relay_list(&relay_signer);
        }
        if let Ok(Some(grasp_default_servers)) =
            get_git_config_item(git_repo, "nostr.grasp-default-set")
        {
            let new_default_grasp_servers: Vec<String> = grasp_default_servers
                .split(';')
                .filter_map(|url| normalize_grasp_server_url(url).ok())
                .collect();
            if !new_default_grasp_servers.is_empty() {
                params.grasp_default_set = new_default_grasp_servers;
            }
        }
        params
    }
}

/// Flattens the successful per-relay results into one list, keeping only the
/// first occurrence of each event id (relay order and event order preserved).
/// Failed relay results are ignored.
fn get_dedup_events(relay_results: Vec<Result<Vec<nostr::Event>>>) -> Vec<Event> {
    // a set of seen ids replaces the previous linear scan per event
    let mut seen_ids: HashSet<EventId> = HashSet::new();
    relay_results
        .into_iter()
        .flatten()
        .flatten()
        .filter(|event| seen_ids.insert(event.id))
        .collect()
}

/// Builds and signs an event with `signer`. For a NostrConnect (remote)
/// signer a temporary status line containing `description` is shown on stderr
/// and cleared once signing succeeds.
pub async fn sign_event(
    event_builder: EventBuilder,
    signer: &Arc<dyn NostrSigner>,
    description: String,
) -> Result<nostr::Event> {
    if signer.backend() != SignerBackend::NostrConnect {
        return signer
            .sign_event(event_builder.build(signer.get_public_key().await?))
            .await
            .context("failed to sign event");
    }
    let term = console::Term::stderr();
    term.write_line(&format!(
        "signing event ({description}) with remote signer..."
    ))?;
    let event = signer
        .sign_event(event_builder.build(signer.get_public_key().await?))
        .await
        .context("failed to sign event")?;
    term.clear_last_lines(1)?;
    Ok(event)
}

/// Signs an already-built unsigned event; same status-line behaviour as
/// `sign_event` for a NostrConnect (remote) signer.
pub async fn sign_draft_event(
    draft_event: UnsignedEvent,
    signer: &Arc<dyn NostrSigner>,
    description: String,
) -> Result<nostr::Event> {
    if signer.backend() != SignerBackend::NostrConnect {
        return signer
            .sign_event(draft_event)
            .await
            .context("failed to sign event");
    }
    let term = console::Term::stderr();
    term.write_line(&format!(
        "signing event ({description}) with remote signer..."
    ))?;
    let event = signer
        .sign_event(draft_event)
        .await
        .context("failed to sign event")?;
    term.clear_last_lines(1)?;
    Ok(event)
}

/// Returns the signer's public key, showing a temporary status line on stderr
/// while waiting for a NostrConnect (remote) signer.
pub async fn fetch_public_key(signer: &Arc<dyn NostrSigner>) -> Result<PublicKey> {
    if signer.backend() != SignerBackend::NostrConnect {
        return signer
            .get_public_key()
            .await
            .context("failed to get public key from local keys");
    }
    let term = console::Term::stderr();
    term.write_line("fetching npub from remote signer...")?;
    let public_key = signer
        .get_public_key()
        .await
        .context("failed to get npub from remote signer")?;
    term.clear_last_lines(1)?;
    Ok(public_key)
}

pub async fn nip05_query(nip05_addr: &str) -> Result<Nip05Profile> {
    let addr_deconstructed = Nip05Address::parse(nip05_addr)
        .context(format!("cannot parse nip05 address: {nip05_addr}"))?;
    let json_res: Value = reqwest::Client::new()
        .get(addr_deconstructed.url().to_string())
        .send()
        .await
        .context(format!(
            "nip05 server is not responding for address: {nip05_addr}"
        ))?
.json()
        .await
        .context(format!(
            "nip05 server response did not respond with json when querying address: {nip05_addr}"
        ))?;
    Nip05Profile::from_json(&addr_deconstructed, &json_res).context(format!(
        "cannot get public key for nip05 address: {nip05_addr}"
    ))
}

/// Spinner style used while a relay request is in flight. The `timeout_in`
/// key renders a countdown once more than 3 seconds have elapsed and before
/// `GET_EVENTS_TIMEOUT` is reached.
fn pb_style() -> Result<ProgressStyle> {
    Ok(
        ProgressStyle::with_template(" {spinner} {prefix} {msg} {timeout_in}")?.with_key(
            "timeout_in",
            |state: &ProgressState, w: &mut dyn Write| {
                let elapsed = state.elapsed().as_secs();
                // the upper bound also guards the subtraction below
                if elapsed > 3 && elapsed < GET_EVENTS_TIMEOUT {
                    let dim = Style::new().color256(247);
                    write!(
                        w,
                        "{}",
                        dim.apply_to(format!(
                            "timeout in {:.1}s",
                            GET_EVENTS_TIMEOUT - elapsed
                        ))
                    )
                    .unwrap();
                }
            },
        ),
    )
}

/// Final style for a finished progress bar: a green tick on success, a red
/// cross otherwise, followed by the prefix and message.
fn pb_after_style(succeed: bool) -> indicatif::ProgressStyle {
    let marker = if succeed {
        console::style("✔".to_string())
            .for_stderr()
            .green()
            .to_string()
    } else {
        console::style("✘".to_string())
            .for_stderr()
            .red()
            .to_string()
    };
    ProgressStyle::with_template(format!(" {} {}", marker, "{prefix} {msg}").as_str()).unwrap()
}

/// Opens (or creates) the per-repository cache at `.git/nostr-cache.lmdb`.
async fn get_local_cache_database(git_repo_path: &Path) -> Result<NostrLMDB> {
    NostrLMDB::open(git_repo_path.join(".git/nostr-cache.lmdb"))
        .context("failed to open or create nostr cache database at .git/nostr-cache.lmdb")
}

/// Opens (or creates) the global cache. When `NGITTEST` is set the database
/// lives inside the repository's `.git` directory, so a repo path is required.
async fn get_global_cache_database(git_repo_path: Option<&Path>) -> Result<NostrLMDB> {
    let path = if std::env::var("NGITTEST").is_ok() {
        if let Some(git_repo_path) = git_repo_path {
            git_repo_path.join(".git/test-global-cache.lmdb")
        } else {
            bail!("git_repo must be supplied to get_global_cache_database during integration tests")
        }
    } else {
        // resolve the directories once instead of on every use
        let dirs = get_dirs()?;
        let cache_dir = dirs.cache_dir();
        create_dir_all(cache_dir)
            .with_context(|| format!("failed to create cache directory in: {cache_dir:?}"))?;
        cache_dir.join("nostr-cache.lmdb")
    };
    NostrLMDB::open(path).context("failed to open ngit global nostr cache database")
}

/// Runs every filter against the repository's local cache and returns the
/// union of the results (each event once).
pub async fn get_events_from_local_cache(
    git_repo_path: &Path,
    filters: Vec<nostr::Filter>,
) -> Result<Vec<nostr::Event>> {
    let db = get_local_cache_database(git_repo_path).await?;
    let query_results = join_all(filters.into_iter().map(|filter| async {
        db.query(filter)
            .await
            .context("failed to execute query on opened ngit nostr cache database")
    }))
    .await;
    // no Event is being mutated, just new items added to the set
    #[allow(clippy::mutable_key_type)]
    let mut events: HashSet<nostr::Event> = HashSet::new();
    for result in query_results {
        events.extend(result?);
    }
    Ok(events.into_iter().collect())
}

/// Runs every filter against the global cache and returns the union of the
/// results (each event once).
pub async fn get_event_from_global_cache(
    git_repo_path: Option<&Path>,
    filters: Vec<nostr::Filter>,
) -> Result<Vec<nostr::Event>> {
    let db = get_global_cache_database(git_repo_path).await?;
    let query_results = join_all(filters.into_iter().map(|filter| async {
        db.query(filter)
            .await
            .context("failed to execute query on opened ngit nostr cache database")
    }))
    .await;
    // no Event is being mutated, just new items added to the set
    #[allow(clippy::mutable_key_type)]
    let mut events: HashSet<nostr::Event> = HashSet::new();
    for result in query_results {
        events.extend(result?);
    }
    Ok(events.into_iter().collect())
}

/// Saves `event` in the repository's local cache. Returns `true` only when
/// the database reports `SaveEventStatus::Success`.
pub async fn save_event_in_local_cache(git_repo_path: &Path, event: &nostr::Event) -> Result<bool> {
    let status = get_local_cache_database(git_repo_path)
        .await?
        .save_event(event)
        .await
        .context("failed to save event in local cache")?;
    Ok(matches!(status, SaveEventStatus::Success))
}

/// Saves `event` in the global cache. Returns `true` only when the database
/// reports `SaveEventStatus::Success`.
pub async fn save_event_in_global_cache(
    git_repo_path: Option<&Path>,
    event: &nostr::Event,
) -> Result<bool> {
    // the error previously named the *local* cache and was attached twice
    let status = get_global_cache_database(git_repo_path)
        .await?
        .save_event(event)
        .await
        .context("failed to save event in global cache")?;
    Ok(matches!(status, SaveEventStatus::Success))
}

// use announcement from trusted maintainer but recursively add maintainers, git
// servers and relays
pub async fn get_repo_ref_from_cache(
    git_repo_path: Option<&Path>,
    repo_coordinate: &Nip19Coordinate,
) -> Result<RepoRef> {
    let mut maintainers = HashSet::new();
    let mut new_coordinate: bool;

    maintainers.insert(repo_coordinate.public_key);
    let mut repo_events = vec![];
    loop {
        new_coordinate = false;
        let repo_events_filter = get_filter_repo_ann_events(
            &HashSet::from_iter(maintainers.iter().map(|m| Nip19Coordinate {
                coordinate: Coordinate {
                    kind: Kind::GitRepoAnnouncement,
                    public_key: *m,
                    identifier: repo_coordinate.identifier.to_string(),
                },
                relays: vec![],
            })),
            true,
        );

        let events = [
            get_event_from_global_cache(git_repo_path, vec![repo_events_filter.clone()]).await?,
            if let Some(git_repo_path) = git_repo_path {
                get_events_from_local_cache(git_repo_path, vec![repo_events_filter]).await?
            } else {
                vec![]
            },
        ]
        .concat();
        for e in events {
            if let Ok(repo_ref) = RepoRef::try_from((e.clone(), None)) {
                for m in repo_ref.maintainers {
                    if maintainers.insert(m) {
                        new_coordinate = true;
                    }
                }
                repo_events.push(e);
            }
        }
        if !new_coordinate {
            break;
        }
    }
    repo_events.sort_by_key(|e| e.created_at);
    let repo_ref = RepoRef::try_from((
        repo_events
            .iter()
            .find(|e| e.pubkey == repo_coordinate.public_key)
            .context("no repo announcement event found at specified Nip19Coordinates. if you are the repository maintainer consider running `ngit init` to create one")?
.clone(), Some(repo_coordinate.public_key), ))?; let mut events: HashMap = HashMap::new(); for m in &maintainers { if let Some(e) = repo_events.iter().find(|e| e.pubkey.eq(m)) { events.insert( Nip19Coordinate { coordinate: Coordinate { kind: e.kind, identifier: e.tags.identifier().unwrap().to_string(), public_key: e.pubkey, }, relays: vec![], }, e.clone(), ); } } // Use relays, git and blossom servers from all maintainer announcement events // we use Vec and HashSet to remove duplicates and preserve order let mut relays: Vec = repo_ref.relays.clone(); let mut git_server: Vec = repo_ref.git_server.clone(); let mut blossoms: Vec = repo_ref.blossoms.clone(); let mut seen_relays: HashSet = HashSet::from_iter(relays.iter().cloned()); let mut seen_git_server: HashSet = git_server .iter() .map(|server| server.trim_end_matches('/').to_string()) .collect(); let mut seen_blossoms: HashSet = HashSet::from_iter(blossoms.iter().cloned()); // also set maintainers_without_annoucnement let mut maintainers_without_annoucnement: Vec = vec![]; for m in &maintainers { if let Some(event) = repo_events.iter().find(|e| e.pubkey == *m) { if let Ok(m_repo_ref) = RepoRef::try_from((event.clone(), None)) { for relay in m_repo_ref.relays { if seen_relays.insert(relay.clone()) { relays.push(relay); } } for server in m_repo_ref.git_server { if seen_git_server.insert(server.trim_end_matches('/').to_string()) { git_server.push(server); } } for blossom in m_repo_ref.blossoms { if seen_blossoms.insert(blossom.clone()) { blossoms.push(blossom); } } } } else { maintainers_without_annoucnement.push(*m); } } Ok(RepoRef { // use all maintainers from all events found, not just maintainers in the most // recent event maintainers: maintainers.iter().copied().collect::>(), relays, git_server, events, maintainers_without_annoucnement: Some(maintainers_without_annoucnement), ..repo_ref }) } pub async fn get_state_from_cache( git_repo_path: Option<&Path>, repo_ref: &RepoRef, ) -> Result { if let 
Some(git_repo_path) = git_repo_path { RepoState::try_from( get_events_from_local_cache( git_repo_path, vec![get_filter_state_events(&repo_ref.coordinates(), true)], ) .await?, ) } else { RepoState::try_from( get_event_from_global_cache( git_repo_path, vec![get_filter_state_events(&repo_ref.coordinates(), true)], ) .await?, ) } } #[allow(clippy::too_many_lines)] async fn create_relays_request( git_repo_path: Option<&Path>, trusted_maintainer_coordinate: Option<&Nip19Coordinate>, user_profiles: &HashSet, fallback_relays: HashSet, ) -> Result { let repo_ref = if let Some(trusted_maintainer_coordinate) = trusted_maintainer_coordinate { (get_repo_ref_from_cache(git_repo_path, trusted_maintainer_coordinate).await).ok() } else { None }; let repo_coordinates = { // add Nip19Coordinates of users listed in maintainers to explicitly // specified coodinates let mut set: HashSet = HashSet::new(); if let Some(trusted_maintainer_coordinate) = trusted_maintainer_coordinate { set.insert(trusted_maintainer_coordinate.clone()); } if let Some(repo_ref) = &repo_ref { for c in repo_ref.coordinates() { if !set .iter() .any(|e| e.identifier.eq(&c.identifier) && e.public_key.eq(&c.public_key)) { set.insert(c); } } } set }; let repo_coordinates_without_relays = { let mut set = HashSet::new(); for c in &repo_coordinates { set.insert(Nip19Coordinate { coordinate: Coordinate { kind: c.kind, identifier: c.identifier.clone(), public_key: c.public_key, }, relays: vec![], }); } set }; let mut proposals: HashSet = HashSet::new(); let mut missing_contributor_profiles: HashSet = HashSet::new(); let mut contributors: HashSet = HashSet::new(); if !repo_coordinates_without_relays.is_empty() { if let Some(repo_ref) = &repo_ref { for m in &repo_ref.maintainers { contributors.insert(m.to_owned()); } } if let Some(git_repo_path) = git_repo_path { for event in &get_events_from_local_cache( git_repo_path, vec![ nostr::Filter::default() .kinds(vec![Kind::GitPatch]) .custom_tags( 
SingleLetterTag::lowercase(nostr_sdk::Alphabet::A), repo_coordinates_without_relays .iter() .map(|c| c.coordinate.to_string()) .collect::>(), ), ], ) .await? { if event_is_patch_set_root(event) || event_is_revision_root(event) { proposals.insert(event.id); contributors.insert(event.pubkey); } } } let profile_events = get_event_from_global_cache( git_repo_path, vec![get_filter_contributor_profiles(contributors.clone())], ) .await?; for c in &contributors { if let Some(event) = profile_events .iter() .find(|e| e.kind == Kind::Metadata && e.pubkey.eq(c)) { if let Some(git_repo_path) = git_repo_path { save_event_in_local_cache(git_repo_path, event).await?; } } else { missing_contributor_profiles.insert(c.to_owned()); } } } let profiles_to_fetch_from_user_relays = { let mut user_profiles = user_profiles.clone(); if let Some(git_repo_path) = git_repo_path { if let Ok(Some(current_user)) = get_likely_logged_in_user(git_repo_path).await { user_profiles.insert(current_user); } } let mut map: HashMap = HashMap::new(); for public_key in &user_profiles { if let Ok(user_ref) = get_user_ref_from_cache(git_repo_path, public_key).await { map.insert( public_key.to_owned(), ( user_ref.metadata.created_at, user_ref.relays.created_at, user_ref.grasp_list.created_at, ), ); } else { map.insert( public_key.to_owned(), (Timestamp::from(0), Timestamp::from(0), Timestamp::from(0)), ); } } map }; let user_relays_for_profiles = { let mut set = HashSet::new(); for user in &profiles_to_fetch_from_user_relays .clone() .into_keys() .collect::>() { if let Ok(user_ref) = get_user_ref_from_cache(git_repo_path, user).await { for r in user_ref.relays.write() { if let Ok(url) = RelayUrl::parse(&r) { set.insert(url); } } } else { missing_contributor_profiles.insert(user.to_owned()); } } set }; let existing_events: HashSet = { let mut existing_events: HashSet = HashSet::new(); for filter in get_fetch_filters( &repo_coordinates_without_relays, &proposals, &missing_contributor_profiles .union( 
&profiles_to_fetch_from_user_relays .clone() .into_keys() .collect(), ) .copied() .collect(), ) { if let Some(git_repo_path) = git_repo_path { for (id, _) in get_local_cache_database(git_repo_path) .await? .negentropy_items(filter) .await? { existing_events.insert(id); } } } existing_events }; let relays = { let mut relays = fallback_relays; if let Some(repo_ref) = &repo_ref { for r in repo_ref.relays.clone() { relays.insert(r); } } for c in repo_coordinates { for r in &c.relays { relays.insert(r.clone()); } } relays }; let relay_column_width = relays .union(&user_relays_for_profiles) .reduce(|a, r| { if r.to_string() .chars() .count() .gt(&a.to_string().chars().count()) { r } else { a } }) .unwrap() .to_string() .chars() .count() + 2; Ok(FetchRequest { selected_relay: None, repo_relays: relays, relay_column_width, repo_coordinates_without_relays: if let Some(repo_ref) = &repo_ref { repo_ref.coordinates_with_timestamps() } else { repo_coordinates_without_relays .iter() .map(|c| (c.clone(), None)) .collect() }, state: if let Some(repo_ref) = &repo_ref { if let Ok(existing_state) = get_state_from_cache(git_repo_path, repo_ref).await { Some((existing_state.event.created_at, existing_state.event.id)) } else { None } } else { None }, proposals, contributors, missing_contributor_profiles, existing_events, profiles_to_fetch_from_user_relays, user_relays_for_profiles, }) }

/// Classifies a batch of fetched events and records what is new in `report`.
///
/// Events whose id is already in `request.existing_events` are ignored. Every
/// other event is saved to the local cache (when `git_repo_path` is given) and
/// then handled by kind:
///
/// * repo announcements are saved to the global cache, may be recorded as an
///   update to a coordinate named in the request, and may introduce new
///   maintainer coordinates;
/// * state events replace the reported state when they sort after the state
///   currently known (newest reported state, else `request.state`);
/// * patch-set roots and pull requests are recorded as new proposals;
/// * metadata, relay-list and grasp-list events are saved to the global cache
///   and recorded as a contributor profile or a profile update.
///
/// A second pass records patches / PR updates and status events that do not
/// tag one of the proposals recorded in `report`.
///
/// `fresh_coordinates`, `fresh_proposal_roots` and `fresh_profiles` receive
/// the coordinates, proposal ids and public keys discovered in this batch.
///
/// # Errors
///
/// Returns an error if saving an event to the local or global cache fails.
#[allow(clippy::too_many_lines)]
async fn process_fetched_events(
    events: Vec<nostr::Event>,
    request: &FetchRequest,
    git_repo_path: Option<&Path>,
    fresh_coordinates: &mut HashSet<Nip19Coordinate>,
    fresh_proposal_roots: &mut HashSet<EventId>,
    fresh_profiles: &mut HashSet<PublicKey>,
    report: &mut FetchReport,
) -> Result<()> {
    for event in &events {
        if request.existing_events.contains(&event.id) {
            continue;
        }
        if let Some(git_repo_path) = git_repo_path {
            save_event_in_local_cache(git_repo_path, event).await?;
        }

        if event.kind.eq(&Kind::GitRepoAnnouncement) {
            save_event_in_global_cache(git_repo_path, event).await?;

            // Events come from relays and are untrusted: an announcement
            // without an identifier tag is skipped here rather than
            // panicking on `unwrap`.
            if let Some(identifier) = event.tags.identifier() {
                // An update is an announcement for a coordinate we already
                // track that is newer than the copy we hold (or we hold none).
                let update_to_existing =
                    request
                        .repo_coordinates_without_relays
                        .iter()
                        .any(|(c, t)| {
                            c.identifier.eq(identifier)
                                && c.public_key.eq(&event.pubkey)
                                && match t {
                                    Some(t) => event.created_at.gt(t),
                                    None => true,
                                }
                        });
                if update_to_existing {
                    report.updated_repo_announcements.push((
                        Nip19Coordinate {
                            coordinate: Coordinate {
                                kind: event.kind,
                                public_key: event.pubkey,
                                identifier: identifier.to_owned(),
                            },
                            relays: vec![],
                        },
                        event.created_at,
                    ));
                }
            }

            // if contains new maintainer
            if let Ok(repo_ref) = RepoRef::try_from((event.clone(), None)) {
                for m in &repo_ref.maintainers {
                    let already_known = request
                        .repo_coordinates_without_relays // preexisting maintainers
                        .iter()
                        .map(|(c, _)| c)
                        .chain(report.repo_coordinates_without_relays.iter()) // already added maintainers
                        .any(|c| c.identifier.eq(&repo_ref.identifier) && m.eq(&c.public_key));
                    if already_known {
                        continue;
                    }
                    let c = Nip19Coordinate {
                        coordinate: Coordinate {
                            kind: event.kind,
                            public_key: *m,
                            identifier: repo_ref.identifier.clone(),
                        },
                        relays: vec![],
                    };
                    fresh_coordinates.insert(c.clone());
                    report.repo_coordinates_without_relays.insert(c);
                    if !request.contributors.contains(m)
                        && !request.profiles_to_fetch_from_user_relays.contains_key(m)
                    {
                        fresh_profiles.insert(m.to_owned());
                    }
                }
            }
        } else if event.kind.eq(&STATE_KIND) {
            // Compare against the newest state reported so far, falling back
            // to the state named in the request. Ties on timestamp are
            // broken by event id (tuple ordering is lexicographic).
            if let Some(current) = report.updated_state.or(request.state) {
                if (event.created_at, event.id) > current {
                    report.updated_state = Some((event.created_at, event.id));
                }
            }
        } else if event_is_patch_set_root(event) || event.kind.eq(&KIND_PULL_REQUEST) {
            fresh_proposal_roots.insert(event.id);
            report.proposals.insert(event.id);
            if !request.contributors.contains(&event.pubkey) {
                fresh_profiles.insert(event.pubkey);
            }
        } else if [Kind::RelayList, Kind::Metadata, KIND_USER_GRASP_LIST].contains(&event.kind) {
            if request.missing_contributor_profiles.contains(&event.pubkey) {
                report.contributor_profiles.insert(event.pubkey);
            } else if let Some((metadata_timestamp, relay_list_timestamp, grasp_list_timestamp)) =
                request.profiles_to_fetch_from_user_relays.get(&event.pubkey)
            {
                let newer_than_known = (Kind::Metadata.eq(&event.kind)
                    && event.created_at.gt(metadata_timestamp))
                    || (Kind::RelayList.eq(&event.kind)
                        && event.created_at.gt(relay_list_timestamp))
                    || (KIND_USER_GRASP_LIST.eq(&event.kind)
                        && event.created_at.gt(grasp_list_timestamp));
                if newer_than_known {
                    report.profile_updates.insert(event.pubkey);
                }
            }
            save_event_in_global_cache(git_repo_path, event).await?;
        }
    }

    // Second pass: now that `report.proposals` is complete for this batch,
    // record commits and statuses that do NOT reference one of those
    // proposals through an `e` / `E` tag.
    for event in &events {
        if request.existing_events.contains(&event.id) {
            continue;
        }
        let tags_reported_proposal = event.tags.iter().any(|t| {
            let parts = t.as_slice();
            parts.len() > 1
                && (parts[0].eq("E") || parts[0].eq("e"))
                && EventId::parse(&parts[1]).is_ok_and(|id| report.proposals.contains(&id))
        });
        if tags_reported_proposal {
            continue;
        }
        if (event.kind.eq(&Kind::GitPatch) && !event_is_patch_set_root(event))
            || event.kind.eq(&KIND_PULL_REQUEST_UPDATE)
        {
            report.commits.insert(event.id);
        } else if status_kinds().contains(&event.kind) {
            report.statuses.insert(event.id);
        }
    }
    Ok(())
}

/// Merges per-relay fetch reports into a single report.
///
/// Failed reports (`Err`) are ignored. Set-valued fields are unioned. For each
/// updated announcement coordinate the entry with the latest timestamp is
/// kept, and `updated_state` is the greatest `(timestamp, event id)` pair
/// across all reports.
pub fn consolidate_fetch_reports(reports: Vec<Result<FetchReport>>) -> FetchReport {
    let mut report = FetchReport::default();
    for relay_report in reports.into_iter().flatten() {
        report
            .repo_coordinates_without_relays
            .extend(relay_report.repo_coordinates_without_relays);

        for (coordinate, timestamp) in relay_report.updated_repo_announcements {
            let existing = report
                .updated_repo_announcements
                .iter()
                .position(|(e, _)| e.eq(&coordinate));
            match existing {
                Some(i) => {
                    if timestamp.gt(&report.updated_repo_announcements[i].1) {
                        report.updated_repo_announcements[i] = (coordinate, timestamp);
                    }
                }
                None => report
                    .updated_repo_announcements
                    .push((coordinate, timestamp)),
            }
        }

        if let Some(candidate) = relay_report.updated_state {
            let is_newer = match report.updated_state {
                Some(current) => candidate > current,
                None => true,
            };
            if is_newer {
                report.updated_state = Some(candidate);
            }
        }

        report.proposals.extend(relay_report.proposals);
        report.commits.extend(relay_report.commits);
        report.statuses.extend(relay_report.statuses);
        report
            .contributor_profiles
            .extend(relay_report.contributor_profiles);
        report.profile_updates.extend(relay_report.profile_updates);
    }
    report
}

/// Builds the list of filters used to fetch everything related to a repo.
///
/// Up to three groups are produced, in this order, and a group is omitted
/// when its input set is empty:
///
/// 1. for `repo_coordinates`: state events, repo announcements, and patches /
///    deletions / pull requests tagging the coordinates with `a`;
/// 2. for `proposal_ids`: patches, deletions, PR updates and statuses that
///    reference the proposals with `e`, plus deletions, PR updates and
///    statuses that reference them with `E`;
/// 3. for `required_profiles`: the profile filter for those public keys.
pub fn get_fetch_filters(
    repo_coordinates: &HashSet<Nip19Coordinate>,
    proposal_ids: &HashSet<EventId>,
    required_profiles: &HashSet<PublicKey>,
) -> Vec<nostr::Filter> {
    let mut filters = Vec::new();

    if !repo_coordinates.is_empty() {
        filters.push(get_filter_state_events(repo_coordinates, false));
        filters.push(get_filter_repo_ann_events(repo_coordinates, false));
        filters.push(
            nostr::Filter::default()
                .kinds(vec![Kind::GitPatch, Kind::EventDeletion, KIND_PULL_REQUEST])
                .custom_tags(
                    SingleLetterTag::lowercase(nostr_sdk::Alphabet::A),
                    repo_coordinates
                        .iter()
                        .map(|c| c.coordinate.to_string())
                        .collect::<Vec<String>>(),
                ),
        );
    }

    if !proposal_ids.is_empty() {
        filters.push(
            nostr::Filter::default().events(proposal_ids.clone()).kinds(
                [
                    vec![
                        Kind::GitPatch,
                        Kind::EventDeletion,
                        KIND_PULL_REQUEST_UPDATE,
                    ],
                    status_kinds(),
                ]
                .concat(),
            ),
        );
        filters.push(
            nostr::Filter::default()
                .custom_tags(
                    SingleLetterTag::uppercase(Alphabet::E),
                    proposal_ids.clone(),
                )
                .kinds(
                    [
                        vec![Kind::EventDeletion, KIND_PULL_REQUEST_UPDATE],
                        status_kinds(),
                    ]
                    .concat(),
                ),
        );
    }

    if !required_profiles.is_empty() {
        filters.push(get_filter_contributor_profiles(required_profiles.clone()));
    }

    filters
}

/// Builds a filter for repo announcement events matching the identifiers of
/// `repo_coordinates`; when `maintainers_only` is set the filter is also
/// restricted to the public keys of those coordinates.
pub fn get_filter_repo_ann_events(
    repo_coordinates: &HashSet<Nip19Coordinate>,
    maintainers_only: bool,
) ->
nostr::Filter { let filter = nostr::Filter::default() .kind(Kind::GitRepoAnnouncement) .identifiers( repo_coordinates .iter() .map(|c| c.identifier.clone()) .collect::>(), ); if maintainers_only { filter.authors( repo_coordinates .iter() .map(|c| c.coordinate.public_key) .collect::>(), ) } else { filter } } pub static STATE_KIND: nostr::Kind = Kind::Custom(30618); pub fn get_filter_state_events( repo_coordinates: &HashSet, maintainers_only: bool, ) -> nostr::Filter { let filter = nostr::Filter::default().kind(STATE_KIND).identifiers( repo_coordinates .iter() .map(|c| c.identifier.clone()) .collect::>(), ); if maintainers_only { filter.authors( repo_coordinates .iter() .map(|c| c.coordinate.public_key) .collect::>(), ) } else { filter } } pub fn get_filter_contributor_profiles(contributors: HashSet) -> nostr::Filter { nostr::Filter::default() .kinds(vec![Kind::Metadata, Kind::RelayList, KIND_USER_GRASP_LIST]) .authors(contributors) } #[derive(Default)] pub struct FetchReport { repo_coordinates_without_relays: HashSet, updated_repo_announcements: Vec<(Nip19Coordinate, Timestamp)>, updated_state: Option<(Timestamp, EventId)>, proposals: HashSet, /// commits against existing propoals commits: HashSet, statuses: HashSet, contributor_profiles: HashSet, profile_updates: HashSet, } impl Display for FetchReport { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { // report: "1 new maintainer, 1 announcement, 1 proposal, 3 commits, 2 statuses" let mut display_items: Vec = vec![]; if !self.repo_coordinates_without_relays.is_empty() { display_items.push(format!( "{} new maintainer{}", self.repo_coordinates_without_relays.len(), if self.repo_coordinates_without_relays.len() > 1 { "s" } else { "" }, )); } if !self.updated_repo_announcements.is_empty() { display_items.push(format!( "{} announcement update{}", self.updated_repo_announcements.len(), if self.updated_repo_announcements.len() > 1 { "s" } else { "" }, )); } if self.updated_state.is_some() { 
display_items.push("new state".to_string()); } if !self.proposals.is_empty() { display_items.push(format!( "{} proposal{}", self.proposals.len(), if self.proposals.len() > 1 { "s" } else { "" }, )); } if !self.commits.is_empty() { display_items.push(format!( "{} commit{}", self.commits.len(), if self.commits.len() > 1 { "s" } else { "" }, )); } if !self.statuses.is_empty() { display_items.push(format!( "{} status{}", self.statuses.len(), if self.statuses.len() > 1 { "es" } else { "" }, )); } if !self.contributor_profiles.is_empty() { display_items.push(format!( "{} user profile{}", self.contributor_profiles.len(), if self.contributor_profiles.len() > 1 { "s" } else { "" }, )); } if !self.profile_updates.is_empty() { display_items.push(format!( "{} profile update{}", self.profile_updates.len(), if self.profile_updates.len() > 1 { "s" } else { "" }, )); } write!(f, "{}", display_items.join(", ")) } } #[derive(Default, Clone)] pub struct FetchRequest { repo_relays: HashSet, selected_relay: Option, relay_column_width: usize, repo_coordinates_without_relays: Vec<(Nip19Coordinate, Option)>, state: Option<(Timestamp, EventId)>, proposals: HashSet, contributors: HashSet, missing_contributor_profiles: HashSet, existing_events: HashSet, profiles_to_fetch_from_user_relays: HashMap, user_relays_for_profiles: HashSet, } pub async fn fetching_with_report( git_repo_path: &Path, #[cfg(test)] client: &crate::client::MockConnect, #[cfg(not(test))] client: &Client, trusted_maintainer_coordinate: &Nip19Coordinate, ) -> Result { let term = console::Term::stderr(); term.write_line("fetching updates...")?; let (relay_reports, progress_reporter) = client .fetch_all( Some(git_repo_path), Some(trusted_maintainer_coordinate), &HashSet::new(), ) .await?; if !relay_reports.iter().any(std::result::Result::is_err) { let _ = progress_reporter.clear(); } let report = consolidate_fetch_reports(relay_reports); if report.to_string().is_empty() { println!("no updates"); } else { println!("updates: 
{report}"); } Ok(report) } pub async fn get_proposals_and_revisions_from_cache( git_repo_path: &Path, repo_coordinates: HashSet, ) -> Result> { let mut proposals = get_events_from_local_cache( git_repo_path, vec![ nostr::Filter::default() .kinds([nostr::Kind::GitPatch, KIND_PULL_REQUEST]) .custom_tags( nostr::SingleLetterTag::lowercase(nostr_sdk::Alphabet::A), repo_coordinates .iter() .map(|c| c.coordinate.to_string()) .collect::>(), ), ], ) .await? .iter() .filter(|e| event_is_patch_set_root(e) || e.kind.eq(&KIND_PULL_REQUEST)) .filter(|e| e.kind.eq(&Kind::GitPatch) || event_is_valid_pr_or_pr_update(e)) .cloned() .collect::>(); proposals.sort_by_key(|e| e.created_at); proposals.reverse(); Ok(proposals) } pub async fn get_all_proposal_patch_pr_pr_update_events_from_cache( git_repo_path: &Path, repo_ref: &RepoRef, proposal_id: &nostr::EventId, ) -> Result> { let mut commit_events = get_events_from_local_cache( git_repo_path, vec![ nostr::Filter::default() .kinds([ nostr::Kind::GitPatch, KIND_PULL_REQUEST, KIND_PULL_REQUEST_UPDATE, ]) .event(*proposal_id), nostr::Filter::default() .kinds([ nostr::Kind::GitPatch, KIND_PULL_REQUEST, KIND_PULL_REQUEST_UPDATE, ]) .custom_tag(SingleLetterTag::uppercase(Alphabet::E), *proposal_id), nostr::Filter::default() .kinds([nostr::Kind::GitPatch, KIND_PULL_REQUEST]) .id(*proposal_id), ], ) .await?; let permissioned_users: HashSet = [ repo_ref.maintainers.clone(), vec![ commit_events .iter() .find(|e| e.id.eq(proposal_id)) .context("proposal not in cache")? 
.pubkey, ], ] .concat() .iter() .copied() .collect(); commit_events.retain(|e| { permissioned_users.contains(&e.pubkey) && (e.kind.eq(&Kind::GitPatch) || event_is_valid_pr_or_pr_update(e)) }); let revision_roots: HashSet = commit_events .iter() .filter(|e| event_is_revision_root(e)) .map(|e| e.id) .collect(); if !revision_roots.is_empty() { for event in get_events_from_local_cache( git_repo_path, vec![ nostr::Filter::default() .kinds([ nostr::Kind::GitPatch, KIND_PULL_REQUEST, KIND_PULL_REQUEST_UPDATE, ]) .events(revision_roots.clone()) .authors(permissioned_users.clone()), nostr::Filter::default() .kinds([ nostr::Kind::GitPatch, KIND_PULL_REQUEST, KIND_PULL_REQUEST_UPDATE, ]) .custom_tags(SingleLetterTag::uppercase(Alphabet::E), revision_roots) .authors(permissioned_users.clone()), ], ) .await? { commit_events.push(event); } } Ok(commit_events .iter() .filter(|e| !event_is_cover_letter(e) && permissioned_users.contains(&e.pubkey)) .cloned() .collect()) } pub async fn get_event_from_cache_by_id(git_repo: &Repo, event_id: &EventId) -> Result { Ok(get_events_from_local_cache( git_repo.get_path()?, vec![nostr::Filter::default().id(*event_id)], ) .await? .first() .context("failed to find event in cache")? 
.clone()) } #[allow(clippy::module_name_repetitions)] #[allow(clippy::too_many_lines)] pub async fn send_events( #[cfg(test)] client: &crate::client::MockConnect, #[cfg(not(test))] client: &Client, git_repo_path: Option<&Path>, events: Vec, my_write_relays: Vec, repo_read_relays: Vec, animate: bool, silent: bool, ) -> Result<()> { let fallback = [ client.get_relay_default_set().clone(), if events.iter().any(|e| e.kind.eq(&Kind::GitRepoAnnouncement)) { client.get_blaster_relays().clone() } else { vec![] }, ] .concat(); let mut relays: Vec<&str> = vec![]; let repo_read_relays = repo_read_relays .iter() .map(|r| r.to_string()) .collect::>(); let all = &[ repo_read_relays.clone(), my_write_relays.clone(), fallback.clone(), ] .concat(); // add duplicates first for r in &repo_read_relays { let r_clean = remove_trailing_slash(r); if !my_write_relays .iter() .filter(|x| r_clean.eq(&remove_trailing_slash(x))) .count() > 1 && !relays.iter().any(|x| r_clean.eq(&remove_trailing_slash(x))) { relays.push(r); } } for r in all { let r_clean = remove_trailing_slash(r); if !relays.iter().any(|x| r_clean.eq(&remove_trailing_slash(x))) { relays.push(r); } } let m = if silent { MultiProgress::with_draw_target(ProgressDrawTarget::hidden()) } else { MultiProgress::new() }; let pb_style = ProgressStyle::with_template(if animate { " {spinner} {prefix} {bar} {pos}/{len} {msg}" } else { " - {prefix} {bar} {pos}/{len} {msg}" })? 
.progress_chars("##-"); let pb_after_style = |symbol| ProgressStyle::with_template(format!(" {symbol} {}", "{prefix} {msg}",).as_str()); let pb_after_style_succeeded = pb_after_style(if animate { console::style("✔".to_string()) .for_stderr() .green() .to_string() } else { "y".to_string() })?; let pb_after_style_failed = pb_after_style(if animate { console::style("✘".to_string()) .for_stderr() .red() .to_string() } else { "x".to_string() })?; #[allow(clippy::borrow_deref_ref)] join_all(relays.iter().map(|&relay| async { let relay_clean = remove_trailing_slash(relay); let details = format!( "{}{}{} {}", if my_write_relays .iter() .any(|r| relay_clean.eq(&remove_trailing_slash(r))) { " [my-relay]" } else { "" }, if repo_read_relays .iter() .any(|r| relay_clean.eq(&remove_trailing_slash(&r.to_string()))) { " [repo-relay]" } else { "" }, if fallback .iter() .any(|r| relay_clean.eq(&remove_trailing_slash(r))) { " [default]" } else { "" }, relay_clean, ); let pb = m.add( ProgressBar::new(events.len() as u64) .with_prefix(details.to_string()) .with_style(pb_style.clone()), ); if animate { pb.enable_steady_tick(Duration::from_millis(300)); } pb.inc(0); // need to make pb display intially let mut failed = false; for event in &events { match client .send_event_to(git_repo_path, relay, event.clone()) .await { Ok(_) => pb.inc(1), Err(e) => { pb.set_style(pb_after_style_failed.clone()); pb.finish_with_message( console::style(format!( "error: {}", e.to_string() .replace("relay pool error:", "") .replace("event not published: ", "") )) .for_stderr() .red() .to_string(), ); failed = true; break; } }; } if !failed { pb.set_style(pb_after_style_succeeded.clone()); pb.finish_with_message(""); } })) .await; Ok(()) } fn remove_trailing_slash(s: &str) -> String { match s.strip_suffix('/') { Some(s) => s, None => s, } .to_string() }