From 82c7a95f6e9aa266d2f0d2035a0ce4f1715b62ad Mon Sep 17 00:00:00 2001 From: DanConwayDev Date: Thu, 13 Nov 2025 17:18:35 +0000 Subject: feat(list): make list async and include sync report inline copy relay fetching approach to async and reporting --- src/bin/git_remote_nostr/list.rs | 16 +- src/bin/git_remote_nostr/push.rs | 8 +- src/bin/ngit/sub_commands/sync.rs | 10 +- src/lib/list.rs | 378 +++++++++++++++++++++++++++++++++----- 4 files changed, 347 insertions(+), 65 deletions(-) (limited to 'src') diff --git a/src/bin/git_remote_nostr/list.rs b/src/bin/git_remote_nostr/list.rs index d06d202..7753ba1 100644 --- a/src/bin/git_remote_nostr/list.rs +++ b/src/bin/git_remote_nostr/list.rs @@ -8,7 +8,7 @@ use ngit::{ fetch::fetch_from_git_server, git::{self}, git_events::{KIND_PULL_REQUEST, KIND_PULL_REQUEST_UPDATE, event_to_cover_letter, tag_value}, - list::{generate_remote_sync_warnings, identify_remote_sync_issues, list_from_remotes}, + list::list_from_remotes, login::get_curent_user, repo_ref::{self}, utils::{get_all_proposals, get_open_or_draft_proposals}, @@ -27,23 +27,17 @@ pub async fn run_list( let term = console::Term::stderr(); + term.write_line("git servers: listing refs...")?; let remote_states = list_from_remotes( &term, git_repo, &repo_ref.git_server, &repo_ref.to_nostr_git_url(&None), - ); + nostr_state.as_ref(), + ) + .await; let mut state = if let Some(nostr_state) = nostr_state { - // Identify sync issues using shared abstraction - let remote_issues = identify_remote_sync_issues(git_repo, &nostr_state, &remote_states); - - // Generate and print warnings - let warnings = generate_remote_sync_warnings(&remote_issues, &remote_states); - for warning in warnings { - term.write_line(&warning)?; - } - nostr_state.state } else { let (state, _is_grasp_server) = repo_ref diff --git a/src/bin/git_remote_nostr/push.rs b/src/bin/git_remote_nostr/push.rs index 12350e7..9cf2c52 100644 --- a/src/bin/git_remote_nostr/push.rs +++ b/src/bin/git_remote_nostr/push.rs @@ -69,14 +69,18 @@ pub async fn run_push( let term = console::Term::stderr(); - let list_outputs = list_outputs.unwrap_or_else(|| { + let list_outputs = if let Some(outputs) = list_outputs { + outputs + } else { list_from_remotes( &term, git_repo, &repo_ref.git_server, &repo_ref.to_nostr_git_url(&None), + None, ) - }); + .await + }; let existing_state = { // if no state events - create from first git server listed diff --git a/src/bin/ngit/sub_commands/sync.rs b/src/bin/ngit/sub_commands/sync.rs index d84feb6..daebb1b 100644 --- a/src/bin/ngit/sub_commands/sync.rs +++ b/src/bin/ngit/sub_commands/sync.rs @@ -77,8 +77,14 @@ pub async fn launch(args: &SubCommandArgs) -> Result<()> { let term = console::Term::stderr(); - let remote_states = - list_from_remotes(&term, &git_repo, &repo_ref.git_server, &decoded_nostr_url); + let remote_states = list_from_remotes( + &term, + &git_repo, + &repo_ref.git_server, + &decoded_nostr_url, + Some(&nostr_state), + ) + .await; let missing_refs = fetch_missing_refs(&git_repo, &nostr_state, &remote_states, &decoded_nostr_url); diff --git a/src/lib/list.rs b/src/lib/list.rs index 08b197c..69da792 100644 --- a/src/lib/list.rs +++ b/src/lib/list.rs @@ -1,7 +1,19 @@ -use std::{collections::HashMap, path::PathBuf, str::FromStr}; +use std::{ + collections::HashMap, + path::PathBuf, + str::FromStr, + sync::{ + Arc, + atomic::{AtomicU64, Ordering}, + }, + time::Duration, +}; use anyhow::{Result, anyhow}; use auth_git2::GitAuthenticator; +use console::Style; +use futures::stream::{self, StreamExt}; +use indicatif::{MultiProgress, ProgressBar, ProgressState, ProgressStyle}; use nostr::hashes::sha1::Hash as Sha1Hash; use crate::{ @@ -44,34 +56,321 @@ impl RemoteIssues { } } -pub fn list_from_remotes( +static GIT_SERVER_SUCCESS_THRESHOLD: f64 = 0.5; // 50% of servers must succeed to switch to short timeout + +fn git_server_long_timeout() -> u64 { + if std::env::var("NGITTEST").is_ok() { + 1 + } else { + 60 + } +} + +fn git_server_short_timeout() -> u64 { + if std::env::var("NGITTEST").is_ok() { + 1 + } else { + 5 + } +} + +fn git_server_pb_style(current_timeout: Arc) -> Result { + Ok(ProgressStyle::with_template(" {spinner} {prefix} {msg}")? + .with_key( + "timeout_display", + move |_state: &ProgressState, w: &mut dyn std::fmt::Write| { + write!(w, "{}s", current_timeout.load(Ordering::Relaxed)).unwrap(); + }, + ) + .tick_chars("⠁⠂⠄⡀⢀⠠⠐⠈ ")) +} + +fn git_server_pb_after_style(succeed: bool) -> ProgressStyle { + let symbol = if succeed { + console::style("✔".to_string()) + .for_stderr() + .green() + .to_string() + } else { + console::style("✘".to_string()) + .for_stderr() + .red() + .to_string() + }; + ProgressStyle::with_template(&format!(" {symbol} {{prefix}} {{msg}}")) + .unwrap_or_else(|_| ProgressStyle::default_bar()) +} + +pub async fn list_from_remotes( term: &console::Term, git_repo: &Repo, - git_servers: &Vec, + git_servers: &[String], decoded_nostr_url: &NostrUrlDecoded, + nostr_state: Option<&RepoState>, ) -> HashMap, bool)> { + if git_servers.is_empty() { + return HashMap::new(); + } + + let progress_reporter = if std::env::var("NGITTEST").is_err() { + MultiProgress::new() + } else { + MultiProgress::with_draw_target(indicatif::ProgressDrawTarget::hidden()) + }; + + // Track successful servers for adaptive timeout + let success_count = Arc::new(AtomicU64::new(0)); + let current_timeout = Arc::new(AtomicU64::new(git_server_long_timeout())); + let total_servers = git_servers.len() as u64; + + // Calculate column width for alignment + let server_column_width = git_servers + .iter() + .map(|s| get_short_git_server_name(s).chars().count()) + .max() + .unwrap_or(20) + + 2; + + let futures: Vec<_> = git_servers + .iter() + .map(|url| { + let url = url.clone(); + let is_grasp_server = is_grasp_server_clone_url(&url); + let success_count_clone = success_count.clone(); + let current_timeout_clone = current_timeout.clone(); + let progress_reporter_clone = progress_reporter.clone(); + let decoded_nostr_url = decoded_nostr_url.clone(); + + async move { + let dim = Style::new().color256(247); + let server_name = get_short_git_server_name(&url); + + let pb = if std::env::var("NGITTEST").is_err() { + match git_server_pb_style(current_timeout_clone.clone()) { + Ok(style) => { + let pb = progress_reporter_clone.add( + ProgressBar::new(1) + .with_prefix( + dim.apply_to(format!( + "{: None, + } + } else { + None + }; + + fn update_progress_bar_with_error( + server_column_width: usize, + server_name: &str, + pb: Option, + error: &anyhow::Error, + ) { + if let Some(pb) = pb { + pb.set_style(git_server_pb_after_style(false)); + pb.set_prefix( + Style::new() + .color256(247) + .apply_to(format!("{: Repo::from_path(&path).ok(), + None => None, + }; + + match git_repo { + Some(ref repo) => list_from_remote_sync( + repo, + &url_clone, + &decoded_nostr_url_clone, + is_grasp_server, + pb_clone.as_ref(), + ), + None => Err(anyhow!("failed to open git repository")), + } + }) + .await + { + Ok(result) => result, + Err(e) => Err(anyhow!("task join error: {}", e)), + } + }; + + let timeout_future = async { + let check_interval = Duration::from_millis(100); + let long_timeout_end = tokio::time::Instant::now() + + Duration::from_secs(git_server_long_timeout()); + + loop { + let current_success_count = success_count_clone.load(Ordering::Relaxed); + let threshold = (total_servers as f64 * GIT_SERVER_SUCCESS_THRESHOLD).ceil() as u64; + + if current_success_count >= threshold { + tokio::time::sleep(Duration::from_secs(git_server_short_timeout())).await; + return "short"; + } + + if tokio::time::Instant::now() >= long_timeout_end { + return "long"; + } + + tokio::time::sleep(check_interval).await; + } + }; + + let result = tokio::select! { + result = list_future => { + if result.is_ok() { + let new_count = success_count_clone.fetch_add(1, Ordering::Relaxed) + 1; + let threshold = (total_servers as f64 * GIT_SERVER_SUCCESS_THRESHOLD).ceil() as u64; + + if new_count >= threshold { + current_timeout_clone.store(git_server_short_timeout(), Ordering::Relaxed); + } + } + result + } + timeout_type = timeout_future => { + Err(anyhow!("timeout after {}s", + if timeout_type == "long" { git_server_long_timeout() } else { git_server_short_timeout() })) + } + }; + + match result { + Err(error) => { + update_progress_bar_with_error( + server_column_width, + &server_name, + pb, + &error, + ); + Err((url, error)) + } + Ok(state) => { + // Determine sync status message and styling using existing functions + let status_msg = if state.is_empty() { + "empty repository".to_string() + } else if let Some(nostr_state) = nostr_state { + // Use existing generate_remote_sync_warnings to get detailed status + let mut temp_states = HashMap::new(); + temp_states.insert(url.clone(), (state.clone(), is_grasp_server)); + let remote_issues = identify_remote_sync_issues(git_repo, nostr_state, &temp_states); + let warnings = generate_remote_sync_warnings(&remote_issues, &temp_states); + + if warnings.is_empty() { + "in sync".to_string() + } else { + // Extract the message after "WARNING: " + let warning = &warnings[0]; + let server_name = get_short_git_server_name(&url); + let prefix = format!("WARNING: {} ", server_name); + warning.strip_prefix(&prefix) + .unwrap_or(warning) + .to_string() + } + } else { + // No nostr state to compare against + "success".to_string() + }; + + let message_style = if status_msg == "empty repository" { + console::style(&status_msg).for_stderr().red() + } else if status_msg == "in sync" || status_msg == "success" { + console::style(&status_msg).for_stderr().green() + } else { + console::style(&status_msg).for_stderr().yellow() + }; + + let is_success = status_msg != "empty repository"; + + if let Some(pb) = pb { + pb.set_style(git_server_pb_after_style(is_success)); + pb.set_prefix( + Style::new() + .color256(247) + .apply_to(format!("{: , bool), (String, anyhow::Error)>>>() + .await; + let mut remote_states = HashMap::new(); - let mut errors = HashMap::new(); - for url in git_servers { - let is_grasp_server = is_grasp_server_clone_url(url); - match list_from_remote(term, git_repo, url, decoded_nostr_url, is_grasp_server) { - Err(error) => { - errors.insert(url, error); + for result in results { + match result { + Ok((url, state, is_grasp_server)) => { + remote_states.insert(url, (state, is_grasp_server)); } - Ok(state) => { - remote_states.insert(url.to_string(), (state, is_grasp_server)); + Err((url, error)) => { + // Errors are already displayed in progress bars + let _ = term.write_line(&format!("failed to list from {}: {}", url, error)); } } } + remote_states } +// Backward-compatible synchronous wrapper for use in non-async contexts pub fn list_from_remote( - term: &console::Term, + _term: &console::Term, + git_repo: &Repo, + git_server_url: &str, + decoded_nostr_url: &NostrUrlDecoded, + is_grasp_server: bool, +) -> Result> { + list_from_remote_sync( + git_repo, + git_server_url, + decoded_nostr_url, + is_grasp_server, + None, + ) +} + +fn list_from_remote_sync( git_repo: &Repo, git_server_url: &str, decoded_nostr_url: &NostrUrlDecoded, is_grasp_server: bool, + pb: Option<&ProgressBar>, ) -> Result> { let server_url = git_server_url.parse::()?; let protocols_to_attempt = @@ -81,13 +380,15 @@ pub fn list_from_remote( let mut remote_state: Option> = None; for protocol in &protocols_to_attempt { - term.write_line( - format!( - "fetching {} ref list over {protocol}...", - server_url.short_name(), - ) - .as_str(), - )?; + if let Some(pb) = pb { + // Only show protocol for non-grasp servers as they can failover to other + // protocols + if is_grasp_server { + pb.set_message("".to_string()); + } else { + pb.set_message(format!("via {protocol}")); + } + } let formatted_url = server_url.format_as(protocol)?; @@ -96,46 +397,29 @@ pub fn list_from_remote( &formatted_url, decoded_nostr_url.ssh_key_file_path().as_ref(), [ServerProtocol::UnauthHttps, ServerProtocol::UnauthHttp].contains(protocol), - term, ); match res { Ok(state) => { remote_state = Some(state); if !is_grasp_server && !failed_protocols.is_empty() { - term.write_line( - format!( - "list: succeeded over {protocol} from {}", - server_url.short_name(), - ) - .as_str(), - )?; let _ = set_protocol_preference(git_repo, protocol, &server_url, &Direction::Fetch); } break; } Err(error) => { - if is_grasp_server { - term.write_line(&format!("list: failed: {error}"))?; - } else { - term.write_line(&format!( - "list: {formatted_url} failed over {protocol}{}: {error}", - if protocol == &ServerProtocol::Ssh { - if let Some(ssh_key_file) = &decoded_nostr_url.ssh_key_file_path() { - format!(" with ssh key from {ssh_key_file}") - } else { - String::new() - } - } else { - String::new() - } - ))?; - } failed_protocols.push(protocol); + if failed_protocols.len() == protocols_to_attempt.len() { + // All protocols failed + if let Some(pb) = pb { + pb.set_message(format!("all protocols failed: {}", error)); + } + } } } } + if let Some(remote_state) = remote_state { Ok(remote_state) } else { @@ -149,9 +433,6 @@ pub fn list_from_remote( "" }, ); - if !is_grasp_server { - term.write_line(format!("list: {error}").as_str())?; - } Err(error) } } @@ -161,7 +442,6 @@ fn list_from_remote_url( git_server_remote_url: &str, ssh_key_file: Option<&String>, dont_authenticate: bool, - term: &console::Term, ) -> Result> { let git_config = git_repo.git_repo.config()?; @@ -185,9 +465,7 @@ fn list_from_remote_url( if !dont_authenticate { remote_callbacks.credentials(auth.credentials(&git_config)); } - term.write_line("list: connecting...")?; git_server_remote.connect_auth(git2::Direction::Fetch, Some(remote_callbacks), None)?; - term.clear_last_lines(1)?; let mut state = HashMap::new(); for head in git_server_remote.list()? { if let Some(symbolic_reference) = head.symref_target() { @@ -435,7 +713,7 @@ pub fn generate_remote_sync_warnings( if let Some(state) = remote_state { // Check if remote is completely empty if state.is_empty() { - warnings.push(format!("WARNING: {remote_name} has no data.")); + warnings.push(format!("WARNING: {remote_name} has empty repository.")); continue; } -- cgit v1.2.3