upleb.uk

Public git repos — served from a NIP-34 GRASP relay at git.upleb.uk

summaryrefslogtreecommitdiff
path: root/src/lib/client.rs
diff options
context:
space:
mode:
Diffstat (limited to 'src/lib/client.rs')
-rw-r--r--src/lib/client.rs1480
1 files changed, 1480 insertions, 0 deletions
diff --git a/src/lib/client.rs b/src/lib/client.rs
new file mode 100644
index 0000000..abde217
--- /dev/null
+++ b/src/lib/client.rs
@@ -0,0 +1,1480 @@
1// have you considered
2
3// TO USE ASYNC
4
5// in traits (required for mocking unit tests)
6// https://rust-lang.github.io/async-book/07_workarounds/05_async_in_traits.html
7// https://github.com/dtolnay/async-trait
8// see https://blog.rust-lang.org/inside-rust/2022/11/17/async-fn-in-trait-nightly.html
9// I think we can use the async-trait crate and switch to the native feature
10// which is currently in nightly. alternatively we can use nightly as it looks
11// certain that the implementation is going to make it to stable but we don't
12// want to inadvertlty use other features of nightly that might be removed.
13use std::{
14 collections::{HashMap, HashSet},
15 fmt::{Display, Write},
16 fs::create_dir_all,
17 path::Path,
18 time::Duration,
19};
20
21use anyhow::{bail, Context, Result};
22use async_trait::async_trait;
23use console::Style;
24use futures::stream::{self, StreamExt};
25use indicatif::{MultiProgress, ProgressBar, ProgressState, ProgressStyle};
26#[cfg(test)]
27use mockall::*;
28use nostr::{nips::nip01::Coordinate, Event};
29use nostr_database::{NostrDatabase, Order};
30use nostr_sdk::{
31 prelude::RelayLimits, EventBuilder, EventId, Kind, NostrSigner, Options, PublicKey,
32 SingleLetterTag, Timestamp, Url,
33};
34use nostr_sqlite::SQLiteDatabase;
35
36use crate::{
37 config::get_dirs,
38 login::{get_logged_in_user, get_user_ref_from_cache},
39 repo_ref::RepoRef,
40 repo_state::RepoState,
41 sub_commands::{
42 list::status_kinds,
43 send::{event_is_patch_set_root, event_is_revision_root},
44 },
45};
46
47#[allow(clippy::struct_field_names)]
48pub struct Client {
49 client: nostr_sdk::Client,
50 fallback_relays: Vec<String>,
51 more_fallback_relays: Vec<String>,
52 blaster_relays: Vec<String>,
53}
54
55#[cfg_attr(test, automock)]
56#[async_trait]
57pub trait Connect {
58 fn default() -> Self;
59 fn new(opts: Params) -> Self;
60 async fn set_signer(&mut self, signer: NostrSigner);
61 async fn connect(&self, relay_url: &Url) -> Result<()>;
62 async fn disconnect(&self) -> Result<()>;
63 fn get_fallback_relays(&self) -> &Vec<String>;
64 fn get_more_fallback_relays(&self) -> &Vec<String>;
65 fn get_blaster_relays(&self) -> &Vec<String>;
66 async fn send_event_to(
67 &self,
68 git_repo_path: &Path,
69 url: &str,
70 event: nostr::event::Event,
71 ) -> Result<nostr::EventId>;
72 async fn get_events(
73 &self,
74 relays: Vec<String>,
75 filters: Vec<nostr::Filter>,
76 ) -> Result<Vec<nostr::Event>>;
77 async fn get_events_per_relay(
78 &self,
79 relays: Vec<Url>,
80 filters: Vec<nostr::Filter>,
81 progress_reporter: MultiProgress,
82 ) -> Result<(Vec<Result<Vec<nostr::Event>>>, MultiProgress)>;
83 async fn fetch_all(
84 &self,
85 git_repo_path: &Path,
86 repo_coordinates: &HashSet<Coordinate>,
87 user_profiles: &HashSet<PublicKey>,
88 ) -> Result<(Vec<Result<FetchReport>>, MultiProgress)>;
89 async fn fetch_all_from_relay(
90 &self,
91 git_repo_path: &Path,
92 request: FetchRequest,
93 pb: &Option<ProgressBar>,
94 ) -> Result<FetchReport>;
95}
96
97#[async_trait]
98impl Connect for Client {
99 fn default() -> Self {
100 let fallback_relays: Vec<String> = if std::env::var("NGITTEST").is_ok() {
101 vec![
102 "ws://localhost:8051".to_string(),
103 "ws://localhost:8052".to_string(),
104 ]
105 } else {
106 vec![
107 "wss://relay.damus.io".to_string(), /* free, good reliability, have been known
108 * to delete all messages */
109 "wss://nos.lol".to_string(),
110 "wss://relay.nostr.band".to_string(),
111 ]
112 };
113
114 let more_fallback_relays: Vec<String> = if std::env::var("NGITTEST").is_ok() {
115 vec![
116 "ws://localhost:8055".to_string(),
117 "ws://localhost:8056".to_string(),
118 ]
119 } else {
120 vec![
121 "wss://purplerelay.com".to_string(), // free but reliability not tested
122 "wss://purplepages.es".to_string(), // for profile events but unreliable
123 "wss://relayable.org".to_string(), // free but not always reliable
124 ]
125 };
126
127 let blaster_relays: Vec<String> = if std::env::var("NGITTEST").is_ok() {
128 vec!["ws://localhost:8057".to_string()]
129 } else {
130 vec!["wss://nostr.mutinywallet.com".to_string()]
131 };
132 Client {
133 client: nostr_sdk::ClientBuilder::new()
134 .opts(Options::new().relay_limits(RelayLimits::disable()))
135 .build(),
136 fallback_relays,
137 more_fallback_relays,
138 blaster_relays,
139 }
140 }
141 fn new(opts: Params) -> Self {
142 Client {
143 client: nostr_sdk::ClientBuilder::new()
144 .opts(Options::new().relay_limits(RelayLimits::disable()))
145 .signer(&opts.keys.unwrap_or(nostr::Keys::generate()))
146 // .database(
147 // SQLiteDatabase::open(get_dirs()?.cache_dir().join("nostr-cache.sqlite")).
148 // await?, )
149 .build(),
150 fallback_relays: opts.fallback_relays,
151 more_fallback_relays: opts.more_fallback_relays,
152 blaster_relays: opts.blaster_relays,
153 }
154 }
155
156 async fn set_signer(&mut self, signer: NostrSigner) {
157 self.client.set_signer(Some(signer)).await;
158 }
159
160 async fn connect(&self, relay_url: &Url) -> Result<()> {
161 self.client
162 .add_relay(relay_url)
163 .await
164 .context("cannot add relay")?;
165
166 let relay = self.client.relay(relay_url).await?;
167
168 if !relay.is_connected().await {
169 #[allow(clippy::large_futures)]
170 relay
171 .connect(Some(std::time::Duration::from_secs(CONNECTION_TIMEOUT)))
172 .await;
173 }
174
175 if !relay.is_connected().await {
176 bail!("connection timeout");
177 }
178 Ok(())
179 }
180
181 async fn disconnect(&self) -> Result<()> {
182 self.client.disconnect().await?;
183 Ok(())
184 }
185
186 fn get_fallback_relays(&self) -> &Vec<String> {
187 &self.fallback_relays
188 }
189
190 fn get_more_fallback_relays(&self) -> &Vec<String> {
191 &self.more_fallback_relays
192 }
193
194 fn get_blaster_relays(&self) -> &Vec<String> {
195 &self.blaster_relays
196 }
197
198 async fn send_event_to(
199 &self,
200 git_repo_path: &Path,
201 url: &str,
202 event: Event,
203 ) -> Result<nostr::EventId> {
204 self.client.add_relay(url).await?;
205 #[allow(clippy::large_futures)]
206 self.client.connect_relay(url).await?;
207 let res = self.client.send_event_to(vec![url], event.clone()).await?;
208 if let Some(err) = res.failed.get(&Url::parse(url)?) {
209 bail!(if let Some(err) = err {
210 err.to_string()
211 } else {
212 "error: unknown".to_string()
213 });
214 }
215 save_event_in_cache(git_repo_path, &event).await?;
216 if event.kind().eq(&Kind::GitRepoAnnouncement) {
217 save_event_in_global_cache(git_repo_path, &event).await?;
218 }
219 Ok(event.id())
220 }
221
222 async fn get_events(
223 &self,
224 relays: Vec<String>,
225 filters: Vec<nostr::Filter>,
226 ) -> Result<Vec<nostr::Event>> {
227 let (relay_results, _) = self
228 .get_events_per_relay(
229 relays.iter().map(|r| Url::parse(r).unwrap()).collect(),
230 filters,
231 MultiProgress::new(),
232 )
233 .await?;
234 Ok(get_dedup_events(relay_results))
235 }
236
237 async fn get_events_per_relay(
238 &self,
239 relays: Vec<Url>,
240 filters: Vec<nostr::Filter>,
241 progress_reporter: MultiProgress,
242 ) -> Result<(Vec<Result<Vec<nostr::Event>>>, MultiProgress)> {
243 // add relays
244 for relay in &relays {
245 self.client
246 .add_relay(relay.as_str())
247 .await
248 .context("cannot add relay")?;
249 }
250
251 let relays_map = self.client.relays().await;
252
253 let futures: Vec<_> = relays
254 .clone()
255 .iter()
256 // don't look for events on blaster
257 .filter(|r| !r.as_str().contains("nostr.mutinywallet.com"))
258 .map(|r| (relays_map.get(r).unwrap(), filters.clone()))
259 .map(|(relay, filters)| async {
260 let pb = if std::env::var("NGITTEST").is_err() {
261 let pb = progress_reporter.add(
262 ProgressBar::new(1)
263 .with_prefix(format!("{: <11}{}", "connecting", relay.url()))
264 .with_style(pb_style()?),
265 );
266 pb.enable_steady_tick(Duration::from_millis(300));
267 Some(pb)
268 } else {
269 None
270 };
271 #[allow(clippy::large_futures)]
272 match get_events_of(relay, filters, &pb).await {
273 Err(error) => {
274 if let Some(pb) = pb {
275 pb.set_style(pb_after_style(false));
276 pb.set_prefix(format!("{: <11}{}", "error", relay.url()));
277 pb.finish_with_message(
278 console::style(
279 error.to_string().replace("relay pool error:", "error:"),
280 )
281 .for_stderr()
282 .red()
283 .to_string(),
284 );
285 }
286 Err(error)
287 }
288 Ok(res) => {
289 if let Some(pb) = pb {
290 pb.set_style(pb_after_style(true));
291 pb.set_prefix(format!(
292 "{: <11}{}",
293 format!("{} events", res.len()),
294 relay.url()
295 ));
296 pb.finish_with_message("");
297 }
298 Ok(res)
299 }
300 }
301 })
302 .collect();
303
304 let relay_results: Vec<Result<Vec<nostr::Event>>> =
305 stream::iter(futures).buffer_unordered(15).collect().await;
306
307 Ok((relay_results, progress_reporter))
308 }
309
310 #[allow(clippy::too_many_lines)]
311 async fn fetch_all(
312 &self,
313 git_repo_path: &Path,
314 repo_coordinates: &HashSet<Coordinate>,
315 user_profiles: &HashSet<PublicKey>,
316 ) -> Result<(Vec<Result<FetchReport>>, MultiProgress)> {
317 let fallback_relays = &self
318 .fallback_relays
319 .iter()
320 .filter_map(|r| Url::parse(r).ok())
321 .collect::<HashSet<Url>>();
322
323 let mut request = create_relays_request(
324 git_repo_path,
325 repo_coordinates,
326 user_profiles,
327 fallback_relays.clone(),
328 )
329 .await?;
330
331 let progress_reporter = MultiProgress::new();
332
333 let mut processed_relays = HashSet::new();
334
335 let mut relay_reports: Vec<Result<FetchReport>> = vec![];
336
337 loop {
338 let relays = request
339 .repo_relays
340 .union(&request.user_relays_for_profiles)
341 // don't look for events on blaster
342 .filter(|&r| !r.as_str().contains("nostr.mutinywallet.com"))
343 .cloned()
344 .collect::<HashSet<Url>>()
345 .difference(&processed_relays)
346 .cloned()
347 .collect::<HashSet<Url>>();
348 if relays.is_empty() {
349 break;
350 }
351 let profile_relays_only = request
352 .user_relays_for_profiles
353 .difference(&request.repo_relays)
354 .collect::<HashSet<&Url>>();
355 for relay in &request.repo_relays {
356 self.client
357 .add_relay(relay.as_str())
358 .await
359 .context("cannot add relay")?;
360 }
361
362 let dim = Style::new().color256(247);
363
364 let futures: Vec<_> = relays
365 .iter()
366 .map(|r| {
367 if profile_relays_only.contains(r) {
368 // if relay isn't a repo relay, just filter for user profile
369 FetchRequest {
370 selected_relay: Some(r.to_owned()),
371 repo_coordinates_without_relays: vec![],
372 proposals: HashSet::new(),
373 missing_contributor_profiles: request
374 .missing_contributor_profiles
375 .union(
376 &request
377 .profiles_to_fetch_from_user_relays
378 .clone()
379 .into_keys()
380 .collect(),
381 )
382 .copied()
383 .collect(),
384 ..request.clone()
385 }
386 } else {
387 FetchRequest {
388 selected_relay: Some(r.to_owned()),
389 ..request.clone()
390 }
391 }
392 })
393 .map(|request| async {
394 let relay_column_width = request.relay_column_width;
395
396 let relay_url = request
397 .selected_relay
398 .clone()
399 .context("fetch_all_from_relay called without a relay")?;
400
401 let pb = if std::env::var("NGITTEST").is_err() {
402 let pb = progress_reporter.add(
403 ProgressBar::new(1)
404 .with_prefix(
405 dim.apply_to(format!(
406 "{: <relay_column_width$} connecting",
407 &relay_url
408 ))
409 .to_string(),
410 )
411 .with_style(pb_style()?),
412 );
413 pb.enable_steady_tick(Duration::from_millis(300));
414 Some(pb)
415 } else {
416 None
417 };
418
419 #[allow(clippy::large_futures)]
420 match self.fetch_all_from_relay(git_repo_path, request, &pb).await {
421 Err(error) => {
422 if let Some(pb) = pb {
423 pb.set_style(pb_after_style(false));
424 pb.set_prefix(
425 dim.apply_to(format!("{: <relay_column_width$}", &relay_url))
426 .to_string(),
427 );
428 pb.finish_with_message(
429 console::style(
430 error.to_string().replace("relay pool error:", "error:"),
431 )
432 .for_stderr()
433 .red()
434 .to_string(),
435 );
436 }
437 Err(error)
438 }
439 Ok(res) => Ok(res),
440 }
441 })
442 .collect();
443
444 for report in stream::iter(futures)
445 .buffer_unordered(15)
446 .collect::<Vec<Result<FetchReport>>>()
447 .await
448 {
449 relay_reports.push(report);
450 }
451 processed_relays.extend(relays.clone());
452
453 if let Ok(repo_ref) = get_repo_ref_from_cache(git_repo_path, repo_coordinates).await {
454 request.repo_relays = repo_ref
455 .relays
456 .iter()
457 .filter_map(|r| Url::parse(r).ok())
458 .collect();
459 }
460
461 request.user_relays_for_profiles = {
462 let mut set = HashSet::new();
463 for user in &request
464 .profiles_to_fetch_from_user_relays
465 .clone()
466 .into_keys()
467 .collect::<Vec<PublicKey>>()
468 {
469 if let Ok(user_ref) = get_user_ref_from_cache(git_repo_path, user).await {
470 for r in user_ref.relays.write() {
471 if let Ok(url) = Url::parse(&r) {
472 set.insert(url);
473 }
474 }
475 }
476 }
477 set
478 };
479 }
480 Ok((relay_reports, progress_reporter))
481 }
482
483 async fn fetch_all_from_relay(
484 &self,
485 git_repo_path: &Path,
486 request: FetchRequest,
487 pb: &Option<ProgressBar>,
488 ) -> Result<FetchReport> {
489 let mut fresh_coordinates: HashSet<Coordinate> = HashSet::new();
490 for (c, _) in request.repo_coordinates_without_relays.clone() {
491 fresh_coordinates.insert(c);
492 }
493 let mut fresh_proposal_roots = request.proposals.clone();
494 let mut fresh_profiles: HashSet<PublicKey> = request
495 .missing_contributor_profiles
496 .union(
497 &request
498 .profiles_to_fetch_from_user_relays
499 .clone()
500 .into_keys()
501 .collect(),
502 )
503 .copied()
504 .collect();
505
506 let mut report = FetchReport::default();
507
508 let relay_url = request
509 .selected_relay
510 .clone()
511 .context("fetch_all_from_relay called without a relay")?;
512
513 let relay_column_width = request.relay_column_width;
514
515 self.connect(&relay_url).await?;
516
517 let dim = Style::new().color256(247);
518
519 loop {
520 let filters =
521 get_fetch_filters(&fresh_coordinates, &fresh_proposal_roots, &fresh_profiles);
522
523 if let Some(pb) = &pb {
524 pb.set_prefix(
525 dim.apply_to(format!(
526 "{: <relay_column_width$} {}",
527 &relay_url,
528 if report.to_string().is_empty() {
529 "fetching".to_string()
530 } else {
531 format!("fetching... updates: {report}")
532 },
533 ))
534 .to_string(),
535 );
536 }
537
538 fresh_coordinates = HashSet::new();
539 fresh_proposal_roots = HashSet::new();
540 fresh_profiles = HashSet::new();
541
542 let relay = self.client.relay(&relay_url).await?;
543 let events: Vec<nostr::Event> = get_events_of(&relay, filters.clone(), &None)
544 .await?
545 .iter()
546 // don't process events that don't match filters
547 .filter(|e| filters.iter().any(|f| f.match_event(e)))
548 .cloned()
549 .collect();
550 // TODO: try reconcile
551
552 process_fetched_events(
553 events,
554 &request,
555 git_repo_path,
556 &mut fresh_coordinates,
557 &mut fresh_proposal_roots,
558 &mut fresh_profiles,
559 &mut report,
560 )
561 .await?;
562
563 if fresh_coordinates.is_empty()
564 && fresh_proposal_roots.is_empty()
565 && fresh_profiles.is_empty()
566 {
567 break;
568 }
569 }
570 if let Some(pb) = pb {
571 pb.set_style(pb_after_style(true));
572 pb.set_prefix(
573 dim.apply_to(format!(
574 "{: <relay_column_width$} {}",
575 relay_url,
576 if report.to_string().is_empty() {
577 "no new events".to_string()
578 } else {
579 format!("new events: {report}")
580 },
581 ))
582 .to_string(),
583 );
584 pb.finish_with_message("");
585 }
586 Ok(report)
587 }
588}
589
590static CONNECTION_TIMEOUT: u64 = 3;
591static GET_EVENTS_TIMEOUT: u64 = 7;
592
593async fn get_events_of(
594 relay: &nostr_sdk::Relay,
595 filters: Vec<nostr::Filter>,
596 pb: &Option<ProgressBar>,
597) -> Result<Vec<Event>> {
598 // relay.reconcile(filter, opts).await?;
599
600 if !relay.is_connected().await {
601 #[allow(clippy::large_futures)]
602 relay
603 .connect(Some(std::time::Duration::from_secs(CONNECTION_TIMEOUT)))
604 .await;
605 }
606
607 if !relay.is_connected().await {
608 bail!("connection timeout");
609 } else if let Some(pb) = pb {
610 pb.set_prefix(format!("connected {}", relay.url()));
611 }
612 let events = relay
613 .get_events_of(
614 filters,
615 // 20 is nostr_sdk default
616 std::time::Duration::from_secs(GET_EVENTS_TIMEOUT),
617 nostr_sdk::FilterOptions::ExitOnEOSE,
618 )
619 .await?;
620 Ok(events)
621}
622
623#[derive(Default)]
624pub struct Params {
625 pub keys: Option<nostr::Keys>,
626 pub fallback_relays: Vec<String>,
627 pub more_fallback_relays: Vec<String>,
628 pub blaster_relays: Vec<String>,
629}
630
631fn get_dedup_events(relay_results: Vec<Result<Vec<nostr::Event>>>) -> Vec<Event> {
632 let mut dedup_events: Vec<Event> = vec![];
633 for events in relay_results.into_iter().flatten() {
634 for event in events {
635 if !dedup_events.iter().any(|e| event.id.eq(&e.id)) {
636 dedup_events.push(event);
637 }
638 }
639 }
640 dedup_events
641}
642
643pub async fn sign_event(event_builder: EventBuilder, signer: &NostrSigner) -> Result<nostr::Event> {
644 if signer.r#type().eq(&nostr_signer::NostrSignerType::NIP46) {
645 let term = console::Term::stderr();
646 term.write_line("signing event with remote signer...")?;
647 let event = signer
648 .sign_event_builder(event_builder)
649 .await
650 .context("failed to sign event")?;
651 term.clear_last_lines(1)?;
652 Ok(event)
653 } else {
654 signer
655 .sign_event_builder(event_builder)
656 .await
657 .context("failed to sign event")
658 }
659}
660
661pub async fn fetch_public_key(signer: &NostrSigner) -> Result<nostr::PublicKey> {
662 let term = console::Term::stderr();
663 term.write_line("fetching npub from remote signer...")?;
664 let public_key = signer
665 .public_key()
666 .await
667 .context("failed to get npub from remote signer")?;
668 term.clear_last_lines(1)?;
669 Ok(public_key)
670}
671
672fn pb_style() -> Result<ProgressStyle> {
673 Ok(
674 ProgressStyle::with_template(" {spinner} {prefix} {msg} {timeout_in}")?.with_key(
675 "timeout_in",
676 |state: &ProgressState, w: &mut dyn Write| {
677 if state.elapsed().as_secs() > 3 && state.elapsed().as_secs() < GET_EVENTS_TIMEOUT {
678 let dim = Style::new().color256(247);
679 write!(
680 w,
681 "{}",
682 dim.apply_to(format!(
683 "timeout in {:.1}s",
684 GET_EVENTS_TIMEOUT - state.elapsed().as_secs()
685 ))
686 )
687 .unwrap();
688 }
689 },
690 ),
691 )
692}
693
694fn pb_after_style(succeed: bool) -> indicatif::ProgressStyle {
695 ProgressStyle::with_template(
696 format!(
697 " {} {}",
698 if succeed {
699 console::style("✔".to_string())
700 .for_stderr()
701 .green()
702 .to_string()
703 } else {
704 console::style("✘".to_string())
705 .for_stderr()
706 .red()
707 .to_string()
708 },
709 "{prefix} {msg}",
710 )
711 .as_str(),
712 )
713 .unwrap()
714}
715
716async fn get_local_cache_database(git_repo_path: &Path) -> Result<SQLiteDatabase> {
717 SQLiteDatabase::open(git_repo_path.join(".git/nostr-cache.sqlite"))
718 .await
719 .context("cannot open or create nostr cache database at .git/nostr-cache.sqlite")
720}
721
722async fn get_global_cache_database(git_repo_path: &Path) -> Result<SQLiteDatabase> {
723 SQLiteDatabase::open(if std::env::var("NGITTEST").is_err() {
724 create_dir_all(get_dirs()?.cache_dir()).context(format!(
725 "cannot create cache directory in: {:?}",
726 get_dirs()?.cache_dir()
727 ))?;
728 get_dirs()?.cache_dir().join("nostr-cache.sqlite")
729 } else {
730 git_repo_path.join(".git/test-global-cache.sqlite")
731 })
732 .await
733 .context("cannot open ngit global nostr cache database")
734}
735
736pub async fn get_events_from_cache(
737 git_repo_path: &Path,
738 filters: Vec<nostr::Filter>,
739) -> Result<Vec<nostr::Event>> {
740 get_local_cache_database(git_repo_path)
741 .await?
742 .query(filters.clone(), Order::Asc)
743 .await
744 .context(
745 "cannot execute query on opened git repo nostr cache database .git/nostr-cache.sqlite",
746 )
747}
748
749pub async fn get_event_from_global_cache(
750 git_repo_path: &Path,
751 filters: Vec<nostr::Filter>,
752) -> Result<Vec<nostr::Event>> {
753 get_global_cache_database(git_repo_path)
754 .await?
755 .query(filters.clone(), Order::Asc)
756 .await
757 .context("cannot execute query on opened ngit nostr cache database")
758}
759
760pub async fn save_event_in_cache(git_repo_path: &Path, event: &nostr::Event) -> Result<bool> {
761 get_local_cache_database(git_repo_path)
762 .await?
763 .save_event(event)
764 .await
765 .context("cannot save event in local cache")
766}
767
768pub async fn save_event_in_global_cache(
769 git_repo_path: &Path,
770 event: &nostr::Event,
771) -> Result<bool> {
772 get_global_cache_database(git_repo_path)
773 .await?
774 .save_event(event)
775 .await
776 .context("cannot save event in local cache")
777}
778
779pub async fn get_repo_ref_from_cache(
780 git_repo_path: &Path,
781 repo_coordinates: &HashSet<Coordinate>,
782) -> Result<RepoRef> {
783 let mut maintainers = HashSet::new();
784 let mut new_coordinate: bool;
785
786 for c in repo_coordinates {
787 maintainers.insert(c.public_key);
788 }
789 let mut repo_events = vec![];
790 loop {
791 new_coordinate = false;
792 let repo_events_filter = get_filter_repo_events(repo_coordinates);
793
794 let events = [
795 get_event_from_global_cache(git_repo_path, vec![repo_events_filter.clone()]).await?,
796 get_events_from_cache(git_repo_path, vec![repo_events_filter]).await?,
797 ]
798 .concat();
799 for e in events {
800 if let Ok(repo_ref) = RepoRef::try_from(e.clone()) {
801 for m in repo_ref.maintainers {
802 if maintainers.insert(m) {
803 new_coordinate = true;
804 }
805 }
806 repo_events.push(e);
807 }
808 }
809 if !new_coordinate {
810 break;
811 }
812 }
813 repo_events.sort_by_key(|e| e.created_at);
814 let repo_ref = RepoRef::try_from(
815 repo_events
816 .first()
817 .context("no repo events at specified coordinates")?
818 .clone(),
819 )?;
820
821 let mut events: HashMap<Coordinate, nostr::Event> = HashMap::new();
822 for m in &maintainers {
823 if let Some(e) = repo_events.iter().find(|e| e.author().eq(m)) {
824 events.insert(
825 Coordinate {
826 kind: e.kind,
827 identifier: e.identifier().unwrap().to_string(),
828 public_key: e.author(),
829 relays: vec![],
830 },
831 e.clone(),
832 );
833 }
834 }
835
836 Ok(RepoRef {
837 // use all maintainers from all events found, not just maintainers in the most
838 // recent event
839 maintainers: maintainers.iter().copied().collect::<Vec<PublicKey>>(),
840 events,
841 ..repo_ref
842 })
843}
844
845pub async fn get_state_from_cache(git_repo_path: &Path, repo_ref: &RepoRef) -> Result<RepoState> {
846 RepoState::try_from(
847 get_events_from_cache(
848 git_repo_path,
849 vec![get_filter_state_events(&repo_ref.coordinates())],
850 )
851 .await?,
852 )
853}
854
855#[allow(clippy::too_many_lines)]
856async fn create_relays_request(
857 git_repo_path: &Path,
858 repo_coordinates: &HashSet<Coordinate>,
859 user_profiles: &HashSet<PublicKey>,
860 fallback_relays: HashSet<Url>,
861) -> Result<FetchRequest> {
862 let repo_ref = get_repo_ref_from_cache(git_repo_path, repo_coordinates).await;
863
864 let repo_coordinates = {
865 // add coordinates of users listed in maintainers to explicitly specified
866 // coodinates
867 let mut repo_coordinates = repo_coordinates.clone();
868 if let Ok(repo_ref) = &repo_ref {
869 for c in repo_ref.coordinates() {
870 if !repo_coordinates
871 .iter()
872 .any(|e| e.identifier.eq(&c.identifier) && e.public_key.eq(&c.public_key))
873 {
874 repo_coordinates.insert(c);
875 }
876 }
877 }
878 repo_coordinates
879 };
880
881 let repo_coordinates_without_relays = {
882 let mut set = HashSet::new();
883 for c in &repo_coordinates {
884 set.insert(Coordinate {
885 kind: c.kind,
886 identifier: c.identifier.clone(),
887 public_key: c.public_key,
888 relays: vec![],
889 });
890 }
891 set
892 };
893
894 let mut proposals: HashSet<EventId> = HashSet::new();
895 let mut missing_contributor_profiles: HashSet<PublicKey> = HashSet::new();
896 let mut contributors: HashSet<PublicKey> = HashSet::new();
897
898 if !repo_coordinates_without_relays.is_empty() {
899 if let Ok(repo_ref) = &repo_ref {
900 for m in &repo_ref.maintainers {
901 contributors.insert(m.to_owned());
902 }
903 }
904
905 for event in &get_events_from_cache(
906 git_repo_path,
907 vec![
908 nostr::Filter::default()
909 .kinds(vec![Kind::GitPatch])
910 .custom_tag(
911 SingleLetterTag::lowercase(nostr_sdk::Alphabet::A),
912 repo_coordinates_without_relays
913 .iter()
914 .map(std::string::ToString::to_string)
915 .collect::<Vec<String>>(),
916 ),
917 ],
918 )
919 .await?
920 {
921 if event_is_patch_set_root(event) || event_is_revision_root(event) {
922 proposals.insert(event.id());
923 contributors.insert(event.author());
924 }
925 }
926
927 let profile_events = get_event_from_global_cache(
928 git_repo_path,
929 vec![get_filter_contributor_profiles(contributors.clone())],
930 )
931 .await?;
932 for c in &contributors {
933 if let Some(event) = profile_events
934 .iter()
935 .find(|e| e.kind() == Kind::Metadata && e.author().eq(c))
936 {
937 save_event_in_cache(git_repo_path, event).await?;
938 } else {
939 missing_contributor_profiles.insert(c.to_owned());
940 }
941 }
942 }
943
944 let profiles_to_fetch_from_user_relays = {
945 let mut user_profiles = user_profiles.clone();
946 if let Ok(Some(current_user)) = get_logged_in_user(git_repo_path).await {
947 user_profiles.insert(current_user);
948 }
949 let mut map: HashMap<PublicKey, (Timestamp, Timestamp)> = HashMap::new();
950 for public_key in &user_profiles {
951 if let Ok(user_ref) = get_user_ref_from_cache(git_repo_path, public_key).await {
952 map.insert(
953 public_key.to_owned(),
954 (user_ref.metadata.created_at, user_ref.relays.created_at),
955 );
956 } else {
957 map.insert(
958 public_key.to_owned(),
959 (Timestamp::from(0), Timestamp::from(0)),
960 );
961 }
962 }
963 map
964 };
965
966 let user_relays_for_profiles = {
967 let mut set = HashSet::new();
968 for user in &profiles_to_fetch_from_user_relays
969 .clone()
970 .into_keys()
971 .collect::<Vec<PublicKey>>()
972 {
973 if let Ok(user_ref) = get_user_ref_from_cache(git_repo_path, user).await {
974 for r in user_ref.relays.write() {
975 if let Ok(url) = Url::parse(&r) {
976 set.insert(url);
977 }
978 }
979 } else {
980 missing_contributor_profiles.insert(user.to_owned());
981 }
982 }
983 set
984 };
985
986 let existing_events: HashSet<EventId> = {
987 let mut existing_events: HashSet<EventId> = HashSet::new();
988 for filter in get_fetch_filters(
989 &repo_coordinates_without_relays,
990 &proposals,
991 &missing_contributor_profiles
992 .union(
993 &profiles_to_fetch_from_user_relays
994 .clone()
995 .into_keys()
996 .collect(),
997 )
998 .copied()
999 .collect(),
1000 ) {
1001 for (id, _) in get_local_cache_database(git_repo_path)
1002 .await?
1003 .negentropy_items(filter)
1004 .await?
1005 {
1006 existing_events.insert(id);
1007 }
1008 }
1009 existing_events
1010 };
1011
1012 let relays = {
1013 let mut relays = fallback_relays;
1014 if let Ok(repo_ref) = &repo_ref {
1015 for r in &repo_ref.relays {
1016 if let Ok(url) = Url::parse(r) {
1017 relays.insert(url);
1018 }
1019 }
1020 }
1021 for c in repo_coordinates {
1022 for r in &c.relays {
1023 if let Ok(url) = Url::parse(r) {
1024 relays.insert(url);
1025 }
1026 }
1027 }
1028 relays
1029 };
1030
1031 let relay_column_width = relays
1032 .union(&user_relays_for_profiles)
1033 .reduce(|a, r| {
1034 if r.to_string()
1035 .chars()
1036 .count()
1037 .gt(&a.to_string().chars().count())
1038 {
1039 r
1040 } else {
1041 a
1042 }
1043 })
1044 .unwrap()
1045 .to_string()
1046 .chars()
1047 .count()
1048 + 2;
1049
1050 Ok(FetchRequest {
1051 selected_relay: None,
1052 repo_relays: relays,
1053 relay_column_width,
1054 repo_coordinates_without_relays: if let Ok(repo_ref) = &repo_ref {
1055 repo_ref.coordinates_with_timestamps()
1056 } else {
1057 repo_coordinates_without_relays
1058 .iter()
1059 .map(|c| (c.clone(), None))
1060 .collect()
1061 },
1062 state: if let Ok(repo_ref) = &repo_ref {
1063 if let Ok(existing_state) = get_state_from_cache(git_repo_path, repo_ref).await {
1064 Some((existing_state.event.created_at, existing_state.event.id))
1065 } else {
1066 None
1067 }
1068 } else {
1069 None
1070 },
1071 proposals,
1072 contributors,
1073 missing_contributor_profiles,
1074 existing_events,
1075 profiles_to_fetch_from_user_relays,
1076 user_relays_for_profiles,
1077 })
1078}
1079
1080#[allow(clippy::too_many_lines)]
1081async fn process_fetched_events(
1082 events: Vec<nostr::Event>,
1083 request: &FetchRequest,
1084 git_repo_path: &Path,
1085 fresh_coordinates: &mut HashSet<Coordinate>,
1086 fresh_proposal_roots: &mut HashSet<EventId>,
1087 fresh_profiles: &mut HashSet<PublicKey>,
1088 report: &mut FetchReport,
1089) -> Result<()> {
1090 for event in &events {
1091 if !request.existing_events.contains(&event.id) {
1092 save_event_in_cache(git_repo_path, event).await?;
1093 if event.kind().eq(&Kind::GitRepoAnnouncement) {
1094 save_event_in_global_cache(git_repo_path, event).await?;
1095 let new_coordinate = !request
1096 .repo_coordinates_without_relays
1097 .iter()
1098 .map(|(c, _)| c.clone())
1099 .any(|c| {
1100 c.identifier.eq(event.identifier().unwrap())
1101 && c.public_key.eq(&event.pubkey)
1102 });
1103 let update_to_existing = !new_coordinate
1104 && request
1105 .repo_coordinates_without_relays
1106 .iter()
1107 .any(|(c, t)| {
1108 c.identifier.eq(event.identifier().unwrap())
1109 && c.public_key.eq(&event.pubkey)
1110 && if let Some(t) = t {
1111 event.created_at.gt(t)
1112 } else {
1113 true
1114 }
1115 });
1116 if update_to_existing {
1117 report.updated_repo_announcements.push((
1118 Coordinate {
1119 kind: event.kind(),
1120 public_key: event.author(),
1121 identifier: event.identifier().unwrap().to_owned(),
1122 relays: vec![],
1123 },
1124 event.created_at,
1125 ));
1126 }
1127 // if contains new maintainer
1128 if let Ok(repo_ref) = &RepoRef::try_from(event.clone()) {
1129 for m in &repo_ref.maintainers {
1130 if !request
1131 .repo_coordinates_without_relays // prexisting maintainers
1132 .iter()
1133 .map(|(c, _)| c.clone())
1134 .collect::<HashSet<Coordinate>>()
1135 .union(&report.repo_coordinates_without_relays) // already added maintainers
1136 .any(|c| c.identifier.eq(&repo_ref.identifier) && m.eq(&c.public_key))
1137 {
1138 let c = Coordinate {
1139 kind: event.kind(),
1140 public_key: *m,
1141 identifier: repo_ref.identifier.clone(),
1142 relays: vec![],
1143 };
1144 fresh_coordinates.insert(c.clone());
1145 report.repo_coordinates_without_relays.insert(c);
1146
1147 if !request.contributors.contains(m)
1148 && !request
1149 .profiles_to_fetch_from_user_relays
1150 .clone()
1151 .into_keys()
1152 .collect::<HashSet<PublicKey>>()
1153 .contains(m)
1154 && !fresh_profiles.contains(m)
1155 {
1156 fresh_profiles.insert(m.to_owned());
1157 }
1158 }
1159 }
1160 }
1161 } else if event.kind().eq(&STATE_KIND) {
1162 let existing_state = if report.updated_state.is_some() {
1163 report.updated_state
1164 } else {
1165 request.state
1166 };
1167 if let Some((timestamp, id)) = existing_state {
1168 if event.created_at.gt(&timestamp)
1169 || (event.created_at.eq(&timestamp) && event.id.gt(&id))
1170 {
1171 report.updated_state = Some((event.created_at, event.id));
1172 }
1173 }
1174 } else if event_is_patch_set_root(event) {
1175 fresh_proposal_roots.insert(event.id);
1176 report.proposals.insert(event.id);
1177 if !request.contributors.contains(&event.author())
1178 && !fresh_profiles.contains(&event.author())
1179 {
1180 fresh_profiles.insert(event.author());
1181 }
1182 } else if [Kind::RelayList, Kind::Metadata].contains(&event.kind()) {
1183 if request
1184 .missing_contributor_profiles
1185 .contains(&event.author())
1186 {
1187 report.contributor_profiles.insert(event.author());
1188 } else if let Some((_, (metadata_timestamp, relay_list_timestamp))) = request
1189 .profiles_to_fetch_from_user_relays
1190 .get_key_value(&event.author())
1191 {
1192 if (Kind::Metadata.eq(&event.kind())
1193 && event.created_at().gt(metadata_timestamp))
1194 || (Kind::RelayList.eq(&event.kind())
1195 && event.created_at().gt(relay_list_timestamp))
1196 {
1197 report.profile_updates.insert(event.author());
1198 }
1199 }
1200 save_event_in_global_cache(git_repo_path, event).await?;
1201 }
1202 }
1203 }
1204 for event in &events {
1205 if !request.existing_events.contains(&event.id)
1206 && !event.event_ids().any(|id| report.proposals.contains(id))
1207 {
1208 if event.kind().eq(&Kind::GitPatch) && !event_is_patch_set_root(event) {
1209 report.commits.insert(event.id);
1210 } else if status_kinds().contains(&event.kind()) {
1211 report.statuses.insert(event.id);
1212 }
1213 }
1214 }
1215 Ok(())
1216}
1217
1218pub fn consolidate_fetch_reports(reports: Vec<Result<FetchReport>>) -> FetchReport {
1219 let mut report = FetchReport::default();
1220 for relay_report in reports.into_iter().flatten() {
1221 for c in relay_report.repo_coordinates_without_relays {
1222 if !report
1223 .repo_coordinates_without_relays
1224 .iter()
1225 .any(|e| e.eq(&c))
1226 {
1227 report.repo_coordinates_without_relays.insert(c);
1228 }
1229 }
1230 for (r, t) in relay_report.updated_repo_announcements {
1231 if let Some(i) = report
1232 .updated_repo_announcements
1233 .iter()
1234 .position(|(e, _)| e.eq(&r))
1235 {
1236 let (_, existing_t) = &report.updated_repo_announcements[i];
1237 if t.gt(existing_t) {
1238 report.updated_repo_announcements[i] = (r, t);
1239 }
1240 } else {
1241 report.updated_repo_announcements.push((r, t));
1242 }
1243 }
1244 if let Some((timestamp, id)) = relay_report.updated_state {
1245 if let Some((existing_timestamp, existing_id)) = report.updated_state {
1246 if timestamp.gt(&existing_timestamp)
1247 || (timestamp.eq(&existing_timestamp) && id.gt(&existing_id))
1248 {
1249 report.updated_state = Some((timestamp, id));
1250 }
1251 } else {
1252 report.updated_state = Some((timestamp, id));
1253 }
1254 }
1255 for c in relay_report.proposals {
1256 report.proposals.insert(c);
1257 }
1258 for c in relay_report.commits {
1259 report.commits.insert(c);
1260 }
1261 for c in relay_report.statuses {
1262 report.statuses.insert(c);
1263 }
1264 for c in relay_report.contributor_profiles {
1265 report.contributor_profiles.insert(c);
1266 }
1267 for c in relay_report.profile_updates {
1268 report.profile_updates.insert(c);
1269 }
1270 }
1271 report
1272}
1273pub fn get_fetch_filters(
1274 repo_coordinates: &HashSet<Coordinate>,
1275 proposal_ids: &HashSet<EventId>,
1276 required_profiles: &HashSet<PublicKey>,
1277) -> Vec<nostr::Filter> {
1278 [
1279 if repo_coordinates.is_empty() {
1280 vec![]
1281 } else {
1282 vec![
1283 get_filter_state_events(repo_coordinates),
1284 get_filter_repo_events(repo_coordinates),
1285 nostr::Filter::default()
1286 .kinds(vec![Kind::GitPatch, Kind::EventDeletion])
1287 .custom_tag(
1288 SingleLetterTag::lowercase(nostr_sdk::Alphabet::A),
1289 repo_coordinates
1290 .iter()
1291 .map(std::string::ToString::to_string)
1292 .collect::<Vec<String>>(),
1293 ),
1294 ]
1295 },
1296 if proposal_ids.is_empty() {
1297 vec![]
1298 } else {
1299 vec![
1300 nostr::Filter::default()
1301 .events(proposal_ids.clone())
1302 .kinds([vec![Kind::GitPatch, Kind::EventDeletion], status_kinds()].concat()),
1303 ]
1304 },
1305 if required_profiles.is_empty() {
1306 vec![]
1307 } else {
1308 vec![get_filter_contributor_profiles(required_profiles.clone())]
1309 },
1310 ]
1311 .concat()
1312}
1313
1314pub fn get_filter_repo_events(repo_coordinates: &HashSet<Coordinate>) -> nostr::Filter {
1315 nostr::Filter::default()
1316 .kind(Kind::GitRepoAnnouncement)
1317 .identifiers(
1318 repo_coordinates
1319 .iter()
1320 .map(|c| c.identifier.clone())
1321 .collect::<Vec<String>>(),
1322 )
1323 .authors(
1324 repo_coordinates
1325 .iter()
1326 .map(|c| c.public_key)
1327 .collect::<Vec<PublicKey>>(),
1328 )
1329}
1330
1331pub static STATE_KIND: nostr::Kind = Kind::Custom(30618);
1332pub fn get_filter_state_events(repo_coordinates: &HashSet<Coordinate>) -> nostr::Filter {
1333 nostr::Filter::default()
1334 .kind(STATE_KIND)
1335 .identifiers(
1336 repo_coordinates
1337 .iter()
1338 .map(|c| c.identifier.clone())
1339 .collect::<Vec<String>>(),
1340 )
1341 .authors(
1342 repo_coordinates
1343 .iter()
1344 .map(|c| c.public_key)
1345 .collect::<Vec<PublicKey>>(),
1346 )
1347}
1348
1349pub fn get_filter_contributor_profiles(contributors: HashSet<PublicKey>) -> nostr::Filter {
1350 nostr::Filter::default()
1351 .kinds(vec![Kind::Metadata, Kind::RelayList])
1352 .authors(contributors)
1353}
1354
1355#[derive(Default)]
1356pub struct FetchReport {
1357 repo_coordinates_without_relays: HashSet<Coordinate>,
1358 updated_repo_announcements: Vec<(Coordinate, Timestamp)>,
1359 updated_state: Option<(Timestamp, EventId)>,
1360 proposals: HashSet<EventId>,
1361 /// commits against existing propoals
1362 commits: HashSet<EventId>,
1363 statuses: HashSet<EventId>,
1364 contributor_profiles: HashSet<PublicKey>,
1365 profile_updates: HashSet<PublicKey>,
1366}
1367
1368impl Display for FetchReport {
1369 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1370 // report: "1 new maintainer, 1 announcement, 1 proposal, 3 commits, 2 statuses"
1371 let mut display_items: Vec<String> = vec![];
1372 if !self.repo_coordinates_without_relays.is_empty() {
1373 display_items.push(format!(
1374 "{} new maintainer{}",
1375 self.repo_coordinates_without_relays.len(),
1376 if self.repo_coordinates_without_relays.len() > 1 {
1377 "s"
1378 } else {
1379 ""
1380 },
1381 ));
1382 }
1383 if !self.updated_repo_announcements.is_empty() {
1384 display_items.push(format!(
1385 "{} announcement update{}",
1386 self.updated_repo_announcements.len(),
1387 if self.updated_repo_announcements.len() > 1 {
1388 "s"
1389 } else {
1390 ""
1391 },
1392 ));
1393 }
1394 if self.updated_state.is_some() {
1395 display_items.push("new state".to_string());
1396 }
1397 if !self.proposals.is_empty() {
1398 display_items.push(format!(
1399 "{} proposal{}",
1400 self.proposals.len(),
1401 if self.proposals.len() > 1 { "s" } else { "" },
1402 ));
1403 }
1404 if !self.commits.is_empty() {
1405 display_items.push(format!(
1406 "{} commit{}",
1407 self.commits.len(),
1408 if self.commits.len() > 1 { "s" } else { "" },
1409 ));
1410 }
1411 if !self.statuses.is_empty() {
1412 display_items.push(format!(
1413 "{} status{}",
1414 self.statuses.len(),
1415 if self.statuses.len() > 1 { "es" } else { "" },
1416 ));
1417 }
1418 if !self.contributor_profiles.is_empty() {
1419 display_items.push(format!(
1420 "{} user profile{}",
1421 self.contributor_profiles.len(),
1422 if self.contributor_profiles.len() > 1 {
1423 "s"
1424 } else {
1425 ""
1426 },
1427 ));
1428 }
1429 if !self.profile_updates.is_empty() {
1430 display_items.push(format!(
1431 "{} profile update{}",
1432 self.profile_updates.len(),
1433 if self.profile_updates.len() > 1 {
1434 "s"
1435 } else {
1436 ""
1437 },
1438 ));
1439 }
1440 write!(f, "{}", display_items.join(", "))
1441 }
1442}
1443
1444#[derive(Default, Clone)]
1445pub struct FetchRequest {
1446 repo_relays: HashSet<Url>,
1447 selected_relay: Option<Url>,
1448 relay_column_width: usize,
1449 repo_coordinates_without_relays: Vec<(Coordinate, Option<Timestamp>)>,
1450 state: Option<(Timestamp, EventId)>,
1451 proposals: HashSet<EventId>,
1452 contributors: HashSet<PublicKey>,
1453 missing_contributor_profiles: HashSet<PublicKey>,
1454 existing_events: HashSet<EventId>,
1455 profiles_to_fetch_from_user_relays: HashMap<PublicKey, (Timestamp, Timestamp)>,
1456 user_relays_for_profiles: HashSet<Url>,
1457}
1458
1459pub async fn fetching_with_report(
1460 git_repo_path: &Path,
1461 #[cfg(test)] client: &crate::client::MockConnect,
1462 #[cfg(not(test))] client: &Client,
1463 repo_coordinates: &HashSet<Coordinate>,
1464) -> Result<FetchReport> {
1465 let term = console::Term::stderr();
1466 term.write_line("fetching updates...")?;
1467 let (relay_reports, progress_reporter) = client
1468 .fetch_all(git_repo_path, repo_coordinates, &HashSet::new())
1469 .await?;
1470 if !relay_reports.iter().any(std::result::Result::is_err) {
1471 let _ = progress_reporter.clear();
1472 }
1473 let report = consolidate_fetch_reports(relay_reports);
1474 if report.to_string().is_empty() {
1475 println!("no updates");
1476 } else {
1477 println!("updates: {report}");
1478 }
1479 Ok(report)
1480}