From 8ecf357d9acc9ca2ec79e26a366cd1c07689a0cf Mon Sep 17 00:00:00 2001 From: DanConwayDev Date: Fri, 13 Sep 2024 19:43:30 +0100 Subject: fix(remote): add rebustness to fetch reporting by avoiding bugs where lines are removed accidentally by storing report in a mutex and rewriting the entire report at each update --- src/bin/git_remote_nostr/fetch.rs | 182 +++++++++++++++++++++++++++++--------- 1 file changed, 139 insertions(+), 43 deletions(-) diff --git a/src/bin/git_remote_nostr/fetch.rs b/src/bin/git_remote_nostr/fetch.rs index 59067fa..c2c0cb1 100644 --- a/src/bin/git_remote_nostr/fetch.rs +++ b/src/bin/git_remote_nostr/fetch.rs @@ -1,9 +1,13 @@ use core::str; -use std::io::Stdin; +use std::{ + io::Stdin, + sync::{Arc, Mutex}, + time::Instant, +}; use anyhow::{anyhow, bail, Result}; use auth_git2::GitAuthenticator; -use git2::Repository; +use git2::{Progress, Repository}; use ngit::{ git::{ nostr_url::{CloneUrl, NostrUrlDecoded, ServerProtocol}, @@ -18,7 +22,6 @@ use ngit::{ use crate::utils::{ fetch_or_list_error_is_not_authentication_failure, find_proposal_and_patches_by_branch_name, get_oids_from_fetch_batch, get_open_proposals, get_read_protocols_to_try, join_with_and, - report_on_sideband_progress, }; pub async fn run_fetch( @@ -134,7 +137,6 @@ fn fetch_from_git_server( [ServerProtocol::UnauthHttps, ServerProtocol::UnauthHttp].contains(protocol), term, ); - term.clear_last_lines(1)?; if let Err(error) = res { term.write_line( format!("fetch: {formatted_url} failed over {protocol}: {error}").as_str(), @@ -172,55 +174,144 @@ fn fetch_from_git_server( } } -enum ProgressStatus { - InProgress, - Complete, -} - #[allow(clippy::cast_precision_loss)] #[allow(clippy::float_cmp)] #[allow(clippy::needless_pass_by_value)] fn report_on_transfer_progress( - progress_stats: &git2::Progress<'_>, - term: &console::Term, - status: ProgressStatus, -) { + progress_stats: &Progress<'_>, + start_time: &Instant, + end_time: &Option, +) -> Vec { + let mut report = vec![]; let total = progress_stats.total_objects() as f64; if total == 0.0 { - return; + return report; } let received = progress_stats.received_objects() as f64; - let percentage = (received / total) * 100.0; + let percentage = ((received / total) * 100.0) + // always round down because 100% complete is misleading when its not complete + .floor(); - // Get the total received bytes let received_bytes = progress_stats.received_bytes() as f64; - // Determine whether to use KiB or MiB let (size, unit) = if received_bytes >= (1024.0 * 1024.0) { - // Convert to MiB (received_bytes / (1024.0 * 1024.0), "MiB") } else { - // Convert to KiB (received_bytes / 1024.0, "KiB") }; + let speed = { + let duration = if let Some(end_time) = end_time { + (*end_time - *start_time).as_millis() as f64 + } else { + start_time.elapsed().as_millis() as f64 + }; + + if duration > 0.0 { + (received_bytes / (1024.0 * 1024.0)) / (duration / 1000.0) // Convert bytes to MiB and milliseconds to seconds + } else { + 0.0 + } + }; + // Format the output for receiving objects - if received < total || matches!(status, ProgressStatus::Complete) { - let _ = term.write_line( - format!( - "Receiving objects: {percentage:.0}% ({received}/{total}) {size:.2} {unit}, done.", - ) - .as_str(), - ); - } - if received == total || matches!(status, ProgressStatus::Complete) { + report.push(format!( + "Receiving objects: {percentage}% ({received}/{total}) {size:.2} {unit} | {speed:.2} MiB/s{}", + if received == total { + ", done." + } else { ""}, + )); + if received == total { let indexed_deltas = progress_stats.indexed_deltas() as f64; let total_deltas = progress_stats.total_deltas() as f64; - let percentage = (indexed_deltas / total_deltas) * 100.0; - let _ = term.write_line( - format!("Resolving deltas: {percentage:.0}% ({indexed_deltas}/{total_deltas}) done.") - .as_str(), - ); + let percentage = ((indexed_deltas / total_deltas) * 100.0) + // always round down because 100% complete is misleading when its not complete + .floor(); + report.push(format!( + "Resolving deltas: {percentage}% ({indexed_deltas}/{total_deltas}){}", + if indexed_deltas == total_deltas { + ", done." + } else { + "" + }, + )); + } + report +} + +struct FetchReporter<'a> { + remote_msgs: Vec, + transfer_progress_msgs: Vec, + term: &'a console::Term, + start_time: Option, + end_time: Option, +} +impl<'a> FetchReporter<'a> { + fn new(term: &'a console::Term) -> Self { + Self { + remote_msgs: vec![], + transfer_progress_msgs: vec![], + term, + start_time: None, + end_time: None, + } + } + fn write_all(&self, lines_to_clear: usize) { + let _ = self.term.clear_last_lines(lines_to_clear); + for msg in &self.remote_msgs { + let _ = self.term.write_line(msg); + } + for msg in &self.transfer_progress_msgs { + let _ = self.term.write_line(msg); + } + } + fn write_transfer_progress(&self, lines_to_clear: usize) { + let _ = self.term.clear_last_lines(lines_to_clear); + for msg in &self.transfer_progress_msgs { + let _ = self.term.write_line(msg); + } + } + fn count_all_existing_lines(&self) -> usize { + self.remote_msgs.len() + self.transfer_progress_msgs.len() + } + fn process_remote_msg(&mut self, data: &[u8]) { + let existing_lines = self.count_all_existing_lines(); + if let Ok(data) = str::from_utf8(data) { + let data = data + .split(['\n', '\r']) + .find(|line| !line.is_empty()) + .unwrap_or("") + .trim(); + if !data.is_empty() { + let msg = format!("remote: {data}"); + if let Some(last) = self.remote_msgs.last() { + if (last.contains('%') && !last.contains("100%")) + || last == &msg.replace(", done.", "") + { + self.remote_msgs.pop(); + } + } + self.remote_msgs.push(msg); + self.write_all(existing_lines); + } + } + } + fn process_transfer_progress_update(&mut self, progress_stats: &git2::Progress<'_>) { + if self.start_time.is_none() { + self.start_time = Some(Instant::now()); + } + let existing_lines = self.count_all_existing_lines(); + let updated = + report_on_transfer_progress(progress_stats, &self.start_time.unwrap(), &self.end_time); + if self.transfer_progress_msgs.len() <= updated.len() { + if self.end_time.is_none() && updated.first().is_some_and(|f| f.contains("100%")) { + self.end_time = Some(Instant::now()); + } + // once "Resolving Deltas" is complete, deltas get reset to 0 and it stops + // reporting on it so we want to keep the old report + self.transfer_progress_msgs = updated; + } + self.write_all(existing_lines); } } @@ -239,25 +330,30 @@ fn fetch_from_git_server_url( let auth = GitAuthenticator::default(); let mut fetch_options = git2::FetchOptions::new(); let mut remote_callbacks = git2::RemoteCallbacks::new(); - remote_callbacks.sideband_progress(|data| { - report_on_sideband_progress(data, term); - true + let fetch_reporter = Arc::new(Mutex::new(FetchReporter::new(term))); + remote_callbacks.sideband_progress({ + let fetch_reporter = Arc::clone(&fetch_reporter); + move |data| { + let mut reporter = fetch_reporter.lock().unwrap(); + reporter.process_remote_msg(data); + true + } }); - remote_callbacks.transfer_progress(|stats| { - let _ = term.clear_last_lines(1); - report_on_transfer_progress(&stats, term, ProgressStatus::InProgress); - true + remote_callbacks.transfer_progress({ + let fetch_reporter = Arc::clone(&fetch_reporter); + move |stats| { + let mut reporter = fetch_reporter.lock().unwrap(); + reporter.process_transfer_progress_update(&stats); + true + } }); if !dont_authenticate { remote_callbacks.credentials(auth.credentials(&git_config)); } fetch_options.remote_callbacks(remote_callbacks); - term.write_line("")?; git_server_remote.download(oids, Some(&mut fetch_options))?; - report_on_transfer_progress(&git_server_remote.stats(), term, ProgressStatus::Complete); - git_server_remote.disconnect()?; Ok(()) } -- cgit v1.2.3