upleb.uk

Public git repos — served from a NIP-34 GRASP relay at git.upleb.uk

summaryrefslogtreecommitdiff
path: root/src/client.rs
diff options
context:
space:
mode:
Diffstat (limited to 'src/client.rs')
-rw-r--r--src/client.rs883
1 files changed, 833 insertions, 50 deletions
diff --git a/src/client.rs b/src/client.rs
index a66adac..4054e7c 100644
--- a/src/client.rs
+++ b/src/client.rs
@@ -10,16 +10,37 @@
10// which is currently in nightly. alternatively we can use nightly as it looks 10// which is currently in nightly. alternatively we can use nightly as it looks
11// certain that the implementation is going to make it to stable but we don't 11// certain that the implementation is going to make it to stable but we don't
12// want to inadvertlty use other features of nightly that might be removed. 12// want to inadvertlty use other features of nightly that might be removed.
13use std::{fmt::Write, time::Duration}; 13use std::{
14 collections::{HashMap, HashSet},
15 fmt::{Display, Write},
16 fs::create_dir_all,
17 path::Path,
18 time::Duration,
19};
14 20
15use anyhow::{bail, Context, Result}; 21use anyhow::{bail, Context, Result};
16use async_trait::async_trait; 22use async_trait::async_trait;
23use console::Style;
17use futures::stream::{self, StreamExt}; 24use futures::stream::{self, StreamExt};
18use indicatif::{MultiProgress, ProgressBar, ProgressState, ProgressStyle}; 25use indicatif::{MultiProgress, ProgressBar, ProgressState, ProgressStyle};
19#[cfg(test)] 26#[cfg(test)]
20use mockall::*; 27use mockall::*;
21use nostr::Event; 28use nostr::{nips::nip01::Coordinate, Event};
22use nostr_sdk::{prelude::RelayLimits, EventBuilder, NostrSigner, Options}; 29use nostr_database::{NostrDatabase, Order};
30use nostr_sdk::{
31 prelude::RelayLimits, EventBuilder, EventId, Kind, NostrSigner, Options, PublicKey,
32 SingleLetterTag, Timestamp, Url,
33};
34use nostr_sqlite::SQLiteDatabase;
35
36use crate::{
37 config::get_dirs,
38 repo_ref::{RepoRef, REPO_REF_KIND},
39 sub_commands::{
40 list::status_kinds,
41 send::{event_is_patch_set_root, PATCH_KIND},
42 },
43};
23 44
24#[allow(clippy::struct_field_names)] 45#[allow(clippy::struct_field_names)]
25pub struct Client { 46pub struct Client {
@@ -35,6 +56,7 @@ pub trait Connect {
35 fn default() -> Self; 56 fn default() -> Self;
36 fn new(opts: Params) -> Self; 57 fn new(opts: Params) -> Self;
37 async fn set_signer(&mut self, signer: NostrSigner); 58 async fn set_signer(&mut self, signer: NostrSigner);
59 async fn connect(&self, relay_url: &Url) -> Result<()>;
38 async fn disconnect(&self) -> Result<()>; 60 async fn disconnect(&self) -> Result<()>;
39 fn get_fallback_relays(&self) -> &Vec<String>; 61 fn get_fallback_relays(&self) -> &Vec<String>;
40 fn get_more_fallback_relays(&self) -> &Vec<String>; 62 fn get_more_fallback_relays(&self) -> &Vec<String>;
@@ -45,6 +67,25 @@ pub trait Connect {
45 relays: Vec<String>, 67 relays: Vec<String>,
46 filters: Vec<nostr::Filter>, 68 filters: Vec<nostr::Filter>,
47 ) -> Result<Vec<nostr::Event>>; 69 ) -> Result<Vec<nostr::Event>>;
70 async fn get_events_per_relay(
71 &self,
72 relays: Vec<Url>,
73 filters: Vec<nostr::Filter>,
74 progress_reporter: MultiProgress,
75 ) -> Result<(Vec<Result<Vec<nostr::Event>>>, MultiProgress)>;
76 async fn fetch_all(
77 &self,
78 git_repo_path: &Path,
79 repo_coordinates: &HashSet<Coordinate>,
80 ) -> Result<FetchReport>;
81 async fn fetch_all_from_relay(
82 &self,
83 git_repo_path: &Path,
84 relay_url: Url,
85 request: FetchRequest,
86 // progress_reporter: &MultiProgress,
87 pb: &Option<ProgressBar>,
88 ) -> Result<FetchReport>;
48} 89}
49 90
50#[async_trait] 91#[async_trait]
@@ -110,6 +151,27 @@ impl Connect for Client {
110 self.client.set_signer(Some(signer)).await; 151 self.client.set_signer(Some(signer)).await;
111 } 152 }
112 153
154 async fn connect(&self, relay_url: &Url) -> Result<()> {
155 self.client
156 .add_relay(relay_url)
157 .await
158 .context("cannot add relay")?;
159
160 let relay = self.client.relay(relay_url).await?;
161
162 if !relay.is_connected().await {
163 #[allow(clippy::large_futures)]
164 relay
165 .connect(Some(std::time::Duration::from_secs(CONNECTION_TIMEOUT)))
166 .await;
167 }
168
169 if !relay.is_connected().await {
170 bail!("connection timeout");
171 }
172 Ok(())
173 }
174
113 async fn disconnect(&self) -> Result<()> { 175 async fn disconnect(&self) -> Result<()> {
114 self.client.disconnect().await?; 176 self.client.disconnect().await?;
115 Ok(()) 177 Ok(())
@@ -139,6 +201,22 @@ impl Connect for Client {
139 relays: Vec<String>, 201 relays: Vec<String>,
140 filters: Vec<nostr::Filter>, 202 filters: Vec<nostr::Filter>,
141 ) -> Result<Vec<nostr::Event>> { 203 ) -> Result<Vec<nostr::Event>> {
204 let (relay_results, _) = self
205 .get_events_per_relay(
206 relays.iter().map(|r| Url::parse(r).unwrap()).collect(),
207 filters,
208 MultiProgress::new(),
209 )
210 .await?;
211 Ok(get_dedup_events(relay_results))
212 }
213
214 async fn get_events_per_relay(
215 &self,
216 relays: Vec<Url>,
217 filters: Vec<nostr::Filter>,
218 progress_reporter: MultiProgress,
219 ) -> Result<(Vec<Result<Vec<nostr::Event>>>, MultiProgress)> {
142 // add relays 220 // add relays
143 for relay in &relays { 221 for relay in &relays {
144 self.client 222 self.client
@@ -147,59 +225,20 @@ impl Connect for Client {
147 .context("cannot add relay")?; 225 .context("cannot add relay")?;
148 } 226 }
149 227
150 let m = MultiProgress::new();
151 let pb_style = ProgressStyle::with_template(" {spinner} {prefix} {msg} {timeout_in}")?
152 .with_key("timeout_in", |state: &ProgressState, w: &mut dyn Write| {
153 if state.elapsed().as_secs() > 3 && state.elapsed().as_secs() < GET_EVENTS_TIMEOUT {
154 write!(
155 w,
156 "timeout in {:.1}s",
157 GET_EVENTS_TIMEOUT - state.elapsed().as_secs()
158 )
159 .unwrap();
160 }
161 });
162
163 let pb_after_style = |succeed| {
164 ProgressStyle::with_template(
165 format!(
166 " {} {}",
167 if succeed {
168 console::style("✔".to_string())
169 .for_stderr()
170 .green()
171 .to_string()
172 } else {
173 console::style("✘".to_string())
174 .for_stderr()
175 .red()
176 .to_string()
177 },
178 "{prefix} {msg}",
179 )
180 .as_str(),
181 )
182 };
183
184 let relays_map = self.client.relays().await; 228 let relays_map = self.client.relays().await;
185 229
186 let futures: Vec<_> = relays 230 let futures: Vec<_> = relays
187 .clone() 231 .clone()
188 .iter() 232 .iter()
189 // don't look for events on blaster 233 // don't look for events on blaster
190 .filter(|r| !r.contains("nostr.mutinywallet.com")) 234 .filter(|r| !r.as_str().contains("nostr.mutinywallet.com"))
191 .map(|r| { 235 .map(|r| (relays_map.get(r).unwrap(), filters.clone()))
192 (
193 relays_map.get(&nostr::Url::parse(r).unwrap()).unwrap(),
194 filters.clone(),
195 )
196 })
197 .map(|(relay, filters)| async { 236 .map(|(relay, filters)| async {
198 let pb = if std::env::var("NGITTEST").is_err() { 237 let pb = if std::env::var("NGITTEST").is_err() {
199 let pb = m.add( 238 let pb = progress_reporter.add(
200 ProgressBar::new(1) 239 ProgressBar::new(1)
201 .with_prefix(format!("{: <11}{}", "connecting", relay.url())) 240 .with_prefix(format!("{: <11}{}", "connecting", relay.url()))
202 .with_style(pb_style.clone()), 241 .with_style(pb_style()?),
203 ); 242 );
204 pb.enable_steady_tick(Duration::from_millis(300)); 243 pb.enable_steady_tick(Duration::from_millis(300));
205 Some(pb) 244 Some(pb)
@@ -210,7 +249,7 @@ impl Connect for Client {
210 match get_events_of(relay, filters, &pb).await { 249 match get_events_of(relay, filters, &pb).await {
211 Err(error) => { 250 Err(error) => {
212 if let Some(pb) = pb { 251 if let Some(pb) = pb {
213 pb.set_style(pb_after_style(false)?); 252 pb.set_style(pb_after_style(false));
214 pb.set_prefix(format!("{: <11}{}", "error", relay.url())); 253 pb.set_prefix(format!("{: <11}{}", "error", relay.url()));
215 pb.finish_with_message( 254 pb.finish_with_message(
216 console::style( 255 console::style(
@@ -225,7 +264,7 @@ impl Connect for Client {
225 } 264 }
226 Ok(res) => { 265 Ok(res) => {
227 if let Some(pb) = pb { 266 if let Some(pb) = pb {
228 pb.set_style(pb_after_style(true)?); 267 pb.set_style(pb_after_style(true));
229 pb.set_prefix(format!( 268 pb.set_prefix(format!(
230 "{: <11}{}", 269 "{: <11}{}",
231 format!("{} events", res.len()), 270 format!("{} events", res.len()),
@@ -239,9 +278,228 @@ impl Connect for Client {
239 }) 278 })
240 .collect(); 279 .collect();
241 280
242 let relay_results = stream::iter(futures).buffer_unordered(15).collect().await; 281 let relay_results: Vec<Result<Vec<nostr::Event>>> =
282 stream::iter(futures).buffer_unordered(15).collect().await;
243 283
244 Ok(get_dedup_events(relay_results)) 284 Ok((relay_results, progress_reporter))
285 }
286
287 #[allow(clippy::too_many_lines)]
288 async fn fetch_all(
289 &self,
290 git_repo_path: &Path,
291 repo_coordinates: &HashSet<Coordinate>,
292 ) -> Result<FetchReport> {
293 println!("fetching updates...");
294 let mut fallback_relays = HashSet::new();
295 for r in &self.fallback_relays {
296 if let Ok(url) = Url::parse(r) {
297 fallback_relays.insert(url);
298 }
299 }
300 let (relays, request) =
301 create_relays_request(git_repo_path, repo_coordinates, fallback_relays).await?;
302 let progress_reporter = MultiProgress::new();
303
304 for relay in &relays {
305 self.client
306 .add_relay(relay.as_str())
307 .await
308 .context("cannot add relay")?;
309 }
310
311 let dim = Style::new().color256(247);
312
313 let futures: Vec<_> = relays
314 .iter()
315 // don't look for events on blaster
316 .filter(|r| !r.as_str().contains("nostr.mutinywallet.com"))
317 .map(|r| (r.clone(), request.clone()))
318 .map(|(relay, request)| async {
319 let relay_column_width = request.relay_column_width;
320
321 let pb = if std::env::var("NGITTEST").is_err() {
322 let pb = progress_reporter.add(
323 ProgressBar::new(1)
324 .with_prefix(
325 dim.apply_to(format!(
326 "{: <relay_column_width$}{}",
327 "connecting", &relay
328 ))
329 .to_string(),
330 )
331 .with_style(pb_style()?),
332 );
333 pb.enable_steady_tick(Duration::from_millis(300));
334 Some(pb)
335 } else {
336 None
337 };
338
339 #[allow(clippy::large_futures)]
340 match self
341 .fetch_all_from_relay(git_repo_path, relay, request, &pb)
342 .await
343 {
344 Err(error) => {
345 if let Some(pb) = pb {
346 pb.set_style(pb_after_style(false));
347 pb.set_prefix(
348 dim.apply_to(format!(
349 "{: <relay_column_width$}{}",
350 "error", "&relay"
351 ))
352 .to_string(),
353 );
354 pb.finish_with_message(
355 console::style(
356 error.to_string().replace("relay pool error:", "error:"),
357 )
358 .for_stderr()
359 .red()
360 .to_string(),
361 );
362 }
363 Err(error)
364 }
365 Ok(res) => {
366 if let Some(pb) = pb {
367 pb.set_style(pb_after_style(true));
368 pb.set_prefix(
369 dim.apply_to(format!(
370 "{: <relay_column_width$}{}",
371 if let Some(relay) = &res.relay {
372 format!("{relay}")
373 } else {
374 String::new()
375 },
376 if res.to_string().is_empty() {
377 "no updates".to_string()
378 } else {
379 format!("found {res}")
380 },
381 ))
382 .to_string(),
383 );
384 pb.finish_with_message("");
385 }
386 Ok(res)
387 }
388 }
389 })
390 .collect();
391
392 let relay_reports: Vec<Result<FetchReport>> =
393 stream::iter(futures).buffer_unordered(15).collect().await;
394
395 let report = consolidate_fetch_reports(relay_reports);
396
397 if report.to_string().is_empty() {
398 println!("no updates found");
399 } else {
400 println!("fetched updates: {report}");
401 }
402 Ok(report)
403 }
404
405 async fn fetch_all_from_relay(
406 &self,
407 git_repo_path: &Path,
408 relay_url: Url,
409 request: FetchRequest,
410 // progress_reporter: &MultiProgress,
411 pb: &Option<ProgressBar>,
412 ) -> Result<FetchReport> {
413 let mut fresh_coordinates: HashSet<Coordinate> = HashSet::new();
414 for (c, _) in request.repo_coordinates.clone() {
415 fresh_coordinates.insert(c);
416 }
417 let mut fresh_proposal_roots = request.proposals.clone();
418 let mut fresh_authors = request.contributor_profiles.clone();
419
420 let mut report = FetchReport {
421 relay: Some(relay_url.clone()),
422 ..Default::default()
423 };
424
425 // let pb = if std::env::var("NGITTEST").is_err() {
426 // let pb = progress_reporter.add(
427 // ProgressBar::new(1)
428 // .with_prefix(format!("{: <11}{}", "connecting", relay_url))
429 // .with_style(pb_style()?),
430 // );
431 // pb.enable_steady_tick(Duration::from_millis(300));
432 // Some(pb)
433 // } else {
434 // None
435 // };
436
437 self.connect(&relay_url).await?;
438
439 let relay_column_width = request.relay_column_width;
440
441 let dim = Style::new().color256(247);
442
443 loop {
444 let filters =
445 get_fetch_filters(&fresh_coordinates, &fresh_proposal_roots, &fresh_authors);
446
447 if let Some(pb) = &pb {
448 pb.set_prefix(
449 dim.apply_to(format!(
450 "{: <relay_column_width$}{}",
451 &relay_url,
452 if report.to_string().is_empty() {
453 "fetching...".to_string()
454 } else {
455 format!("found {report}")
456 },
457 ))
458 .to_string(),
459 );
460 }
461
462 fresh_coordinates = HashSet::new();
463 fresh_proposal_roots = HashSet::new();
464 fresh_authors = HashSet::new();
465
466 let relay = self.client.relay(&relay_url).await?;
467 let events: Vec<nostr::Event> = get_events_of(&relay, filters, &None).await?;
468 // TODO: try reconcile
469
470 for event in events {
471 // TODO existing_events or events in fresh
472 process_fetched_event(
473 event,
474 &request,
475 git_repo_path,
476 &mut fresh_coordinates,
477 &mut fresh_proposal_roots,
478 &mut report,
479 )
480 .await?;
481 }
482
483 if fresh_coordinates.is_empty() && fresh_proposal_roots.is_empty() {
484 break;
485 }
486 }
487 if let Some(pb) = pb {
488 let report_display = format!("{report}");
489 pb.set_prefix(
490 dim.apply_to(format!(
491 "{: <relay_column_width$}{}",
492 relay_url,
493 if report_display.is_empty() {
494 String::new()
495 } else {
496 format!("found {report_display}")
497 },
498 ))
499 .to_string(),
500 );
501 }
502 Ok(report)
245 } 503 }
246} 504}
247 505
@@ -253,6 +511,8 @@ async fn get_events_of(
253 filters: Vec<nostr::Filter>, 511 filters: Vec<nostr::Filter>,
254 pb: &Option<ProgressBar>, 512 pb: &Option<ProgressBar>,
255) -> Result<Vec<Event>> { 513) -> Result<Vec<Event>> {
514 // relay.reconcile(filter, opts).await?;
515
256 if !relay.is_connected().await { 516 if !relay.is_connected().await {
257 #[allow(clippy::large_futures)] 517 #[allow(clippy::large_futures)]
258 relay 518 relay
@@ -324,3 +584,526 @@ pub async fn fetch_public_key(signer: &NostrSigner) -> Result<nostr::PublicKey>
324 term.clear_last_lines(1)?; 584 term.clear_last_lines(1)?;
325 Ok(public_key) 585 Ok(public_key)
326} 586}
587
588fn pb_style() -> Result<ProgressStyle> {
589 Ok(
590 ProgressStyle::with_template(" {spinner} {prefix} {msg} {timeout_in}")?.with_key(
591 "timeout_in",
592 |state: &ProgressState, w: &mut dyn Write| {
593 if state.elapsed().as_secs() > 3 && state.elapsed().as_secs() < GET_EVENTS_TIMEOUT {
594 let dim = Style::new().color256(247);
595 write!(
596 w,
597 "{}",
598 dim.apply_to(format!(
599 "timeout in {:.1}s",
600 GET_EVENTS_TIMEOUT - state.elapsed().as_secs()
601 ))
602 )
603 .unwrap();
604 }
605 },
606 ),
607 )
608}
609
610fn pb_after_style(succeed: bool) -> indicatif::ProgressStyle {
611 ProgressStyle::with_template(
612 format!(
613 " {} {}",
614 if succeed {
615 console::style("✔".to_string())
616 .for_stderr()
617 .green()
618 .to_string()
619 } else {
620 console::style("✘".to_string())
621 .for_stderr()
622 .red()
623 .to_string()
624 },
625 "{prefix} {msg}",
626 )
627 .as_str(),
628 )
629 .unwrap()
630}
631
632async fn get_local_cache_database(git_repo_path: &Path) -> Result<SQLiteDatabase> {
633 SQLiteDatabase::open(git_repo_path.join(".git/nostr-cache.sqlite"))
634 .await
635 .context("cannot open or create nostr cache database at .git/nostr-cache.sqlite")
636}
637
638async fn get_global_cache_database(git_repo_path: &Path) -> Result<SQLiteDatabase> {
639 SQLiteDatabase::open(if std::env::var("NGITTEST").is_err() {
640 create_dir_all(get_dirs()?.config_dir()).context(format!(
641 "cannot create cache directory in: {:?}",
642 get_dirs()?.config_dir()
643 ))?;
644 get_dirs()?.config_dir().join("cache.sqlite")
645 } else {
646 git_repo_path.join(".git/test-global-cache.sqlite")
647 })
648 .await
649 .context("cannot open ngit global nostr cache database")
650}
651
652pub async fn get_event_from_cache(
653 git_repo_path: &Path,
654 filters: Vec<nostr::Filter>,
655) -> Result<Vec<nostr::Event>> {
656 get_local_cache_database(git_repo_path)
657 .await?
658 .query(filters.clone(), Order::Asc)
659 .await
660 .context(
661 "cannot execute query on opened git repo nostr cache database .git/nostr-cache.sqlite",
662 )
663}
664
665pub async fn get_event_from_global_cache(
666 git_repo_path: &Path,
667 filters: Vec<nostr::Filter>,
668) -> Result<Vec<nostr::Event>> {
669 get_global_cache_database(git_repo_path)
670 .await?
671 .query(filters.clone(), Order::Asc)
672 .await
673 .context("cannot execute query on opened ngit nostr cache database")
674}
675
676pub async fn save_event_in_cache(git_repo_path: &Path, event: &nostr::Event) -> Result<bool> {
677 get_local_cache_database(git_repo_path)
678 .await?
679 .save_event(event)
680 .await
681 .context("cannot save event in local cache")
682}
683
684pub async fn save_event_in_global_cache(
685 git_repo_path: &Path,
686 event: &nostr::Event,
687) -> Result<bool> {
688 get_global_cache_database(git_repo_path)
689 .await?
690 .save_event(event)
691 .await
692 .context("cannot save event in local cache")
693}
694
695pub async fn get_repo_ref_from_cache(
696 git_repo_path: &Path,
697 repo_coordinates: &HashSet<Coordinate>,
698) -> Result<RepoRef> {
699 let mut maintainers = HashSet::new();
700 let mut new_coordinate = false;
701
702 for c in repo_coordinates {
703 maintainers.insert(c.public_key);
704 }
705 let mut repo_events = vec![];
706 loop {
707 let filter = get_filter_repo_events(repo_coordinates);
708
709 let events = [
710 get_event_from_global_cache(git_repo_path, vec![filter.clone()]).await?,
711 get_event_from_cache(git_repo_path, vec![filter]).await?,
712 ]
713 .concat();
714 for e in events {
715 if let Ok(repo_ref) = RepoRef::try_from(e.clone()) {
716 for m in repo_ref.maintainers {
717 if maintainers.insert(m) {
718 new_coordinate = true;
719 }
720 }
721 repo_events.push(e);
722 }
723 }
724 if !new_coordinate {
725 break;
726 }
727 }
728 repo_events.sort_by_key(|e| e.created_at);
729 let repo_ref = RepoRef::try_from(
730 repo_events
731 .first()
732 .context("no repo events at specified coordinates")?
733 .clone(),
734 )?;
735
736 let mut events: HashMap<Coordinate, nostr::Event> = HashMap::new();
737 for m in &maintainers {
738 if let Some(e) = repo_events.iter().find(|e| e.author().eq(m)) {
739 events.insert(
740 Coordinate {
741 kind: e.kind,
742 identifier: e.identifier().unwrap().to_string(),
743 public_key: e.author(),
744 relays: vec![],
745 },
746 e.clone(),
747 );
748 }
749 }
750
751 Ok(RepoRef {
752 // use all maintainers from all events found, not just maintainers in the most
753 // recent event
754 maintainers: maintainers.iter().copied().collect::<Vec<PublicKey>>(),
755 events,
756 ..repo_ref
757 })
758}
759
760async fn create_relays_request(
761 git_repo_path: &Path,
762 repo_coordinates: &HashSet<Coordinate>,
763 fallback_relays: HashSet<Url>,
764) -> Result<(HashSet<Url>, FetchRequest)> {
765 let repo_ref = get_repo_ref_from_cache(git_repo_path, repo_coordinates).await;
766
767 let relays = {
768 let mut relays = fallback_relays;
769 if let Ok(repo_ref) = &repo_ref {
770 for r in &repo_ref.relays {
771 if let Ok(url) = Url::parse(r) {
772 relays.insert(url);
773 }
774 }
775 }
776 relays
777 };
778
779 let relay_column_width = relays
780 .iter()
781 .reduce(|a, r| {
782 if r.to_string()
783 .chars()
784 .count()
785 .gt(&a.to_string().chars().count())
786 {
787 r
788 } else {
789 a
790 }
791 })
792 .unwrap()
793 .to_string()
794 .chars()
795 .count()
796 + 2;
797
798 let repo_coordinates = if let Ok(repo_ref) = &repo_ref {
799 repo_ref.coordinates()
800 } else {
801 repo_coordinates.clone()
802 };
803
804 let proposals: HashSet<EventId> = get_local_cache_database(git_repo_path)
805 .await?
806 .negentropy_items(
807 nostr::Filter::default()
808 .kinds(vec![Kind::Custom(PATCH_KIND)])
809 .custom_tag(
810 SingleLetterTag::lowercase(nostr_sdk::Alphabet::A),
811 repo_coordinates
812 .iter()
813 .map(std::string::ToString::to_string)
814 .collect::<Vec<String>>(),
815 ),
816 )
817 .await?
818 .iter()
819 .map(|(id, _)| *id)
820 .collect();
821
822 let contributor_profiles = HashSet::new();
823
824 let existing_events: HashSet<EventId> = {
825 let mut existing_events: HashSet<EventId> = HashSet::new();
826 for filter in get_fetch_filters(&repo_coordinates, &proposals, &contributor_profiles) {
827 for (id, _) in get_local_cache_database(git_repo_path)
828 .await?
829 .negentropy_items(filter)
830 .await?
831 {
832 existing_events.insert(id);
833 }
834 }
835 existing_events
836 };
837 Ok((
838 relays,
839 FetchRequest {
840 relay_column_width,
841 repo_coordinates: if let Ok(repo_ref) = repo_ref {
842 repo_ref.coordinates_with_timestamps()
843 } else {
844 repo_coordinates.iter().map(|c| (c.clone(), None)).collect()
845 },
846 proposals,
847 contributor_profiles,
848 existing_events,
849 },
850 ))
851}
852
853async fn process_fetched_event(
854 event: nostr::Event,
855 request: &FetchRequest,
856 git_repo_path: &Path,
857 fresh_coordinates: &mut HashSet<Coordinate>,
858 fresh_proposal_roots: &mut HashSet<EventId>,
859 report: &mut FetchReport,
860) -> Result<()> {
861 if !request.existing_events.contains(&event.id) {
862 save_event_in_cache(git_repo_path, &event).await?;
863 if event.kind().as_u16().eq(&REPO_REF_KIND) {
864 save_event_in_global_cache(git_repo_path, &event).await?;
865 let new_coordinate = !request.repo_coordinates.iter().any(|(c, _)| {
866 c.identifier.eq(event.identifier().unwrap()) && c.public_key.eq(&event.pubkey)
867 });
868 let update_to_existing = !new_coordinate
869 && request.repo_coordinates.iter().any(|(c, t)| {
870 c.identifier.eq(event.identifier().unwrap())
871 && c.public_key.eq(&event.pubkey)
872 && if let Some(t) = t {
873 event.created_at.gt(t)
874 } else {
875 false
876 }
877 });
878 if new_coordinate || update_to_existing {
879 let c = Coordinate {
880 kind: event.kind(),
881 public_key: event.author(),
882 identifier: event.identifier().unwrap().to_string(),
883 relays: vec![],
884 };
885 if new_coordinate {
886 fresh_coordinates.insert(c.clone());
887 report.repo_coordinates.push(c.clone());
888 }
889 if update_to_existing {
890 report
891 .updated_repo_announcements
892 .push((c, event.created_at));
893 }
894 }
895 // if contains new maintainer
896 if let Ok(repo_ref) = &RepoRef::try_from(event.clone()) {
897 for m in &repo_ref.maintainers {
898 if !request
899 .repo_coordinates
900 .iter()
901 .any(|(c, _)| c.identifier.eq(&repo_ref.identifier) && m.eq(&c.public_key))
902 {
903 fresh_coordinates.insert(Coordinate {
904 kind: event.kind(),
905 public_key: *m,
906 identifier: repo_ref.identifier.clone(),
907 relays: vec![],
908 });
909 }
910 }
911 }
912 } else if event_is_patch_set_root(&event) {
913 fresh_proposal_roots.insert(event.id);
914 report.proposals.insert(event.id);
915 } else if !event.event_ids().any(|id| report.proposals.contains(id)) {
916 if event.kind().as_u16() == PATCH_KIND {
917 report.commits.insert(event.id);
918 } else if status_kinds().contains(&event.kind()) {
919 report.statuses.insert(event.id);
920 }
921 } else if event.kind().eq(&nostr_sdk::Kind::Metadata) {
922 report.contributor_profiles.insert(event.author());
923 save_event_in_global_cache(git_repo_path, &event).await?;
924 }
925 }
926 Ok(())
927}
928
929fn consolidate_fetch_reports(reports: Vec<Result<FetchReport>>) -> FetchReport {
930 let mut report = FetchReport::default();
931 for relay_report in reports.into_iter().flatten() {
932 for c in relay_report.repo_coordinates {
933 if !report.repo_coordinates.iter().any(|e| e.eq(&c)) {
934 report.repo_coordinates.push(c);
935 }
936 }
937 for (r, t) in relay_report.updated_repo_announcements {
938 if let Some(i) = report
939 .updated_repo_announcements
940 .iter()
941 .position(|(e, _)| e.eq(&r))
942 {
943 let (_, existing_t) = &report.updated_repo_announcements[i];
944 if t.gt(existing_t) {
945 report.updated_repo_announcements[i] = (r, t);
946 }
947 } else {
948 report.updated_repo_announcements.push((r, t));
949 }
950 }
951 for c in relay_report.proposals {
952 report.proposals.insert(c);
953 }
954 for c in relay_report.commits {
955 report.commits.insert(c);
956 }
957 for c in relay_report.statuses {
958 report.statuses.insert(c);
959 }
960 }
961 report
962}
963pub fn get_fetch_filters(
964 repo_coordinates: &HashSet<Coordinate>,
965 proposal_ids: &HashSet<EventId>,
966 required_profiles: &HashSet<PublicKey>,
967) -> Vec<nostr::Filter> {
968 [
969 if repo_coordinates.is_empty() {
970 vec![]
971 } else {
972 vec![
973 get_filter_repo_events(repo_coordinates),
974 nostr::Filter::default()
975 .kinds(vec![Kind::Custom(PATCH_KIND), Kind::EventDeletion])
976 .custom_tag(
977 SingleLetterTag::lowercase(nostr_sdk::Alphabet::A),
978 repo_coordinates
979 .iter()
980 .map(std::string::ToString::to_string)
981 .collect::<Vec<String>>(),
982 ),
983 ]
984 },
985 if proposal_ids.is_empty() {
986 vec![]
987 } else {
988 vec![
989 nostr::Filter::default().events(proposal_ids.clone()).kinds(
990 [
991 vec![Kind::Custom(PATCH_KIND), Kind::EventDeletion],
992 status_kinds(),
993 ]
994 .concat(),
995 ),
996 ]
997 },
998 if required_profiles.is_empty() {
999 vec![]
1000 } else {
1001 vec![
1002 nostr::Filter::default()
1003 .kinds(vec![Kind::Metadata, Kind::RelayList])
1004 .authors(required_profiles.clone()),
1005 ]
1006 },
1007 ]
1008 .concat()
1009}
1010
1011pub fn get_filter_repo_events(repo_coordinates: &HashSet<Coordinate>) -> nostr::Filter {
1012 nostr::Filter::default()
1013 .kind(Kind::Custom(REPO_REF_KIND))
1014 .identifiers(
1015 repo_coordinates
1016 .iter()
1017 .map(|c| c.identifier.clone())
1018 .collect::<Vec<String>>(),
1019 )
1020 .authors(
1021 repo_coordinates
1022 .iter()
1023 .map(|c| c.public_key)
1024 .collect::<Vec<PublicKey>>(),
1025 )
1026}
1027
1028#[derive(Default)]
1029pub struct FetchReport {
1030 relay: Option<Url>,
1031 repo_coordinates: Vec<Coordinate>,
1032 updated_repo_announcements: Vec<(Coordinate, Timestamp)>,
1033 proposals: HashSet<EventId>,
1034 /// commits against existing propoals
1035 commits: HashSet<EventId>,
1036 statuses: HashSet<EventId>,
1037 contributor_profiles: HashSet<PublicKey>,
1038}
1039
1040impl Display for FetchReport {
1041 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1042 // report: "1 new maintainer, 1 announcement, 1 proposal, 3 commits, 2 statuses"
1043 let mut display_items: Vec<String> = vec![];
1044 if !self.repo_coordinates.is_empty() {
1045 display_items.push(format!(
1046 "{} new maintainer{}",
1047 self.repo_coordinates.len(),
1048 if self.repo_coordinates.len() == 1 {
1049 "s"
1050 } else {
1051 ""
1052 },
1053 ));
1054 }
1055 if !self.updated_repo_announcements.is_empty() {
1056 display_items.push(format!(
1057 "{} announcement update{}",
1058 self.updated_repo_announcements.len(),
1059 if self.updated_repo_announcements.len() == 1 {
1060 "s"
1061 } else {
1062 ""
1063 },
1064 ));
1065 }
1066 if !self.proposals.is_empty() {
1067 display_items.push(format!(
1068 "{} proposal{}",
1069 self.proposals.len(),
1070 if self.proposals.len() == 1 { "s" } else { "" },
1071 ));
1072 }
1073 if !self.commits.is_empty() {
1074 display_items.push(format!(
1075 "{} commit{}",
1076 self.commits.len(),
1077 if self.commits.len() == 1 { "s" } else { "" },
1078 ));
1079 }
1080 if !self.statuses.is_empty() {
1081 display_items.push(format!(
1082 "{} status{}",
1083 self.statuses.len(),
1084 if self.statuses.len() == 1 { "es" } else { "" },
1085 ));
1086 }
1087 if !self.contributor_profiles.is_empty() {
1088 display_items.push(format!(
1089 "{} contributor profile{}",
1090 self.contributor_profiles.len(),
1091 if self.contributor_profiles.len() == 1 {
1092 "s"
1093 } else {
1094 ""
1095 },
1096 ));
1097 }
1098 write!(f, "{}", display_items.join(", "))
1099 }
1100}
1101
1102#[derive(Default, Clone)]
1103pub struct FetchRequest {
1104 relay_column_width: usize,
1105 repo_coordinates: Vec<(Coordinate, Option<Timestamp>)>,
1106 proposals: HashSet<EventId>,
1107 contributor_profiles: HashSet<PublicKey>,
1108 existing_events: HashSet<EventId>,
1109}