use std::{ sync::{Arc, Mutex}, time::Instant, }; use anyhow::{Result, anyhow, bail}; use auth_git2::GitAuthenticator; use git2::{Progress, Repository}; use crate::{ cli_interactor::count_lines_per_msg_vec, git::{ Repo, RepoActions, nostr_url::{CloneUrl, NostrUrlDecoded, ServerProtocol}, utils::check_ssh_keys, }, utils::{Direction, get_read_protocols_to_try, join_with_and, set_protocol_preference}, }; pub fn fetch_from_git_server( git_repo: &Repo, oids: &[String], git_server_url: &str, decoded_nostr_url: &NostrUrlDecoded, term: &console::Term, is_grasp_server: bool, ) -> Result<()> { let already_have_oids = oids .iter() .all(|oid| git_repo.does_commit_exist(oid).is_ok_and(|outcome| outcome)); if already_have_oids { return Ok(()); } let server_url = git_server_url.parse::()?; let protocols_to_attempt = get_read_protocols_to_try(git_repo, &server_url, decoded_nostr_url, is_grasp_server); let mut failed_protocols = vec![]; let mut success = false; for protocol in &protocols_to_attempt { term.write_line( format!("fetching {} over {protocol}...", server_url.short_name(),).as_str(), )?; let formatted_url = server_url.format_as(protocol, &decoded_nostr_url.user)?; let res = fetch_from_git_server_url( &git_repo.git_repo, oids, &formatted_url, [ServerProtocol::UnauthHttps, ServerProtocol::UnauthHttp].contains(protocol), term, ); if let Err(error) = res { term.write_line( format!("fetch: {formatted_url} failed over {protocol}: {error}").as_str(), )?; failed_protocols.push(protocol); } else { success = true; if !failed_protocols.is_empty() { term.write_line(format!("fetch: succeeded over {protocol}").as_str())?; let _ = set_protocol_preference(git_repo, protocol, &server_url, &Direction::Push); } break; } } if success { Ok(()) } else { let error = anyhow!( "{} failed over {}{}", server_url.short_name(), join_with_and(&failed_protocols), if decoded_nostr_url.protocol.is_some() { " and nostr url contains protocol override so no other protocols were attempted" } else { "" }, ); term.write_line(format!("fetch: {error}").as_str())?; Err(error) } } fn fetch_from_git_server_url( git_repo: &Repository, oids: &[String], git_server_url: &str, dont_authenticate: bool, term: &console::Term, ) -> Result<()> { if git_server_url.parse::()?.protocol() == ServerProtocol::Ssh && !check_ssh_keys() { bail!("no ssh keys found"); } let git_config = git_repo.config()?; let mut git_server_remote = git_repo.remote_anonymous(git_server_url)?; let auth = GitAuthenticator::default(); let mut fetch_options = git2::FetchOptions::new(); let mut remote_callbacks = git2::RemoteCallbacks::new(); let fetch_reporter = Arc::new(Mutex::new(FetchReporter::new(term))); remote_callbacks.sideband_progress({ let fetch_reporter = Arc::clone(&fetch_reporter); move |data| { let mut reporter = fetch_reporter.lock().unwrap(); reporter.process_remote_msg(data); true } }); remote_callbacks.transfer_progress({ let fetch_reporter = Arc::clone(&fetch_reporter); move |stats| { let mut reporter = fetch_reporter.lock().unwrap(); reporter.process_transfer_progress_update(&stats); true } }); if !dont_authenticate { remote_callbacks.credentials(auth.credentials(&git_config)); } fetch_options.remote_callbacks(remote_callbacks); git_server_remote.download(oids, Some(&mut fetch_options))?; git_server_remote.disconnect()?; Ok(()) } struct FetchReporter<'a> { remote_msgs: Vec, transfer_progress_msgs: Vec, term: &'a console::Term, start_time: Option, end_time: Option, } impl<'a> FetchReporter<'a> { fn new(term: &'a console::Term) -> Self { Self { remote_msgs: vec![], transfer_progress_msgs: vec![], term, start_time: None, end_time: None, } } fn write_all(&self, lines_to_clear: usize) { let _ = self.term.clear_last_lines(lines_to_clear); for msg in &self.remote_msgs { let _ = self.term.write_line(format!("remote: {msg}").as_str()); } for msg in &self.transfer_progress_msgs { let _ = self.term.write_line(msg); } } fn count_all_existing_lines(&self) -> usize { let width = self.term.size().1; count_lines_per_msg_vec(width, &self.remote_msgs, "remote: ".len()) + count_lines_per_msg_vec(width, &self.transfer_progress_msgs, 0) } fn just_write_transfer_progress(&self, lines_to_clear: usize) { let _ = self.term.clear_last_lines(lines_to_clear); for msg in &self.transfer_progress_msgs { let _ = self.term.write_line(msg); } } fn just_count_transfer_progress(&self) -> usize { let width = self.term.size().1; count_lines_per_msg_vec(width, &self.transfer_progress_msgs, 0) } fn process_remote_msg(&mut self, data: &[u8]) { if let Ok(data) = str::from_utf8(data) { let data = data .split(['\n', '\r']) .map(str::trim) .filter(|line| !line.trim().is_empty()) .collect::>(); for data in data { let existing_lines = self.count_all_existing_lines(); let msg = data.to_string(); if let Some(last) = self.remote_msgs.last() { // if previous line begins with x but doesnt finish with y then its part of the // same msg if (last.starts_with("Enume") && !last.ends_with(", done.")) || ((last.starts_with("Compre") || last.starts_with("Count")) && !last.contains(')')) { let last = self.remote_msgs.pop().unwrap(); self.remote_msgs.push(format!("{last}{msg}")); // if previous msg contains % and its not 100% then it // should be overwritten } else if (last.contains('%') && !last.contains("100%")) // but also if the next message is identical with "", done." appended || last == &msg.replace(", done.", "") { self.remote_msgs.pop(); self.remote_msgs.push(msg); } else { self.remote_msgs.push(msg); } } else { self.remote_msgs.push(msg); } self.write_all(existing_lines); } } } fn process_transfer_progress_update(&mut self, progress_stats: &git2::Progress<'_>) { if self.start_time.is_none() { self.start_time = Some(Instant::now()); } let existing_lines = self.just_count_transfer_progress(); let updated = report_on_transfer_progress( progress_stats, &self.start_time.unwrap(), self.end_time.as_ref(), ); if self.transfer_progress_msgs.len() <= updated.len() { if self.end_time.is_none() && updated.first().is_some_and(|f| f.contains("100%")) { self.end_time = Some(Instant::now()); } // once "Resolving Deltas" is complete, deltas get reset to 0 and it stops // reporting on it so we want to keep the old report self.transfer_progress_msgs = updated; } self.just_write_transfer_progress(existing_lines); } } #[allow(clippy::cast_precision_loss)] #[allow(clippy::float_cmp)] #[allow(clippy::needless_pass_by_value)] fn report_on_transfer_progress( progress_stats: &Progress<'_>, start_time: &Instant, end_time: Option<&Instant>, ) -> Vec { let mut report = vec![]; let total = progress_stats.total_objects() as f64; if total == 0.0 { return report; } let received = progress_stats.received_objects() as f64; let percentage = ((received / total) * 100.0) // always round down because 100% complete is misleading when its not complete .floor(); let received_bytes = progress_stats.received_bytes() as f64; let (size, unit) = if received_bytes >= (1024.0 * 1024.0) { (received_bytes / (1024.0 * 1024.0), "MiB") } else { (received_bytes / 1024.0, "KiB") }; let speed = { let duration = if let Some(end_time) = end_time { (*end_time - *start_time).as_millis() as f64 } else { start_time.elapsed().as_millis() as f64 }; if duration > 0.0 { (received_bytes / (1024.0 * 1024.0)) / (duration / 1000.0) // Convert bytes to MiB and milliseconds to seconds } else { 0.0 } }; // Format the output for receiving objects report.push(format!( "Receiving objects: {percentage}% ({received}/{total}) {size:.2} {unit} | {speed:.2} MiB/s{}", if received == total { ", done." } else { ""}, )); if received == total { let indexed_deltas = progress_stats.indexed_deltas() as f64; let total_deltas = progress_stats.total_deltas() as f64; let percentage = ((indexed_deltas / total_deltas) * 100.0) // always round down because 100% complete is misleading when its not complete .floor(); if total_deltas > 0.0 { report.push(format!( "Resolving deltas: {percentage}% ({indexed_deltas}/{total_deltas}){}", if indexed_deltas == total_deltas { ", done." } else { "" }, )); } } report } #[cfg(test)] mod tests { use super::*; fn pass_through_fetch_reporter_proces_remote_msg(msgs: Vec<&str>) -> Vec { let term = console::Term::stdout(); let mut reporter = FetchReporter::new(&term); for msg in msgs { reporter.process_remote_msg(msg.as_bytes()); } reporter.remote_msgs } #[test] fn logs_single_msg() { assert_eq!( pass_through_fetch_reporter_proces_remote_msg(vec![ "Enumerating objects: 23716, done.", ]), vec!["Enumerating objects: 23716, done."] ); } #[test] fn logs_multiple_msgs() { assert_eq!( pass_through_fetch_reporter_proces_remote_msg(vec![ "Enumerating objects: 23716, done.", "Counting objects: 0% (1/2195)", ]), vec![ "Enumerating objects: 23716, done.", "Counting objects: 0% (1/2195)", ] ); } mod ignores { use super::*; #[test] fn empty_msgs() { assert_eq!( pass_through_fetch_reporter_proces_remote_msg(vec![ "Enumerating objects: 23716, done.", "", "Counting objects: 0% (1/2195)", "", ]), vec![ "Enumerating objects: 23716, done.", "Counting objects: 0% (1/2195)", ] ); } #[test] fn whitespace_msgs() { assert_eq!( pass_through_fetch_reporter_proces_remote_msg(vec![ "Enumerating objects: 23716, done.", " ", "Counting objects: 0% (1/2195)", " \r\n \r", ]), vec![ "Enumerating objects: 23716, done.", "Counting objects: 0% (1/2195)", ] ); } } mod splits { use super::*; #[test] fn multiple_lines_in_single_msg() { assert_eq!( pass_through_fetch_reporter_proces_remote_msg(vec![ "Enumerating objects: 23716, done.\r\nCounting objects: 0% (1/2195)", "", ]), vec![ "Enumerating objects: 23716, done.", "Counting objects: 0% (1/2195)", ] ); } } mod joins_lines_sent_over_multiple_msgs { use super::*; #[test] fn enumerating() { assert_eq!( pass_through_fetch_reporter_proces_remote_msg(vec![ "Enumerat", "ing objec", "ts: 23716, done.", "Counting objects: 0% (1/2195)", ]), vec![ "Enumerating objects: 23716, done.", "Counting objects: 0% (1/2195)", ] ); } #[test] fn counting() { assert_eq!( pass_through_fetch_reporter_proces_remote_msg(vec![ "Enumerating objects: 23716, done.", "Counting obj", "ects: 0% (1/2195)", "Count", "ing objects: 1% (22/", "2195)", ]), vec![ "Enumerating objects: 23716, done.", "Counting objects: 1% (22/2195)", ] ); } #[test] fn compressing() { assert_eq!( pass_through_fetch_reporter_proces_remote_msg(vec![ "Compress", "ing obj", "ect", "s: 0% (1/56", "0)" ]), vec!["Compressing objects: 0% (1/560)"] ); } } #[test] fn msgs_with_pc_and_not_100pc_are_replaced() { assert_eq!( pass_through_fetch_reporter_proces_remote_msg(vec![ "Enumerating objects: 23716, done.", "Counting objects: 0% (1/2195)", "Counting objects: 1% (22/2195)", ]), vec![ "Enumerating objects: 23716, done.", "Counting objects: 1% (22/2195)", ] ); } mod msgs_with_pc_100pc_are_not_replaced { use super::*; #[test] fn when_next_msg_is_not_identical_but_with_done() { assert_eq!( pass_through_fetch_reporter_proces_remote_msg(vec![ "Enumerating objects: 23716, done.", "Counting objects: 0% (1/2195)", "Counting objects: 1% (22/2195)", "Counting objects: 100% (2195/2195)", "Compressing objects: 0% (1/560)" ]), vec![ "Enumerating objects: 23716, done.", "Counting objects: 100% (2195/2195)", "Compressing objects: 0% (1/560)" ] ); } #[test] fn but_is_when_next_msg_is_identical_but_with_done_appended() { assert_eq!( pass_through_fetch_reporter_proces_remote_msg(vec![ "Enumerating objects: 23716, done.", "Counting objects: 0% (1/2195)", "Counting objects: 1% (22/2195)", "Counting objects: 100% (2195/2195)", "Counting objects: 100% (2195/2195), done.", ]), vec![ "Enumerating objects: 23716, done.", "Counting objects: 100% (2195/2195), done.", ] ); } } }