// have you considered // TO USE ASYNC // in traits (required for mocking unit tests) // https://rust-lang.github.io/async-book/07_workarounds/05_async_in_traits.html // https://github.com/dtolnay/async-trait // see https://blog.rust-lang.org/inside-rust/2022/11/17/async-fn-in-trait-nightly.html // I think we can use the async-trait crate and switch to the native feature // which is currently in nightly. alternatively we can use nightly as it looks // certain that the implementation is going to make it to stable but we don't // want to inadvertlty use other features of nightly that might be removed. use std::{ collections::{HashMap, HashSet}, fmt::{Display, Write}, fs::create_dir_all, path::Path, sync::Arc, time::Duration, }; use anyhow::{bail, Context, Result}; use async_trait::async_trait; use console::Style; use futures::{ future::join_all, stream::{self, StreamExt}, }; use indicatif::{MultiProgress, ProgressBar, ProgressDrawTarget, ProgressState, ProgressStyle}; #[cfg(test)] use mockall::*; use nostr::{nips::nip01::Coordinate, Event}; use nostr_database::NostrDatabase; use nostr_lmdb::NostrLMDB; use nostr_sdk::{ prelude::RelayLimits, EventBuilder, EventId, Kind, NostrSigner, Options, PublicKey, SingleLetterTag, Timestamp, Url, }; use crate::{ get_dirs, git::{Repo, RepoActions}, git_events::{ event_is_cover_letter, event_is_patch_set_root, event_is_revision_root, status_kinds, }, login::{get_likely_logged_in_user, user::get_user_ref_from_cache}, repo_ref::RepoRef, repo_state::RepoState, }; #[allow(clippy::struct_field_names)] pub struct Client { client: nostr_sdk::Client, fallback_relays: Vec, more_fallback_relays: Vec, blaster_relays: Vec, fallback_signer_relays: Vec, } #[cfg_attr(test, automock)] #[async_trait] pub trait Connect { fn default() -> Self; fn new(opts: Params) -> Self; async fn set_signer(&mut self, signer: Arc); async fn connect(&self, relay_url: &Url) -> Result<()>; async fn disconnect(&self) -> Result<()>; fn get_fallback_relays(&self) -> &Vec; 
fn get_more_fallback_relays(&self) -> &Vec; fn get_blaster_relays(&self) -> &Vec; fn get_fallback_signer_relays(&self) -> &Vec; async fn send_event_to<'a>( &self, git_repo_path: Option<&'a Path>, url: &str, event: nostr::event::Event, ) -> Result; async fn get_events( &self, relays: Vec, filters: Vec, ) -> Result>; async fn get_events_per_relay( &self, relays: Vec, filters: Vec, progress_reporter: MultiProgress, ) -> Result<(Vec>>, MultiProgress)>; async fn fetch_all<'a>( &self, git_repo_path: Option<&'a Path>, repo_coordinates: &HashSet, user_profiles: &HashSet, ) -> Result<(Vec>, MultiProgress)>; async fn fetch_all_from_relay<'a>( &self, git_repo_path: Option<&'a Path>, request: FetchRequest, pb: &Option, ) -> Result; } #[async_trait] impl Connect for Client { fn default() -> Self { let fallback_relays: Vec = if std::env::var("NGITTEST").is_ok() { vec![ "ws://localhost:8051".to_string(), "ws://localhost:8052".to_string(), ] } else { vec![ "wss://relay.damus.io".to_string(), /* free, good reliability, have been known * to delete all messages */ "wss://nos.lol".to_string(), "wss://relay.nostr.band".to_string(), ] }; let more_fallback_relays: Vec = if std::env::var("NGITTEST").is_ok() { vec![ "ws://localhost:8055".to_string(), "ws://localhost:8056".to_string(), ] } else { vec![ "wss://purplerelay.com".to_string(), // free but reliability not tested "wss://purplepages.es".to_string(), // for profile events but unreliable "wss://relayable.org".to_string(), // free but not always reliable ] }; let blaster_relays: Vec = if std::env::var("NGITTEST").is_ok() { vec!["ws://localhost:8057".to_string()] } else { vec![] }; let fallback_signer_relays: Vec = if std::env::var("NGITTEST").is_ok() { vec!["ws://localhost:8051".to_string()] } else { vec!["wss://relay.nsec.app".to_string()] }; Client { client: nostr_sdk::ClientBuilder::new() .opts(Options::new().relay_limits(RelayLimits::disable())) .build(), fallback_relays, more_fallback_relays, blaster_relays, 
fallback_signer_relays, } } fn new(opts: Params) -> Self { Client { client: nostr_sdk::ClientBuilder::new() .opts(Options::new().relay_limits(RelayLimits::disable())) .signer(opts.keys.unwrap_or(nostr::Keys::generate())) // .database( // SQLiteDatabase::open(get_dirs()?.cache_dir().join("nostr-cache.lmdb")). // await?, ) .build(), fallback_relays: opts.fallback_relays, more_fallback_relays: opts.more_fallback_relays, blaster_relays: opts.blaster_relays, fallback_signer_relays: opts.fallback_signer_relays, } } async fn set_signer(&mut self, signer: Arc) { self.client.set_signer(signer).await; } async fn connect(&self, relay_url: &Url) -> Result<()> { self.client .add_relay(relay_url) .await .context("failed to add relay")?; let relay = self.client.relay(relay_url).await?; if !relay.is_connected() { #[allow(clippy::large_futures)] relay .connect(Some(std::time::Duration::from_secs(CONNECTION_TIMEOUT))) .await; } if !relay.is_connected() { bail!("connection timeout"); } Ok(()) } async fn disconnect(&self) -> Result<()> { self.client.disconnect().await?; Ok(()) } fn get_fallback_relays(&self) -> &Vec { &self.fallback_relays } fn get_more_fallback_relays(&self) -> &Vec { &self.more_fallback_relays } fn get_blaster_relays(&self) -> &Vec { &self.blaster_relays } fn get_fallback_signer_relays(&self) -> &Vec { &self.fallback_signer_relays } async fn send_event_to<'a>( &self, git_repo_path: Option<&'a Path>, url: &str, event: Event, ) -> Result { self.client.add_relay(url).await?; #[allow(clippy::large_futures)] self.client.connect_relay(url).await?; self.client .relay(url) .await? 
.send_event(event.clone()) .await?; if let Some(git_repo_path) = git_repo_path { save_event_in_local_cache(git_repo_path, &event).await?; } if event.kind.eq(&Kind::GitRepoAnnouncement) { save_event_in_global_cache(git_repo_path, &event).await?; } Ok(event.id) } async fn get_events( &self, relays: Vec, filters: Vec, ) -> Result> { let (relay_results, _) = self .get_events_per_relay( relays.iter().map(|r| Url::parse(r).unwrap()).collect(), filters, MultiProgress::new(), ) .await?; Ok(get_dedup_events(relay_results)) } async fn get_events_per_relay( &self, relays: Vec, filters: Vec, progress_reporter: MultiProgress, ) -> Result<(Vec>>, MultiProgress)> { // add relays for relay in &relays { self.client .add_relay(relay.as_str()) .await .context("failed to add relay")?; } let relays_map = self.client.relays().await; let futures: Vec<_> = relays .clone() .iter() // don't look for events on blaster .filter(|r| !r.as_str().contains("nostr.mutinywallet.com")) .map(|r| (relays_map.get(r).unwrap(), filters.clone())) .map(|(relay, filters)| async { let pb = if std::env::var("NGITTEST").is_err() { let pb = progress_reporter.add( ProgressBar::new(1) .with_prefix(format!("{: <11}{}", "connecting", relay.url())) .with_style(pb_style()?), ); pb.enable_steady_tick(Duration::from_millis(300)); Some(pb) } else { None }; #[allow(clippy::large_futures)] match get_events_of(relay, filters, &pb).await { Err(error) => { if let Some(pb) = pb { pb.set_style(pb_after_style(false)); pb.set_prefix(format!("{: <11}{}", "error", relay.url())); pb.finish_with_message( console::style( error.to_string().replace("relay pool error:", "error:"), ) .for_stderr() .red() .to_string(), ); } Err(error) } Ok(res) => { if let Some(pb) = pb { pb.set_style(pb_after_style(true)); pb.set_prefix(format!( "{: <11}{}", format!("{} events", res.len()), relay.url() )); pb.finish_with_message(""); } Ok(res) } } }) .collect(); let relay_results: Vec>> = stream::iter(futures).buffer_unordered(15).collect().await; 
Ok((relay_results, progress_reporter)) } #[allow(clippy::too_many_lines)] async fn fetch_all<'a>( &self, git_repo_path: Option<&'a Path>, repo_coordinates: &HashSet, user_profiles: &HashSet, ) -> Result<(Vec>, MultiProgress)> { let fallback_relays = &self .fallback_relays .iter() .filter_map(|r| Url::parse(r).ok()) .collect::>(); let mut request = create_relays_request( git_repo_path, repo_coordinates, user_profiles, fallback_relays.clone(), ) .await?; let progress_reporter = MultiProgress::new(); let mut processed_relays = HashSet::new(); let mut relay_reports: Vec> = vec![]; loop { let relays = request .repo_relays .union(&request.user_relays_for_profiles) // don't look for events on blaster .filter(|&r| !r.as_str().contains("nostr.mutinywallet.com")) .cloned() .collect::>() .difference(&processed_relays) .cloned() .collect::>(); if relays.is_empty() { break; } let profile_relays_only = request .user_relays_for_profiles .difference(&request.repo_relays) .collect::>(); for relay in &request.repo_relays { self.client .add_relay(relay.as_str()) .await .context("failed to add relay")?; } let dim = Style::new().color256(247); let futures: Vec<_> = relays .iter() .map(|r| { if profile_relays_only.contains(r) { // if relay isn't a repo relay, just filter for user profile FetchRequest { selected_relay: Some(r.to_owned()), repo_coordinates_without_relays: vec![], proposals: HashSet::new(), missing_contributor_profiles: request .missing_contributor_profiles .union( &request .profiles_to_fetch_from_user_relays .clone() .into_keys() .collect(), ) .copied() .collect(), ..request.clone() } } else { FetchRequest { selected_relay: Some(r.to_owned()), ..request.clone() } } }) .map(|request| async { let relay_column_width = request.relay_column_width; let relay_url = request .selected_relay .clone() .context("fetch_all_from_relay called without a relay")?; let pb = if std::env::var("NGITTEST").is_err() { let pb = progress_reporter.add( ProgressBar::new(1) .with_prefix( 
dim.apply_to(format!( "{: { if let Some(pb) = pb { pb.set_style(pb_after_style(false)); pb.set_prefix( dim.apply_to(format!("{: Ok(res), } }) .collect(); for report in stream::iter(futures) .buffer_unordered(15) .collect::>>() .await { relay_reports.push(report); } processed_relays.extend(relays.clone()); if let Ok(repo_ref) = get_repo_ref_from_cache(git_repo_path, repo_coordinates).await { request.repo_relays = repo_ref .relays .iter() .filter_map(|r| Url::parse(r).ok()) .collect(); } request.user_relays_for_profiles = { let mut set = HashSet::new(); for user in &request .profiles_to_fetch_from_user_relays .clone() .into_keys() .collect::>() { if let Ok(user_ref) = get_user_ref_from_cache(git_repo_path, user).await { for r in user_ref.relays.write() { if let Ok(url) = Url::parse(&r) { set.insert(url); } } } } set }; } Ok((relay_reports, progress_reporter)) } async fn fetch_all_from_relay<'a>( &self, git_repo_path: Option<&'a Path>, request: FetchRequest, pb: &Option, ) -> Result { let mut fresh_coordinates: HashSet = HashSet::new(); for (c, _) in request.repo_coordinates_without_relays.clone() { fresh_coordinates.insert(c); } let mut fresh_proposal_roots = request.proposals.clone(); let mut fresh_profiles: HashSet = request .missing_contributor_profiles .union( &request .profiles_to_fetch_from_user_relays .clone() .into_keys() .collect(), ) .copied() .collect(); let mut report = FetchReport::default(); let relay_url = request .selected_relay .clone() .context("fetch_all_from_relay called without a relay")?; let relay_column_width = request.relay_column_width; self.connect(&relay_url).await?; let dim = Style::new().color256(247); loop { let filters = get_fetch_filters(&fresh_coordinates, &fresh_proposal_roots, &fresh_profiles); if let Some(pb) = &pb { pb.set_prefix( dim.apply_to(format!( "{: = get_events_of(&relay, filters.clone(), &None) .await? 
.iter() // don't process events that don't match filters .filter(|e| filters.iter().any(|f| f.match_event(e))) .cloned() .collect(); // TODO: try reconcile process_fetched_events( events, &request, git_repo_path, &mut fresh_coordinates, &mut fresh_proposal_roots, &mut fresh_profiles, &mut report, ) .await?; if fresh_coordinates.is_empty() && fresh_proposal_roots.is_empty() && fresh_profiles.is_empty() { break; } } if let Some(pb) = pb { pb.set_style(pb_after_style(true)); pb.set_prefix( dim.apply_to(format!( "{: , pb: &Option, ) -> Result> { // relay.reconcile(filter, opts).await?; if !relay.is_connected() { #[allow(clippy::large_futures)] relay .connect(Some(std::time::Duration::from_secs(CONNECTION_TIMEOUT))) .await; } if !relay.is_connected() { bail!("connection timeout"); } else if let Some(pb) = pb { pb.set_prefix(format!("connected {}", relay.url())); } let events = relay .fetch_events( filters, // 20 is nostr_sdk default std::time::Duration::from_secs(GET_EVENTS_TIMEOUT), nostr_sdk::FilterOptions::ExitOnEOSE, ) .await? 
.to_vec(); Ok(events) } #[derive(Default)] pub struct Params { pub keys: Option, pub fallback_relays: Vec, pub more_fallback_relays: Vec, pub blaster_relays: Vec, pub fallback_signer_relays: Vec, } fn get_dedup_events(relay_results: Vec>>) -> Vec { let mut dedup_events: Vec = vec![]; for events in relay_results.into_iter().flatten() { for event in events { if !dedup_events.iter().any(|e| event.id.eq(&e.id)) { dedup_events.push(event); } } } dedup_events } pub async fn sign_event( event_builder: EventBuilder, signer: &Arc, ) -> Result { // TODO: Yuki suggested he would add a backend option to NostrSigner so we can // identify nip46 signers again and replace the below if statement with: // if signer.backend() == nip46 { if std::env::var("NGITTEST").is_err() { let term = console::Term::stderr(); term.write_line("signing event with remote signer...")?; let event = signer .sign_event(event_builder.build(signer.get_public_key().await?)) .await .context("failed to sign event")?; term.clear_last_lines(1)?; Ok(event) } else { signer .sign_event(event_builder.build(signer.get_public_key().await?)) .await .context("failed to sign event") } } pub async fn fetch_public_key(signer: &Arc) -> Result { let term = console::Term::stderr(); term.write_line("fetching npub from remote signer...")?; let public_key = signer .get_public_key() .await .context("failed to get npub from remote signer")?; term.clear_last_lines(1)?; Ok(public_key) } fn pb_style() -> Result { Ok( ProgressStyle::with_template(" {spinner} {prefix} {msg} {timeout_in}")?.with_key( "timeout_in", |state: &ProgressState, w: &mut dyn Write| { if state.elapsed().as_secs() > 3 && state.elapsed().as_secs() < GET_EVENTS_TIMEOUT { let dim = Style::new().color256(247); write!( w, "{}", dim.apply_to(format!( "timeout in {:.1}s", GET_EVENTS_TIMEOUT - state.elapsed().as_secs() )) ) .unwrap(); } }, ), ) } fn pb_after_style(succeed: bool) -> indicatif::ProgressStyle { ProgressStyle::with_template( format!( " {} {}", if succeed { 
console::style("✔".to_string()) .for_stderr() .green() .to_string() } else { console::style("✘".to_string()) .for_stderr() .red() .to_string() }, "{prefix} {msg}", ) .as_str(), ) .unwrap() } async fn get_local_cache_database(git_repo_path: &Path) -> Result { NostrLMDB::open(git_repo_path.join(".git/nostr-cache.lmdb")) .context("failed to open or create nostr cache database at .git/nostr-cache.lmdb") } async fn get_global_cache_database(git_repo_path: Option<&Path>) -> Result { let path = if std::env::var("NGITTEST").is_ok() { if let Some(git_repo_path) = git_repo_path { git_repo_path.join(".git/test-global-cache.lmdb") } else { bail!("git_repo must be supplied to get_global_cache_database during integration tests") } } else { create_dir_all(get_dirs()?.cache_dir()).context(format!( "failed to create cache directory in: {:?}", get_dirs()?.cache_dir() ))?; get_dirs()?.cache_dir().join("nostr-cache.lmdb") }; NostrLMDB::open(path).context("failed to open ngit global nostr cache database") } pub async fn get_events_from_local_cache( git_repo_path: &Path, filters: Vec, ) -> Result> { Ok(get_local_cache_database(git_repo_path) .await? .query(filters.clone()) .await .context( "failed to execute query on opened git repo nostr cache database .git/nostr-cache.lmdb", )? .to_vec()) } pub async fn get_event_from_global_cache( git_repo_path: Option<&Path>, filters: Vec, ) -> Result> { Ok(get_global_cache_database(git_repo_path) .await? .query(filters.clone()) .await .context("failed to execute query on opened ngit nostr cache database")? .to_vec()) } pub async fn save_event_in_local_cache(git_repo_path: &Path, event: &nostr::Event) -> Result { get_local_cache_database(git_repo_path) .await? .save_event(event) .await .context("failed to save event in local cache") } pub async fn save_event_in_global_cache( git_repo_path: Option<&Path>, event: &nostr::Event, ) -> Result { get_global_cache_database(git_repo_path) .await? 
.save_event(event) .await .context("failed to save event in local cache") } pub async fn get_repo_ref_from_cache( git_repo_path: Option<&Path>, repo_coordinates: &HashSet, ) -> Result { let mut maintainers = HashSet::new(); let mut new_coordinate: bool; for c in repo_coordinates { maintainers.insert(c.public_key); } let mut repo_events = vec![]; loop { new_coordinate = false; let repo_events_filter = get_filter_repo_events(repo_coordinates); let events = [ get_event_from_global_cache(git_repo_path, vec![repo_events_filter.clone()]).await?, if let Some(git_repo_path) = git_repo_path { get_events_from_local_cache(git_repo_path, vec![repo_events_filter]).await? } else { vec![] }, ] .concat(); for e in events { if let Ok(repo_ref) = RepoRef::try_from(e.clone()) { for m in repo_ref.maintainers { if maintainers.insert(m) { new_coordinate = true; } } repo_events.push(e); } } if !new_coordinate { break; } } repo_events.sort_by_key(|e| e.created_at); let repo_ref = RepoRef::try_from( repo_events .first() .context("no repo events at specified coordinates")? 
.clone(), )?; let mut events: HashMap = HashMap::new(); for m in &maintainers { if let Some(e) = repo_events.iter().find(|e| e.pubkey.eq(m)) { events.insert( Coordinate { kind: e.kind, identifier: e.tags.identifier().unwrap().to_string(), public_key: e.pubkey, relays: vec![], }, e.clone(), ); } } Ok(RepoRef { // use all maintainers from all events found, not just maintainers in the most // recent event maintainers: maintainers.iter().copied().collect::>(), events, ..repo_ref }) } pub async fn get_state_from_cache( git_repo_path: Option<&Path>, repo_ref: &RepoRef, ) -> Result { if let Some(git_repo_path) = git_repo_path { RepoState::try_from( get_events_from_local_cache( git_repo_path, vec![get_filter_state_events(&repo_ref.coordinates())], ) .await?, ) } else { RepoState::try_from( get_event_from_global_cache( git_repo_path, vec![get_filter_state_events(&repo_ref.coordinates())], ) .await?, ) } } #[allow(clippy::too_many_lines)] async fn create_relays_request( git_repo_path: Option<&Path>, repo_coordinates: &HashSet, user_profiles: &HashSet, fallback_relays: HashSet, ) -> Result { let repo_ref = get_repo_ref_from_cache(git_repo_path, repo_coordinates).await; let repo_coordinates = { // add coordinates of users listed in maintainers to explicitly specified // coodinates let mut repo_coordinates = repo_coordinates.clone(); if let Ok(repo_ref) = &repo_ref { for c in repo_ref.coordinates() { if !repo_coordinates .iter() .any(|e| e.identifier.eq(&c.identifier) && e.public_key.eq(&c.public_key)) { repo_coordinates.insert(c); } } } repo_coordinates }; let repo_coordinates_without_relays = { let mut set = HashSet::new(); for c in &repo_coordinates { set.insert(Coordinate { kind: c.kind, identifier: c.identifier.clone(), public_key: c.public_key, relays: vec![], }); } set }; let mut proposals: HashSet = HashSet::new(); let mut missing_contributor_profiles: HashSet = HashSet::new(); let mut contributors: HashSet = HashSet::new(); if 
!repo_coordinates_without_relays.is_empty() { if let Ok(repo_ref) = &repo_ref { for m in &repo_ref.maintainers { contributors.insert(m.to_owned()); } } if let Some(git_repo_path) = git_repo_path { for event in &get_events_from_local_cache( git_repo_path, vec![ nostr::Filter::default() .kinds(vec![Kind::GitPatch]) .custom_tag( SingleLetterTag::lowercase(nostr_sdk::Alphabet::A), repo_coordinates_without_relays .iter() .map(std::string::ToString::to_string) .collect::>(), ), ], ) .await? { if event_is_patch_set_root(event) || event_is_revision_root(event) { proposals.insert(event.id); contributors.insert(event.pubkey); } } } let profile_events = get_event_from_global_cache( git_repo_path, vec![get_filter_contributor_profiles(contributors.clone())], ) .await?; for c in &contributors { if let Some(event) = profile_events .iter() .find(|e| e.kind == Kind::Metadata && e.pubkey.eq(c)) { if let Some(git_repo_path) = git_repo_path { save_event_in_local_cache(git_repo_path, event).await?; } } else { missing_contributor_profiles.insert(c.to_owned()); } } } let profiles_to_fetch_from_user_relays = { let mut user_profiles = user_profiles.clone(); if let Some(git_repo_path) = git_repo_path { if let Ok(Some(current_user)) = get_likely_logged_in_user(git_repo_path).await { user_profiles.insert(current_user); } } let mut map: HashMap = HashMap::new(); for public_key in &user_profiles { if let Ok(user_ref) = get_user_ref_from_cache(git_repo_path, public_key).await { map.insert( public_key.to_owned(), (user_ref.metadata.created_at, user_ref.relays.created_at), ); } else { map.insert( public_key.to_owned(), (Timestamp::from(0), Timestamp::from(0)), ); } } map }; let user_relays_for_profiles = { let mut set = HashSet::new(); for user in &profiles_to_fetch_from_user_relays .clone() .into_keys() .collect::>() { if let Ok(user_ref) = get_user_ref_from_cache(git_repo_path, user).await { for r in user_ref.relays.write() { if let Ok(url) = Url::parse(&r) { set.insert(url); } } } else { 
missing_contributor_profiles.insert(user.to_owned()); } } set }; let existing_events: HashSet = { let mut existing_events: HashSet = HashSet::new(); for filter in get_fetch_filters( &repo_coordinates_without_relays, &proposals, &missing_contributor_profiles .union( &profiles_to_fetch_from_user_relays .clone() .into_keys() .collect(), ) .copied() .collect(), ) { if let Some(git_repo_path) = git_repo_path { for (id, _) in get_local_cache_database(git_repo_path) .await? .negentropy_items(filter) .await? { existing_events.insert(id); } } } existing_events }; let relays = { let mut relays = fallback_relays; if let Ok(repo_ref) = &repo_ref { for r in &repo_ref.relays { if let Ok(url) = Url::parse(r) { relays.insert(url); } } } for c in repo_coordinates { for r in &c.relays { if let Ok(url) = Url::parse(r) { relays.insert(url); } } } relays }; let relay_column_width = relays .union(&user_relays_for_profiles) .reduce(|a, r| { if r.to_string() .chars() .count() .gt(&a.to_string().chars().count()) { r } else { a } }) .unwrap() .to_string() .chars() .count() + 2; Ok(FetchRequest { selected_relay: None, repo_relays: relays, relay_column_width, repo_coordinates_without_relays: if let Ok(repo_ref) = &repo_ref { repo_ref.coordinates_with_timestamps() } else { repo_coordinates_without_relays .iter() .map(|c| (c.clone(), None)) .collect() }, state: if let Ok(repo_ref) = &repo_ref { if let Ok(existing_state) = get_state_from_cache(git_repo_path, repo_ref).await { Some((existing_state.event.created_at, existing_state.event.id)) } else { None } } else { None }, proposals, contributors, missing_contributor_profiles, existing_events, profiles_to_fetch_from_user_relays, user_relays_for_profiles, }) } #[allow(clippy::too_many_lines)] async fn process_fetched_events( events: Vec, request: &FetchRequest, git_repo_path: Option<&Path>, fresh_coordinates: &mut HashSet, fresh_proposal_roots: &mut HashSet, fresh_profiles: &mut HashSet, report: &mut FetchReport, ) -> Result<()> { for event in 
&events { if !request.existing_events.contains(&event.id) { if let Some(git_repo_path) = git_repo_path { save_event_in_local_cache(git_repo_path, event).await?; } if event.kind.eq(&Kind::GitRepoAnnouncement) { save_event_in_global_cache(git_repo_path, event).await?; let new_coordinate = !request .repo_coordinates_without_relays .iter() .map(|(c, _)| c.clone()) .any(|c| { c.identifier.eq(event.tags.identifier().unwrap()) && c.public_key.eq(&event.pubkey) }); let update_to_existing = !new_coordinate && request .repo_coordinates_without_relays .iter() .any(|(c, t)| { c.identifier.eq(event.tags.identifier().unwrap()) && c.public_key.eq(&event.pubkey) && if let Some(t) = t { event.created_at.gt(t) } else { true } }); if update_to_existing { report.updated_repo_announcements.push(( Coordinate { kind: event.kind, public_key: event.pubkey, identifier: event.tags.identifier().unwrap().to_owned(), relays: vec![], }, event.created_at, )); } // if contains new maintainer if let Ok(repo_ref) = &RepoRef::try_from(event.clone()) { for m in &repo_ref.maintainers { if !request .repo_coordinates_without_relays // prexisting maintainers .iter() .map(|(c, _)| c.clone()) .collect::>() .union(&report.repo_coordinates_without_relays) // already added maintainers .any(|c| c.identifier.eq(&repo_ref.identifier) && m.eq(&c.public_key)) { let c = Coordinate { kind: event.kind, public_key: *m, identifier: repo_ref.identifier.clone(), relays: vec![], }; fresh_coordinates.insert(c.clone()); report.repo_coordinates_without_relays.insert(c); if !request.contributors.contains(m) && !request .profiles_to_fetch_from_user_relays .clone() .into_keys() .collect::>() .contains(m) && !fresh_profiles.contains(m) { fresh_profiles.insert(m.to_owned()); } } } } } else if event.kind.eq(&STATE_KIND) { let existing_state = if report.updated_state.is_some() { report.updated_state } else { request.state }; if let Some((timestamp, id)) = existing_state { if event.created_at.gt(×tamp) || (event.created_at.eq(×tamp) 
&& event.id.gt(&id)) { report.updated_state = Some((event.created_at, event.id)); } } } else if event_is_patch_set_root(event) { fresh_proposal_roots.insert(event.id); report.proposals.insert(event.id); if !request.contributors.contains(&event.pubkey) && !fresh_profiles.contains(&event.pubkey) { fresh_profiles.insert(event.pubkey); } } else if [Kind::RelayList, Kind::Metadata].contains(&event.kind) { if request.missing_contributor_profiles.contains(&event.pubkey) { report.contributor_profiles.insert(event.pubkey); } else if let Some((_, (metadata_timestamp, relay_list_timestamp))) = request .profiles_to_fetch_from_user_relays .get_key_value(&event.pubkey) { if (Kind::Metadata.eq(&event.kind) && event.created_at.gt(metadata_timestamp)) || (Kind::RelayList.eq(&event.kind) && event.created_at.gt(relay_list_timestamp)) { report.profile_updates.insert(event.pubkey); } } save_event_in_global_cache(git_repo_path, event).await?; } } } for event in &events { if !request.existing_events.contains(&event.id) && !event .tags .event_ids() .any(|id| report.proposals.contains(id)) { if event.kind.eq(&Kind::GitPatch) && !event_is_patch_set_root(event) { report.commits.insert(event.id); } else if status_kinds().contains(&event.kind) { report.statuses.insert(event.id); } } } Ok(()) } pub fn consolidate_fetch_reports(reports: Vec>) -> FetchReport { let mut report = FetchReport::default(); for relay_report in reports.into_iter().flatten() { for c in relay_report.repo_coordinates_without_relays { if !report .repo_coordinates_without_relays .iter() .any(|e| e.eq(&c)) { report.repo_coordinates_without_relays.insert(c); } } for (r, t) in relay_report.updated_repo_announcements { if let Some(i) = report .updated_repo_announcements .iter() .position(|(e, _)| e.eq(&r)) { let (_, existing_t) = &report.updated_repo_announcements[i]; if t.gt(existing_t) { report.updated_repo_announcements[i] = (r, t); } } else { report.updated_repo_announcements.push((r, t)); } } if let Some((timestamp, id)) = 
relay_report.updated_state { if let Some((existing_timestamp, existing_id)) = report.updated_state { if timestamp.gt(&existing_timestamp) || (timestamp.eq(&existing_timestamp) && id.gt(&existing_id)) { report.updated_state = Some((timestamp, id)); } } else { report.updated_state = Some((timestamp, id)); } } for c in relay_report.proposals { report.proposals.insert(c); } for c in relay_report.commits { report.commits.insert(c); } for c in relay_report.statuses { report.statuses.insert(c); } for c in relay_report.contributor_profiles { report.contributor_profiles.insert(c); } for c in relay_report.profile_updates { report.profile_updates.insert(c); } } report } pub fn get_fetch_filters( repo_coordinates: &HashSet, proposal_ids: &HashSet, required_profiles: &HashSet, ) -> Vec { [ if repo_coordinates.is_empty() { vec![] } else { vec![ get_filter_state_events(repo_coordinates), get_filter_repo_events(repo_coordinates), nostr::Filter::default() .kinds(vec![Kind::GitPatch, Kind::EventDeletion]) .custom_tag( SingleLetterTag::lowercase(nostr_sdk::Alphabet::A), repo_coordinates .iter() .map(std::string::ToString::to_string) .collect::>(), ), ] }, if proposal_ids.is_empty() { vec![] } else { vec![ nostr::Filter::default() .events(proposal_ids.clone()) .kinds([vec![Kind::GitPatch, Kind::EventDeletion], status_kinds()].concat()), ] }, if required_profiles.is_empty() { vec![] } else { vec![get_filter_contributor_profiles(required_profiles.clone())] }, ] .concat() } pub fn get_filter_repo_events(repo_coordinates: &HashSet) -> nostr::Filter { nostr::Filter::default() .kind(Kind::GitRepoAnnouncement) .identifiers( repo_coordinates .iter() .map(|c| c.identifier.clone()) .collect::>(), ) .authors( repo_coordinates .iter() .map(|c| c.public_key) .collect::>(), ) } pub static STATE_KIND: nostr::Kind = Kind::Custom(30618); pub fn get_filter_state_events(repo_coordinates: &HashSet) -> nostr::Filter { nostr::Filter::default() .kind(STATE_KIND) .identifiers( repo_coordinates .iter() 
.map(|c| c.identifier.clone())
                .collect::>(),
        )
        .authors(
            repo_coordinates
                .iter()
                .map(|c| c.public_key)
                .collect::>(),
        )
}

/// Builds a filter matching `Kind::Metadata` and `Kind::RelayList` events
/// authored by any of `contributors`.
pub fn get_filter_contributor_profiles(contributors: HashSet) -> nostr::Filter {
    nostr::Filter::default()
        .kinds(vec![Kind::Metadata, Kind::RelayList])
        .authors(contributors)
}

/// Tally of what a fetch produced.
///
/// The `Display` impl turns the non-empty fields into a comma separated
/// summary; when every field is empty it renders as an empty string.
#[derive(Default)]
pub struct FetchReport {
    repo_coordinates_without_relays: HashSet,
    updated_repo_announcements: Vec<(Coordinate, Timestamp)>,
    updated_state: Option<(Timestamp, EventId)>,
    proposals: HashSet,
    /// commits against existing proposals
    commits: HashSet,
    statuses: HashSet,
    contributor_profiles: HashSet,
    profile_updates: HashSet,
}

impl Display for FetchReport {
    /// Writes one item per non-empty field, in a fixed order, joined by `", "`.
    ///
    /// Each item is a count followed by a label that is pluralised when the
    /// count is greater than one; `updated_state` contributes the fixed text
    /// "new state" instead of a count.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // report: "1 new maintainer, 1 announcement, 1 proposal, 3 commits, 2 statuses"
        let mut display_items: Vec = vec![];
        if !self.repo_coordinates_without_relays.is_empty() {
            display_items.push(format!(
                "{} new maintainer{}",
                self.repo_coordinates_without_relays.len(),
                if self.repo_coordinates_without_relays.len() > 1 {
                    "s"
                } else {
                    ""
                },
            ));
        }
        if !self.updated_repo_announcements.is_empty() {
            display_items.push(format!(
                "{} announcement update{}",
                self.updated_repo_announcements.len(),
                if self.updated_repo_announcements.len() > 1 {
                    "s"
                } else {
                    ""
                },
            ));
        }
        if self.updated_state.is_some() {
            display_items.push("new state".to_string());
        }
        if !self.proposals.is_empty() {
            display_items.push(format!(
                "{} proposal{}",
                self.proposals.len(),
                if self.proposals.len() > 1 { "s" } else { "" },
            ));
        }
        if !self.commits.is_empty() {
            display_items.push(format!(
                "{} commit{}",
                self.commits.len(),
                if self.commits.len() > 1 { "s" } else { "" },
            ));
        }
        if !self.statuses.is_empty() {
            display_items.push(format!(
                "{} status{}",
                self.statuses.len(),
                if self.statuses.len() > 1 { "es" } else { "" },
            ));
        }
        if !self.contributor_profiles.is_empty() {
            display_items.push(format!(
                "{} user profile{}",
                self.contributor_profiles.len(),
                if self.contributor_profiles.len() > 1 {
                    "s"
                } else {
                    ""
                },
            ));
        }
        if !self.profile_updates.is_empty() {
            display_items.push(format!(
                "{} profile update{}",
                self.profile_updates.len(),
                if self.profile_updates.len() > 1 {
                    "s"
                } else {
                    ""
                },
            ));
        }
        write!(f, "{}", display_items.join(", "))
    }
}

/// Inputs for a fetch against relays.
///
/// NOTE(review): the meaning of each field is defined by the fetch
/// implementation, which is not part of this section — confirm there before
/// relying on any of them.
#[derive(Default, Clone)]
pub struct FetchRequest {
    repo_relays: HashSet,
    selected_relay: Option,
    relay_column_width: usize,
    repo_coordinates_without_relays: Vec<(Coordinate, Option)>,
    state: Option<(Timestamp, EventId)>,
    proposals: HashSet,
    contributors: HashSet,
    missing_contributor_profiles: HashSet,
    existing_events: HashSet,
    profiles_to_fetch_from_user_relays: HashMap,
    user_relays_for_profiles: HashSet,
}

/// Runs `client.fetch_all` for `repo_coordinates` and prints a summary.
///
/// Writes "fetching updates..." to stderr first. When none of the per-relay
/// reports is an error the progress display is cleared (a failure to clear is
/// ignored). The per-relay reports are then consolidated and either
/// "no updates" or "updates: ..." is printed to stdout.
///
/// # Errors
///
/// Returns an error if writing to the terminal fails or if `fetch_all` fails.
pub async fn fetching_with_report(
    git_repo_path: &Path,
    #[cfg(test)] client: &crate::client::MockConnect,
    #[cfg(not(test))] client: &Client,
    repo_coordinates: &HashSet,
) -> Result {
    let term = console::Term::stderr();
    term.write_line("fetching updates...")?;
    let (relay_reports, progress_reporter) = client
        .fetch_all(Some(git_repo_path), repo_coordinates, &HashSet::new())
        .await?;
    // keep the per-relay progress visible when at least one relay failed
    if !relay_reports.iter().any(std::result::Result::is_err) {
        let _ = progress_reporter.clear();
    }
    let report = consolidate_fetch_reports(relay_reports);
    if report.to_string().is_empty() {
        println!("no updates");
    } else {
        println!("updates: {report}");
    }
    Ok(report)
}

/// Reads the patch-set root events for `repo_coordinates` from the local
/// cache, most recently created first.
///
/// The cache is queried for `GitPatch` events carrying an `a` tag equal to any
/// of the coordinates; only events accepted by `event_is_patch_set_root` are
/// kept.
///
/// # Errors
///
/// Returns an error if the local cache query fails.
pub async fn get_proposals_and_revisions_from_cache(
    git_repo_path: &Path,
    repo_coordinates: HashSet,
) -> Result> {
    let mut proposals = get_events_from_local_cache(
        git_repo_path,
        vec![
            nostr::Filter::default()
                .kind(nostr::Kind::GitPatch)
                .custom_tag(
                    nostr::SingleLetterTag::lowercase(nostr_sdk::Alphabet::A),
                    repo_coordinates
                        .iter()
                        .map(std::string::ToString::to_string)
                        .collect::>(),
                ),
        ],
    )
    .await?
    .iter()
    .filter(|e| event_is_patch_set_root(e))
    .cloned()
    .collect::>();
    // ascending by creation time, then flipped so the newest comes first
    proposals.sort_by_key(|e| e.created_at);
    proposals.reverse();
    Ok(proposals)
}

/// Collects the cached patch events that belong to the proposal `proposal_id`.
///
/// Only events authored by a repository maintainer or by the author of the
/// proposal event itself are considered. Events that reference any revision
/// root found among them are fetched in a second query and appended. Cover
/// letters are excluded from the result.
///
/// # Errors
///
/// Returns an error if a local cache query fails, or "proposal not in cache"
/// if the proposal event itself is not among the cached events.
pub async fn get_all_proposal_patch_events_from_cache(
    git_repo_path: &Path,
    repo_ref: &RepoRef,
    proposal_id: &nostr::EventId,
) -> Result> {
    let mut commit_events = get_events_from_local_cache(
        git_repo_path,
        vec![
            // patches that reference the proposal
            nostr::Filter::default()
                .kind(nostr::Kind::GitPatch)
                .event(*proposal_id),
            // the proposal event itself
            nostr::Filter::default()
                .kind(nostr::Kind::GitPatch)
                .id(*proposal_id),
        ],
    )
    .await?;

    // maintainers plus the author of the proposal
    let permissioned_users: HashSet = [
        repo_ref.maintainers.clone(),
        vec![
            commit_events
                .iter()
                .find(|e| e.id.eq(proposal_id))
                .context("proposal not in cache")?
                .pubkey,
        ],
    ]
    .concat()
    .iter()
    .copied()
    .collect();
    commit_events.retain(|e| permissioned_users.contains(&e.pubkey));

    let revision_roots: HashSet = commit_events
        .iter()
        .filter(|e| event_is_revision_root(e))
        .map(|e| e.id)
        .collect();

    if !revision_roots.is_empty() {
        for event in get_events_from_local_cache(
            git_repo_path,
            vec![
                nostr::Filter::default()
                    .kind(nostr::Kind::GitPatch)
                    .events(revision_roots)
                    .authors(permissioned_users.clone()),
            ],
        )
        .await?
        {
            commit_events.push(event);
        }
    }

    Ok(commit_events
        .iter()
        .filter(|e| !event_is_cover_letter(e) && permissioned_users.contains(&e.pubkey))
        .cloned()
        .collect())
}

/// Looks up a single event by id in the local cache of `git_repo`.
///
/// # Errors
///
/// Returns an error if the repository path cannot be obtained, if the cache
/// query fails, or "failed to find event in cache" when no event matches.
pub async fn get_event_from_cache_by_id(git_repo: &Repo, event_id: &EventId) -> Result {
    Ok(get_events_from_local_cache(
        git_repo.get_path()?,
        vec![nostr::Filter::default().id(*event_id)],
    )
    .await?
    .first()
    .context("failed to find event in cache")?
    .clone())
}

/// Sends `events` to every relevant relay concurrently, showing one progress
/// bar per relay.
///
/// The relay list is the union of `repo_read_relays`, `my_write_relays` and
/// the client's fallback relays (plus its blaster relays when any event is a
/// `Kind::GitRepoAnnouncement`), de-duplicated ignoring a trailing `/`.
/// Relays present in both `repo_read_relays` and `my_write_relays` are listed
/// first.
///
/// For each relay the events are sent in order; the first failure stops
/// sending to that relay and is shown on its progress bar. Per-relay failures
/// are not returned to the caller.
///
/// * `git_repo_path` - passed through to `send_event_to`.
/// * `animate` - use a ticking spinner and ✔/✘ symbols instead of `-`, `y`, `x`.
/// * `silent` - draw the progress bars to a hidden target.
///
/// # Errors
///
/// Returns an error only if a progress bar style template cannot be built.
#[allow(clippy::module_name_repetitions)]
#[allow(clippy::too_many_lines)]
pub async fn send_events(
    #[cfg(test)] client: &crate::client::MockConnect,
    #[cfg(not(test))] client: &Client,
    git_repo_path: Option<&Path>,
    events: Vec,
    my_write_relays: Vec,
    repo_read_relays: Vec,
    animate: bool,
    silent: bool,
) -> Result<()> {
    let fallback = [
        client.get_fallback_relays().clone(),
        if events.iter().any(|e| e.kind.eq(&Kind::GitRepoAnnouncement)) {
            client.get_blaster_relays().clone()
        } else {
            vec![]
        },
    ]
    .concat();
    let mut relays: Vec<&String> = vec![];

    let all = &[
        repo_read_relays.clone(),
        my_write_relays.clone(),
        fallback.clone(),
    ]
    .concat();
    // add duplicates first: relays that are both a repo relay and one of my
    // write relays. (The previous test `!count > 1` applied a bitwise NOT to
    // the count and was therefore always true.)
    for r in &repo_read_relays {
        let r_clean = remove_trailing_slash(r);
        if my_write_relays
            .iter()
            .any(|x| r_clean.eq(&remove_trailing_slash(x)))
            && !relays.iter().any(|x| r_clean.eq(&remove_trailing_slash(x)))
        {
            relays.push(r);
        }
    }

    // then everything else, skipping relays that are already listed
    for r in all {
        let r_clean = remove_trailing_slash(r);
        if !relays.iter().any(|x| r_clean.eq(&remove_trailing_slash(x))) {
            relays.push(r);
        }
    }

    let m = if silent {
        MultiProgress::with_draw_target(ProgressDrawTarget::hidden())
    } else {
        MultiProgress::new()
    };
    let pb_style = ProgressStyle::with_template(if animate {
        " {spinner} {prefix} {bar} {pos}/{len} {msg}"
    } else {
        " - {prefix} {bar} {pos}/{len} {msg}"
    })?
    .progress_chars("##-");

    // style used once a relay has finished: a status symbol replaces the bar
    let pb_after_style =
        |symbol| ProgressStyle::with_template(format!(" {symbol} {}", "{prefix} {msg}",).as_str());
    let pb_after_style_succeeded = pb_after_style(if animate {
        console::style("✔".to_string())
            .for_stderr()
            .green()
            .to_string()
    } else {
        "y".to_string()
    })?;

    let pb_after_style_failed = pb_after_style(if animate {
        console::style("✘".to_string())
            .for_stderr()
            .red()
            .to_string()
    } else {
        "x".to_string()
    })?;

    #[allow(clippy::borrow_deref_ref)]
    join_all(relays.iter().map(|&relay| async {
        let relay_clean = remove_trailing_slash(&*relay);
        // label the relay with every list it appears in
        let details = format!(
            "{}{}{} {}",
            if my_write_relays
                .iter()
                .any(|r| relay_clean.eq(&remove_trailing_slash(r)))
            {
                " [my-relay]"
            } else {
                ""
            },
            if repo_read_relays
                .iter()
                .any(|r| relay_clean.eq(&remove_trailing_slash(r)))
            {
                " [repo-relay]"
            } else {
                ""
            },
            if fallback
                .iter()
                .any(|r| relay_clean.eq(&remove_trailing_slash(r)))
            {
                " [default]"
            } else {
                ""
            },
            relay_clean,
        );
        let pb = m.add(
            ProgressBar::new(events.len() as u64)
                .with_prefix(details.to_string())
                .with_style(pb_style.clone()),
        );
        if animate {
            pb.enable_steady_tick(Duration::from_millis(300));
        }
        pb.inc(0); // need to make pb display initially
        let mut failed = false;
        for event in &events {
            match client
                .send_event_to(git_repo_path, relay.as_str(), event.clone())
                .await
            {
                Ok(_) => pb.inc(1),
                Err(e) => {
                    pb.set_style(pb_after_style_failed.clone());
                    pb.finish_with_message(
                        console::style(
                            e.to_string()
                                .replace("relay pool error:", "error:")
                                .replace("event not published: ", "error: "),
                        )
                        .for_stderr()
                        .red()
                        .to_string(),
                    );
                    failed = true;
                    // stop sending the remaining events to this relay
                    break;
                }
            };
        }
        if !failed {
            pb.set_style(pb_after_style_succeeded.clone());
            pb.finish_with_message("");
        }
    }))
    .await;
    Ok(())
}

/// Returns `s` as an owned string with a single trailing `/` removed, if
/// there is one.
fn remove_trailing_slash(s: &String) -> String {
    s.strip_suffix('/').unwrap_or(s.as_str()).to_string()
}