From 142fee58b0449b3fe3f436986339c318de66b33f Mon Sep 17 00:00:00 2001 From: DanConwayDev Date: Mon, 15 Jul 2024 17:14:09 +0100 Subject: feat(fetch): fetch events and save to cache enabler to add simplicity, efficency and offline capability to other functions improve repo announcement selection --- src/client.rs | 883 +++++++++++++++++++++++++++++++++++++++++++--- src/main.rs | 3 + src/repo_ref.rs | 171 ++++++++- src/sub_commands/fetch.rs | 46 +++ src/sub_commands/init.rs | 12 +- src/sub_commands/list.rs | 2 +- src/sub_commands/mod.rs | 1 + 7 files changed, 1060 insertions(+), 58 deletions(-) create mode 100644 src/sub_commands/fetch.rs (limited to 'src') diff --git a/src/client.rs b/src/client.rs index a66adac..4054e7c 100644 --- a/src/client.rs +++ b/src/client.rs @@ -10,16 +10,37 @@ // which is currently in nightly. alternatively we can use nightly as it looks // certain that the implementation is going to make it to stable but we don't // want to inadvertlty use other features of nightly that might be removed. -use std::{fmt::Write, time::Duration}; +use std::{ + collections::{HashMap, HashSet}, + fmt::{Display, Write}, + fs::create_dir_all, + path::Path, + time::Duration, +}; use anyhow::{bail, Context, Result}; use async_trait::async_trait; +use console::Style; use futures::stream::{self, StreamExt}; use indicatif::{MultiProgress, ProgressBar, ProgressState, ProgressStyle}; #[cfg(test)] use mockall::*; -use nostr::Event; -use nostr_sdk::{prelude::RelayLimits, EventBuilder, NostrSigner, Options}; +use nostr::{nips::nip01::Coordinate, Event}; +use nostr_database::{NostrDatabase, Order}; +use nostr_sdk::{ + prelude::RelayLimits, EventBuilder, EventId, Kind, NostrSigner, Options, PublicKey, + SingleLetterTag, Timestamp, Url, +}; +use nostr_sqlite::SQLiteDatabase; + +use crate::{ + config::get_dirs, + repo_ref::{RepoRef, REPO_REF_KIND}, + sub_commands::{ + list::status_kinds, + send::{event_is_patch_set_root, PATCH_KIND}, + }, +}; #[allow(clippy::struct_field_names)] pub struct Client { @@ -35,6 +56,7 @@ pub trait Connect { fn default() -> Self; fn new(opts: Params) -> Self; async fn set_signer(&mut self, signer: NostrSigner); + async fn connect(&self, relay_url: &Url) -> Result<()>; async fn disconnect(&self) -> Result<()>; fn get_fallback_relays(&self) -> &Vec; fn get_more_fallback_relays(&self) -> &Vec; @@ -45,6 +67,25 @@ pub trait Connect { relays: Vec, filters: Vec, ) -> Result>; + async fn get_events_per_relay( + &self, + relays: Vec, + filters: Vec, + progress_reporter: MultiProgress, + ) -> Result<(Vec>>, MultiProgress)>; + async fn fetch_all( + &self, + git_repo_path: &Path, + repo_coordinates: &HashSet, + ) -> Result; + async fn fetch_all_from_relay( + &self, + git_repo_path: &Path, + relay_url: Url, + request: FetchRequest, + // progress_reporter: &MultiProgress, + pb: &Option, + ) -> Result; } #[async_trait] @@ -110,6 +151,27 @@ impl Connect for Client { self.client.set_signer(Some(signer)).await; } + async fn connect(&self, relay_url: &Url) -> Result<()> { + self.client + .add_relay(relay_url) + .await + .context("cannot add relay")?; + + let relay = self.client.relay(relay_url).await?; + + if !relay.is_connected().await { + #[allow(clippy::large_futures)] + relay + .connect(Some(std::time::Duration::from_secs(CONNECTION_TIMEOUT))) + .await; + } + + if !relay.is_connected().await { + bail!("connection timeout"); + } + Ok(()) + } + async fn disconnect(&self) -> Result<()> { self.client.disconnect().await?; Ok(()) @@ -139,6 +201,22 @@ impl Connect for Client { relays: Vec, filters: Vec, ) -> Result> { + let (relay_results, _) = self + .get_events_per_relay( + relays.iter().map(|r| Url::parse(r).unwrap()).collect(), + filters, + MultiProgress::new(), + ) + .await?; + Ok(get_dedup_events(relay_results)) + } + + async fn get_events_per_relay( + &self, + relays: Vec, + filters: Vec, + progress_reporter: MultiProgress, + ) -> Result<(Vec>>, MultiProgress)> { // add relays for relay in &relays { self.client @@ -147,59 +225,20 @@ impl Connect for Client { .context("cannot add relay")?; } - let m = MultiProgress::new(); - let pb_style = ProgressStyle::with_template(" {spinner} {prefix} {msg} {timeout_in}")? - .with_key("timeout_in", |state: &ProgressState, w: &mut dyn Write| { - if state.elapsed().as_secs() > 3 && state.elapsed().as_secs() < GET_EVENTS_TIMEOUT { - write!( - w, - "timeout in {:.1}s", - GET_EVENTS_TIMEOUT - state.elapsed().as_secs() - ) - .unwrap(); - } - }); - - let pb_after_style = |succeed| { - ProgressStyle::with_template( - format!( - " {} {}", - if succeed { - console::style("✔".to_string()) - .for_stderr() - .green() - .to_string() - } else { - console::style("✘".to_string()) - .for_stderr() - .red() - .to_string() - }, - "{prefix} {msg}", - ) - .as_str(), - ) - }; - let relays_map = self.client.relays().await; let futures: Vec<_> = relays .clone() .iter() // don't look for events on blaster - .filter(|r| !r.contains("nostr.mutinywallet.com")) - .map(|r| { - ( - relays_map.get(&nostr::Url::parse(r).unwrap()).unwrap(), - filters.clone(), - ) - }) + .filter(|r| !r.as_str().contains("nostr.mutinywallet.com")) + .map(|r| (relays_map.get(r).unwrap(), filters.clone())) .map(|(relay, filters)| async { let pb = if std::env::var("NGITTEST").is_err() { - let pb = m.add( + let pb = progress_reporter.add( ProgressBar::new(1) .with_prefix(format!("{: <11}{}", "connecting", relay.url())) - .with_style(pb_style.clone()), + .with_style(pb_style()?), ); pb.enable_steady_tick(Duration::from_millis(300)); Some(pb) @@ -210,7 +249,7 @@ impl Connect for Client { match get_events_of(relay, filters, &pb).await { Err(error) => { if let Some(pb) = pb { - pb.set_style(pb_after_style(false)?); + pb.set_style(pb_after_style(false)); pb.set_prefix(format!("{: <11}{}", "error", relay.url())); pb.finish_with_message( console::style( @@ -225,7 +264,7 @@ impl Connect for Client { } Ok(res) => { if let Some(pb) = pb { - pb.set_style(pb_after_style(true)?); + pb.set_style(pb_after_style(true)); pb.set_prefix(format!( "{: <11}{}", format!("{} events", res.len()), @@ -239,9 +278,228 @@ impl Connect for Client { }) .collect(); - let relay_results = stream::iter(futures).buffer_unordered(15).collect().await; + let relay_results: Vec>> = + stream::iter(futures).buffer_unordered(15).collect().await; - Ok(get_dedup_events(relay_results)) + Ok((relay_results, progress_reporter)) + } + + #[allow(clippy::too_many_lines)] + async fn fetch_all( + &self, + git_repo_path: &Path, + repo_coordinates: &HashSet, + ) -> Result { + println!("fetching updates..."); + let mut fallback_relays = HashSet::new(); + for r in &self.fallback_relays { + if let Ok(url) = Url::parse(r) { + fallback_relays.insert(url); + } + } + let (relays, request) = + create_relays_request(git_repo_path, repo_coordinates, fallback_relays).await?; + let progress_reporter = MultiProgress::new(); + + for relay in &relays { + self.client + .add_relay(relay.as_str()) + .await + .context("cannot add relay")?; + } + + let dim = Style::new().color256(247); + + let futures: Vec<_> = relays + .iter() + // don't look for events on blaster + .filter(|r| !r.as_str().contains("nostr.mutinywallet.com")) + .map(|r| (r.clone(), request.clone())) + .map(|(relay, request)| async { + let relay_column_width = request.relay_column_width; + + let pb = if std::env::var("NGITTEST").is_err() { + let pb = progress_reporter.add( + ProgressBar::new(1) + .with_prefix( + dim.apply_to(format!( + "{: { + if let Some(pb) = pb { + pb.set_style(pb_after_style(false)); + pb.set_prefix( + dim.apply_to(format!( + "{: { + if let Some(pb) = pb { + pb.set_style(pb_after_style(true)); + pb.set_prefix( + dim.apply_to(format!( + "{: > = + stream::iter(futures).buffer_unordered(15).collect().await; + + let report = consolidate_fetch_reports(relay_reports); + + if report.to_string().is_empty() { + println!("no updates found"); + } else { + println!("fetched updates: {report}"); + } + Ok(report) + } + + async fn fetch_all_from_relay( + &self, + git_repo_path: &Path, + relay_url: Url, + request: FetchRequest, + // progress_reporter: &MultiProgress, + pb: &Option, + ) -> Result { + let mut fresh_coordinates: HashSet = HashSet::new(); + for (c, _) in request.repo_coordinates.clone() { + fresh_coordinates.insert(c); + } + let mut fresh_proposal_roots = request.proposals.clone(); + let mut fresh_authors = request.contributor_profiles.clone(); + + let mut report = FetchReport { + relay: Some(relay_url.clone()), + ..Default::default() + }; + + // let pb = if std::env::var("NGITTEST").is_err() { + // let pb = progress_reporter.add( + // ProgressBar::new(1) + // .with_prefix(format!("{: <11}{}", "connecting", relay_url)) + // .with_style(pb_style()?), + // ); + // pb.enable_steady_tick(Duration::from_millis(300)); + // Some(pb) + // } else { + // None + // }; + + self.connect(&relay_url).await?; + + let relay_column_width = request.relay_column_width; + + let dim = Style::new().color256(247); + + loop { + let filters = + get_fetch_filters(&fresh_coordinates, &fresh_proposal_roots, &fresh_authors); + + if let Some(pb) = &pb { + pb.set_prefix( + dim.apply_to(format!( + "{: = get_events_of(&relay, filters, &None).await?; + // TODO: try reconcile + + for event in events { + // TODO existing_events or events in fresh + process_fetched_event( + event, + &request, + git_repo_path, + &mut fresh_coordinates, + &mut fresh_proposal_roots, + &mut report, + ) + .await?; + } + + if fresh_coordinates.is_empty() && fresh_proposal_roots.is_empty() { + break; + } + } + if let Some(pb) = pb { + let report_display = format!("{report}"); + pb.set_prefix( + dim.apply_to(format!( + "{: , pb: &Option, ) -> Result> { + // relay.reconcile(filter, opts).await?; + if !relay.is_connected().await { #[allow(clippy::large_futures)] relay @@ -324,3 +584,526 @@ pub async fn fetch_public_key(signer: &NostrSigner) -> Result term.clear_last_lines(1)?; Ok(public_key) } + +fn pb_style() -> Result { + Ok( + ProgressStyle::with_template(" {spinner} {prefix} {msg} {timeout_in}")?.with_key( + "timeout_in", + |state: &ProgressState, w: &mut dyn Write| { + if state.elapsed().as_secs() > 3 && state.elapsed().as_secs() < GET_EVENTS_TIMEOUT { + let dim = Style::new().color256(247); + write!( + w, + "{}", + dim.apply_to(format!( + "timeout in {:.1}s", + GET_EVENTS_TIMEOUT - state.elapsed().as_secs() + )) + ) + .unwrap(); + } + }, + ), + ) +} + +fn pb_after_style(succeed: bool) -> indicatif::ProgressStyle { + ProgressStyle::with_template( + format!( + " {} {}", + if succeed { + console::style("✔".to_string()) + .for_stderr() + .green() + .to_string() + } else { + console::style("✘".to_string()) + .for_stderr() + .red() + .to_string() + }, + "{prefix} {msg}", + ) + .as_str(), + ) + .unwrap() +} + +async fn get_local_cache_database(git_repo_path: &Path) -> Result { + SQLiteDatabase::open(git_repo_path.join(".git/nostr-cache.sqlite")) + .await + .context("cannot open or create nostr cache database at .git/nostr-cache.sqlite") +} + +async fn get_global_cache_database(git_repo_path: &Path) -> Result { + SQLiteDatabase::open(if std::env::var("NGITTEST").is_err() { + create_dir_all(get_dirs()?.config_dir()).context(format!( + "cannot create cache directory in: {:?}", + get_dirs()?.config_dir() + ))?; + get_dirs()?.config_dir().join("cache.sqlite") + } else { + git_repo_path.join(".git/test-global-cache.sqlite") + }) + .await + .context("cannot open ngit global nostr cache database") +} + +pub async fn get_event_from_cache( + git_repo_path: &Path, + filters: Vec, +) -> Result> { + get_local_cache_database(git_repo_path) + .await? + .query(filters.clone(), Order::Asc) + .await + .context( + "cannot execute query on opened git repo nostr cache database .git/nostr-cache.sqlite", + ) +} + +pub async fn get_event_from_global_cache( + git_repo_path: &Path, + filters: Vec, +) -> Result> { + get_global_cache_database(git_repo_path) + .await? + .query(filters.clone(), Order::Asc) + .await + .context("cannot execute query on opened ngit nostr cache database") +} + +pub async fn save_event_in_cache(git_repo_path: &Path, event: &nostr::Event) -> Result { + get_local_cache_database(git_repo_path) + .await? + .save_event(event) + .await + .context("cannot save event in local cache") +} + +pub async fn save_event_in_global_cache( + git_repo_path: &Path, + event: &nostr::Event, +) -> Result { + get_global_cache_database(git_repo_path) + .await? + .save_event(event) + .await + .context("cannot save event in local cache") +} + +pub async fn get_repo_ref_from_cache( + git_repo_path: &Path, + repo_coordinates: &HashSet, +) -> Result { + let mut maintainers = HashSet::new(); + let mut new_coordinate = false; + + for c in repo_coordinates { + maintainers.insert(c.public_key); + } + let mut repo_events = vec![]; + loop { + let filter = get_filter_repo_events(repo_coordinates); + + let events = [ + get_event_from_global_cache(git_repo_path, vec![filter.clone()]).await?, + get_event_from_cache(git_repo_path, vec![filter]).await?, + ] + .concat(); + for e in events { + if let Ok(repo_ref) = RepoRef::try_from(e.clone()) { + for m in repo_ref.maintainers { + if maintainers.insert(m) { + new_coordinate = true; + } + } + repo_events.push(e); + } + } + if !new_coordinate { + break; + } + } + repo_events.sort_by_key(|e| e.created_at); + let repo_ref = RepoRef::try_from( + repo_events + .first() + .context("no repo events at specified coordinates")? + .clone(), + )?; + + let mut events: HashMap = HashMap::new(); + for m in &maintainers { + if let Some(e) = repo_events.iter().find(|e| e.author().eq(m)) { + events.insert( + Coordinate { + kind: e.kind, + identifier: e.identifier().unwrap().to_string(), + public_key: e.author(), + relays: vec![], + }, + e.clone(), + ); + } + } + + Ok(RepoRef { + // use all maintainers from all events found, not just maintainers in the most + // recent event + maintainers: maintainers.iter().copied().collect::>(), + events, + ..repo_ref + }) +} + +async fn create_relays_request( + git_repo_path: &Path, + repo_coordinates: &HashSet, + fallback_relays: HashSet, +) -> Result<(HashSet, FetchRequest)> { + let repo_ref = get_repo_ref_from_cache(git_repo_path, repo_coordinates).await; + + let relays = { + let mut relays = fallback_relays; + if let Ok(repo_ref) = &repo_ref { + for r in &repo_ref.relays { + if let Ok(url) = Url::parse(r) { + relays.insert(url); + } + } + } + relays + }; + + let relay_column_width = relays + .iter() + .reduce(|a, r| { + if r.to_string() + .chars() + .count() + .gt(&a.to_string().chars().count()) + { + r + } else { + a + } + }) + .unwrap() + .to_string() + .chars() + .count() + + 2; + + let repo_coordinates = if let Ok(repo_ref) = &repo_ref { + repo_ref.coordinates() + } else { + repo_coordinates.clone() + }; + + let proposals: HashSet = get_local_cache_database(git_repo_path) + .await? + .negentropy_items( + nostr::Filter::default() + .kinds(vec![Kind::Custom(PATCH_KIND)]) + .custom_tag( + SingleLetterTag::lowercase(nostr_sdk::Alphabet::A), + repo_coordinates + .iter() + .map(std::string::ToString::to_string) + .collect::>(), + ), + ) + .await? + .iter() + .map(|(id, _)| *id) + .collect(); + + let contributor_profiles = HashSet::new(); + + let existing_events: HashSet = { + let mut existing_events: HashSet = HashSet::new(); + for filter in get_fetch_filters(&repo_coordinates, &proposals, &contributor_profiles) { + for (id, _) in get_local_cache_database(git_repo_path) + .await? + .negentropy_items(filter) + .await? + { + existing_events.insert(id); + } + } + existing_events + }; + Ok(( + relays, + FetchRequest { + relay_column_width, + repo_coordinates: if let Ok(repo_ref) = repo_ref { + repo_ref.coordinates_with_timestamps() + } else { + repo_coordinates.iter().map(|c| (c.clone(), None)).collect() + }, + proposals, + contributor_profiles, + existing_events, + }, + )) +} + +async fn process_fetched_event( + event: nostr::Event, + request: &FetchRequest, + git_repo_path: &Path, + fresh_coordinates: &mut HashSet, + fresh_proposal_roots: &mut HashSet, + report: &mut FetchReport, +) -> Result<()> { + if !request.existing_events.contains(&event.id) { + save_event_in_cache(git_repo_path, &event).await?; + if event.kind().as_u16().eq(&REPO_REF_KIND) { + save_event_in_global_cache(git_repo_path, &event).await?; + let new_coordinate = !request.repo_coordinates.iter().any(|(c, _)| { + c.identifier.eq(event.identifier().unwrap()) && c.public_key.eq(&event.pubkey) + }); + let update_to_existing = !new_coordinate + && request.repo_coordinates.iter().any(|(c, t)| { + c.identifier.eq(event.identifier().unwrap()) + && c.public_key.eq(&event.pubkey) + && if let Some(t) = t { + event.created_at.gt(t) + } else { + false + } + }); + if new_coordinate || update_to_existing { + let c = Coordinate { + kind: event.kind(), + public_key: event.author(), + identifier: event.identifier().unwrap().to_string(), + relays: vec![], + }; + if new_coordinate { + fresh_coordinates.insert(c.clone()); + report.repo_coordinates.push(c.clone()); + } + if update_to_existing { + report + .updated_repo_announcements + .push((c, event.created_at)); + } + } + // if contains new maintainer + if let Ok(repo_ref) = &RepoRef::try_from(event.clone()) { + for m in &repo_ref.maintainers { + if !request + .repo_coordinates + .iter() + .any(|(c, _)| c.identifier.eq(&repo_ref.identifier) && m.eq(&c.public_key)) + { + fresh_coordinates.insert(Coordinate { + kind: event.kind(), + public_key: *m, + identifier: repo_ref.identifier.clone(), + relays: vec![], + }); + } + } + } + } else if event_is_patch_set_root(&event) { + fresh_proposal_roots.insert(event.id); + report.proposals.insert(event.id); + } else if !event.event_ids().any(|id| report.proposals.contains(id)) { + if event.kind().as_u16() == PATCH_KIND { + report.commits.insert(event.id); + } else if status_kinds().contains(&event.kind()) { + report.statuses.insert(event.id); + } + } else if event.kind().eq(&nostr_sdk::Kind::Metadata) { + report.contributor_profiles.insert(event.author()); + save_event_in_global_cache(git_repo_path, &event).await?; + } + } + Ok(()) +} + +fn consolidate_fetch_reports(reports: Vec>) -> FetchReport { + let mut report = FetchReport::default(); + for relay_report in reports.into_iter().flatten() { + for c in relay_report.repo_coordinates { + if !report.repo_coordinates.iter().any(|e| e.eq(&c)) { + report.repo_coordinates.push(c); + } + } + for (r, t) in relay_report.updated_repo_announcements { + if let Some(i) = report + .updated_repo_announcements + .iter() + .position(|(e, _)| e.eq(&r)) + { + let (_, existing_t) = &report.updated_repo_announcements[i]; + if t.gt(existing_t) { + report.updated_repo_announcements[i] = (r, t); + } + } else { + report.updated_repo_announcements.push((r, t)); + } + } + for c in relay_report.proposals { + report.proposals.insert(c); + } + for c in relay_report.commits { + report.commits.insert(c); + } + for c in relay_report.statuses { + report.statuses.insert(c); + } + } + report +} +pub fn get_fetch_filters( + repo_coordinates: &HashSet, + proposal_ids: &HashSet, + required_profiles: &HashSet, +) -> Vec { + [ + if repo_coordinates.is_empty() { + vec![] + } else { + vec![ + get_filter_repo_events(repo_coordinates), + nostr::Filter::default() + .kinds(vec![Kind::Custom(PATCH_KIND), Kind::EventDeletion]) + .custom_tag( + SingleLetterTag::lowercase(nostr_sdk::Alphabet::A), + repo_coordinates + .iter() + .map(std::string::ToString::to_string) + .collect::>(), + ), + ] + }, + if proposal_ids.is_empty() { + vec![] + } else { + vec![ + nostr::Filter::default().events(proposal_ids.clone()).kinds( + [ + vec![Kind::Custom(PATCH_KIND), Kind::EventDeletion], + status_kinds(), + ] + .concat(), + ), + ] + }, + if required_profiles.is_empty() { + vec![] + } else { + vec![ + nostr::Filter::default() + .kinds(vec![Kind::Metadata, Kind::RelayList]) + .authors(required_profiles.clone()), + ] + }, + ] + .concat() +} + +pub fn get_filter_repo_events(repo_coordinates: &HashSet) -> nostr::Filter { + nostr::Filter::default() + .kind(Kind::Custom(REPO_REF_KIND)) + .identifiers( + repo_coordinates + .iter() + .map(|c| c.identifier.clone()) + .collect::>(), + ) + .authors( + repo_coordinates + .iter() + .map(|c| c.public_key) + .collect::>(), + ) +} + +#[derive(Default)] +pub struct FetchReport { + relay: Option, + repo_coordinates: Vec, + updated_repo_announcements: Vec<(Coordinate, Timestamp)>, + proposals: HashSet, + /// commits against existing propoals + commits: HashSet, + statuses: HashSet, + contributor_profiles: HashSet, +} + +impl Display for FetchReport { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + // report: "1 new maintainer, 1 announcement, 1 proposal, 3 commits, 2 statuses" + let mut display_items: Vec = vec![]; + if !self.repo_coordinates.is_empty() { + display_items.push(format!( + "{} new maintainer{}", + self.repo_coordinates.len(), + if self.repo_coordinates.len() == 1 { + "s" + } else { + "" + }, + )); + } + if !self.updated_repo_announcements.is_empty() { + display_items.push(format!( + "{} announcement update{}", + self.updated_repo_announcements.len(), + if self.updated_repo_announcements.len() == 1 { + "s" + } else { + "" + }, + )); + } + if !self.proposals.is_empty() { + display_items.push(format!( + "{} proposal{}", + self.proposals.len(), + if self.proposals.len() == 1 { "s" } else { "" }, + )); + } + if !self.commits.is_empty() { + display_items.push(format!( + "{} commit{}", + self.commits.len(), + if self.commits.len() == 1 { "s" } else { "" }, + )); + } + if !self.statuses.is_empty() { + display_items.push(format!( + "{} status{}", + self.statuses.len(), + if self.statuses.len() == 1 { "es" } else { "" }, + )); + } + if !self.contributor_profiles.is_empty() { + display_items.push(format!( + "{} contributor profile{}", + self.contributor_profiles.len(), + if self.contributor_profiles.len() == 1 { + "s" + } else { + "" + }, + )); + } + write!(f, "{}", display_items.join(", ")) + } +} + +#[derive(Default, Clone)] +pub struct FetchRequest { + relay_column_width: usize, + repo_coordinates: Vec<(Coordinate, Option)>, + proposals: HashSet, + contributor_profiles: HashSet, + existing_events: HashSet, +} diff --git a/src/main.rs b/src/main.rs index 9f53084..1790c21 100644 --- a/src/main.rs +++ b/src/main.rs @@ -39,6 +39,8 @@ pub struct Cli { #[derive(Subcommand)] enum Commands { + /// update cache with latest updates from nostr + Fetch(sub_commands::fetch::SubCommandArgs), /// signal you are this repo's maintainer accepting proposals via nostr Init(sub_commands::init::SubCommandArgs), /// issue commits as a proposal @@ -57,6 +59,7 @@ enum Commands { async fn main() -> Result<()> { let cli = Cli::parse(); match &cli.command { + Commands::Fetch(args) => sub_commands::fetch::launch(&cli, args).await, Commands::Login(args) => sub_commands::login::launch(&cli, args).await, Commands::Init(args) => sub_commands::init::launch(&cli, args).await, Commands::Send(args) => sub_commands::send::launch(&cli, args).await, diff --git a/src/repo_ref.rs b/src/repo_ref.rs index 8b34d2b..4952b16 100644 --- a/src/repo_ref.rs +++ b/src/repo_ref.rs @@ -1,8 +1,16 @@ -use std::{fs::File, io::BufReader, str::FromStr}; +use std::{ + collections::{HashMap, HashSet}, + fs::File, + io::BufReader, + str::FromStr, +}; use anyhow::{bail, Context, Result}; -use nostr::{nips::nip19::Nip19, FromBech32, PublicKey, Tag, TagStandard, ToBech32}; -use nostr_sdk::NostrSigner; +use nostr::{ + nips::{nip01::Coordinate, nip19::Nip19}, + FromBech32, PublicKey, Tag, TagStandard, ToBech32, +}; +use nostr_sdk::{Kind, NostrSigner, Timestamp}; use serde::{Deserialize, Serialize}; #[cfg(not(test))] @@ -11,7 +19,7 @@ use crate::client::Client; use crate::client::MockConnect; use crate::{ cli_interactor::{Interactor, InteractorPrompt, PromptInputParms}, - client::{sign_event, Connect}, + client::{get_event_from_cache, get_event_from_global_cache, sign_event, Connect}, git::{Repo, RepoActions}, }; @@ -25,6 +33,7 @@ pub struct RepoRef { pub web: Vec, pub relays: Vec, pub maintainers: Vec, + pub events: HashMap, // code languages and hashtags } @@ -88,7 +97,16 @@ impl TryFrom for RepoRef { } else { r.maintainers = vec![event.pubkey]; } - + r.events = HashMap::new(); + r.events.insert( + Coordinate { + kind: event.kind, + identifier: event.identifier().unwrap().to_string(), + public_key: event.author(), + relays: vec![], + }, + event, + ); Ok(r) } } @@ -160,6 +178,145 @@ impl RepoRef { .await .context("failed to create repository reference event") } + pub fn coordinates(&self) -> HashSet { + let mut res = HashSet::new(); + for m in &self.maintainers { + res.insert(Coordinate { + kind: Kind::Custom(REPO_REF_KIND), + public_key: *m, + identifier: self.identifier.clone(), + relays: vec![], + }); + } + res + } + pub fn coordinates_with_timestamps(&self) -> Vec<(Coordinate, Option)> { + self.coordinates() + .iter() + .map(|c| (c.clone(), self.events.get(c).map(|e| e.created_at))) + .collect::)>>() + } +} + +pub async fn get_repo_coordinates( + git_repo: &Repo, + #[cfg(test)] client: &crate::client::MockConnect, + #[cfg(not(test))] client: &Client, +) -> Result> { + let mut repo_coordinates = HashSet::new(); + + if let Some(repo_override) = git_repo.get_git_config_item("nostr.repo", Some(false))? { + for s in repo_override.split(',') { + if let Ok(c) = Coordinate::parse(s) { + repo_coordinates.insert(c); + } + } + } + + // TODO: when nostr remotes functionality is added, iterate on each remote and + // extract coordinates + + if repo_coordinates.is_empty() { + if let Ok(repo_config) = get_repo_config_from_yaml(git_repo) { + let maintainers = { + let mut maintainers = HashSet::new(); + for m in &repo_config.maintainers { + if let Ok(maintainer) = PublicKey::parse(m) { + maintainers.insert(maintainer); + } + } + maintainers + }; + if let Some(identifier) = repo_config.identifier { + for public_key in maintainers { + repo_coordinates.insert(Coordinate { + kind: Kind::Custom(REPO_REF_KIND), + public_key, + identifier: identifier.clone(), + relays: vec![], + }); + } + } else { + // if repo_config.identifier.is_empty() { + // this will only apply for a few repositories created before ngit v1.3 + // that haven't updated their maintainers.yaml + if let Ok(Some(current_user_npub)) = + git_repo.get_git_config_item("nostr.npub", None) + { + if let Ok(current_user) = PublicKey::parse(current_user_npub) { + for m in &repo_config.maintainers { + if let Ok(maintainer) = PublicKey::parse(m) { + if current_user.eq(&maintainer) { + println!( + "please run `nigt init` to add the repo identifier to maintainers.yaml" + ); + } + } + } + } + } + // look find all repo refs with root_commit. for identifier + let filter = nostr::Filter::default() + .kind(nostr::Kind::Custom(REPO_REF_KIND)) + .reference(git_repo.get_root_commit()?.to_string()) + .authors(maintainers.clone()); + let mut events = + get_event_from_cache(git_repo.get_path()?, vec![filter.clone()]).await?; + if events.is_empty() { + events = + get_event_from_global_cache(git_repo.get_path()?, vec![filter.clone()]) + .await?; + } + if events.is_empty() { + events = client + .get_events(client.get_fallback_relays().clone(), vec![filter.clone()]) + .await?; + } + if let Some(e) = events.first() { + if let Some(identifier) = e.identifier() { + for m in &repo_config.maintainers { + if let Ok(maintainer) = PublicKey::parse(m) { + repo_coordinates.insert(Coordinate { + kind: Kind::Custom(REPO_REF_KIND), + public_key: maintainer, + identifier: identifier.to_string(), + relays: vec![], + }); + } + } + } + } else { + let c = ask_for_naddr()?; + git_repo.save_git_config_item("nostr.repo", &c.to_bech32()?, false)?; + repo_coordinates.insert(c); + } + } + } + } + + if repo_coordinates.is_empty() { + // TODO: present list of events filter by root_commit + // TODO: fallback to search based on identifier + let c = ask_for_naddr()?; + // PROBLEM: we are saving this before checking whether it actually exists, which + // means next time the user won't be prompted and may not know how to + // change the selected repo + git_repo.save_git_config_item("nostr.repo", &c.to_bech32()?, false)?; + repo_coordinates.insert(c); + } + Ok(repo_coordinates) +} + +fn ask_for_naddr() -> Result { + let mut prompt = "repository naddr"; + Ok(loop { + if let Ok(c) = Coordinate::parse( + Interactor::default().input(PromptInputParms::default().with_prompt(prompt))?, + ) { + break c; + } + prompt = "repository valid naddr"; + }) } pub async fn fetch( @@ -248,6 +405,7 @@ pub async fn fetch( #[derive(Serialize, Deserialize, Default, Clone, Debug, PartialEq, Eq)] pub struct RepoConfigYaml { + pub identifier: Option, pub maintainers: Vec, pub relays: Vec, } @@ -277,6 +435,7 @@ pub fn extract_pks(pk_strings: Vec) -> Result> { pub fn save_repo_config_to_yaml( git_repo: &Repo, + identifier: String, maintainers: Vec, relays: Vec, ) -> Result<()> { @@ -301,6 +460,7 @@ pub fn save_repo_config_to_yaml( serde_yaml::to_writer( file, &RepoConfigYaml { + identifier: Some(identifier), maintainers: maintainers_npubs, relays, }, @@ -327,6 +487,7 @@ mod tests { ], relays: vec!["ws://relay1.io".to_string(), "ws://relay2.io".to_string()], maintainers: vec![TEST_KEY_1_KEYS.public_key(), TEST_KEY_2_KEYS.public_key()], + events: HashMap::new(), } .to_event(&TEST_KEY_1_SIGNER) .await diff --git a/src/sub_commands/fetch.rs b/src/sub_commands/fetch.rs new file mode 100644 index 0000000..07fd6f9 --- /dev/null +++ b/src/sub_commands/fetch.rs @@ -0,0 +1,46 @@ +use std::collections::HashSet; + +use anyhow::{Context, Result}; +use clap; +use nostr::nips::nip01::Coordinate; + +#[cfg(not(test))] +use crate::client::Client; +#[cfg(test)] +use crate::client::MockConnect; +use crate::{ + client::Connect, + git::{Repo, RepoActions}, + repo_ref::get_repo_coordinates, + Cli, +}; + +#[derive(clap::Args)] +pub struct SubCommandArgs { + /// address pointer to repo announcement + #[arg(long, action)] + repo: Vec, +} + +pub async fn launch(args: &Cli, command_args: &SubCommandArgs) -> Result<()> { + let _ = args; + let git_repo = Repo::discover().context("cannot find a git repository")?; + #[cfg(not(test))] + let client = Client::default(); + #[cfg(test)] + let client = ::default(); + let repo_coordinates = if command_args.repo.is_empty() { + get_repo_coordinates(&git_repo, &client).await? + } else { + let mut repo_coordinates = HashSet::new(); + for repo in &command_args.repo { + repo_coordinates.insert(Coordinate::parse(repo.clone())?); + } + repo_coordinates + }; + client + .fetch_all(git_repo.get_path()?, &repo_coordinates) + .await?; + client.disconnect().await?; + Ok(()) +} diff --git a/src/sub_commands/init.rs b/src/sub_commands/init.rs index 57785db..db90acd 100644 --- a/src/sub_commands/init.rs +++ b/src/sub_commands/init.rs @@ -1,3 +1,5 @@ +use std::collections::HashMap; + use anyhow::{Context, Result}; use nostr::{FromBech32, PublicKey, ToBech32}; @@ -291,7 +293,7 @@ pub async fn launch(cli_args: &Cli, args: &SubCommandArgs) -> Result<()> { println!("publishing repostory reference..."); let repo_event = RepoRef { - identifier, + identifier: identifier.clone(), name, description, root_commit: earliest_unique_commit, @@ -299,6 +301,7 @@ pub async fn launch(cli_args: &Cli, args: &SubCommandArgs) -> Result<()> { web, relays: relays.clone(), maintainers: maintainers.clone(), + events: HashMap::new(), } .to_event(&signer) .await?; @@ -322,7 +325,12 @@ pub async fn launch(cli_args: &Cli, args: &SubCommandArgs) -> Result<()> { } Err(_) => true, } { - save_repo_config_to_yaml(&git_repo, maintainers.clone(), relays.clone())?; + save_repo_config_to_yaml( + &git_repo, + identifier.clone(), + maintainers.clone(), + relays.clone(), + )?; println!( "maintainers.yaml {}. commit and push.", if repo_config_result.is_err() { diff --git a/src/sub_commands/list.rs b/src/sub_commands/list.rs index 5dc868c..d3f583f 100644 --- a/src/sub_commands/list.rs +++ b/src/sub_commands/list.rs @@ -820,7 +820,7 @@ pub static STATUS_KIND_APPLIED: u16 = 1631; pub static STATUS_KIND_CLOSED: u16 = 1632; pub static STATUS_KIND_DRAFT: u16 = 1633; -fn status_kinds() -> Vec { +pub fn status_kinds() -> Vec { vec![ nostr::Kind::Custom(STATUS_KIND_OPEN), nostr::Kind::Custom(STATUS_KIND_APPLIED), diff --git a/src/sub_commands/mod.rs b/src/sub_commands/mod.rs index 9f97b7e..29a60f9 100644 --- a/src/sub_commands/mod.rs +++ b/src/sub_commands/mod.rs @@ -1,3 +1,4 @@ +pub mod fetch; pub mod init; pub mod list; pub mod login; -- cgit v1.2.3