mod config; mod db; mod discovery; mod git_mirror; mod health; mod http_health; mod nip46; mod nostr_mirror; mod signing; use anyhow::Result; use clap::Parser; use std::path::PathBuf; use std::sync::Arc; #[derive(Parser, Debug)] #[command(name = "grasp-mirror", about = "GRASP mirror daemon")] struct Cli { #[arg(short, long, default_value = "config.toml")] config: PathBuf, #[command(subcommand)] command: Option, } #[derive(clap::Subcommand, Debug)] enum Command { Daemon, Status, Verify, MirrorOnce, } #[tokio::main] async fn main() -> Result<()> { tracing_subscriber::fmt() .with_env_filter( tracing_subscriber::EnvFilter::try_from_default_env() .unwrap_or_else(|_| tracing_subscriber::EnvFilter::new("info,nostr_relay_pool=warn")), ) .init(); let cli = Cli::parse(); let config = config::ResolvedConfig::load(&cli.config)?; tracing::info!( npubs = config.npubs.len(), servers = config.servers.known.len(), index_relays = config.discovery.index_relays.len(), "starting grasp-mirror" ); let db = db::MirrorDb::open(&config.storage.database).await?; match cli.command.unwrap_or(Command::Daemon) { Command::Daemon => run_daemon(config, db).await, Command::Status => run_status(db).await, Command::Verify => run_verify(config).await, Command::MirrorOnce => run_mirror_once(config, db).await, } } fn build_nostr_client(relay_urls: &[String]) -> nostr_sdk::Client { let client = nostr_sdk::Client::default(); for url in relay_urls { let _ = client.add_relay(url); } client } async fn run_daemon(config: config::ResolvedConfig, db: db::MirrorDb) -> Result<()> { let db = Arc::new(db); let config = Arc::new(config); let nip46_client = if let Some(ref nip46_cfg) = config.nip46 { let client = nip46::Nip46Client::new( nip46_cfg.relays.clone(), nip46_cfg.signing_timeout_secs, db.clone(), ) .await?; client.init_sessions(&config.npubs).await?; client.start_listener().await; for status in client.get_status().await { tracing::info!( npub = %status.npub, connected = status.connected, "NIP-46 session status" ); if !status.connected { if let Some(uri) = &status.pairing_uri { tracing::info!( npub = %status.npub, uri = %uri, "Pair this npub by opening the nostrconnect URI in Amber" ); } } } Some(Arc::new(client)) } else { tracing::warn!("no [nip46] config — remote signing disabled"); None }; let (cycle_count_tx, cycle_count_rx) = tokio::sync::watch::channel(0u64); let (last_cycle_ok_tx, last_cycle_ok_rx) = tokio::sync::watch::channel(true); let nip46_status = nip46_client.clone(); let health_state = Arc::new(http_health::HealthState { started_at: std::time::Instant::now(), cycle_count: cycle_count_rx, last_cycle_ok: last_cycle_ok_rx, db_path: config.storage.database.display().to_string(), nip46_client: nip46_status, }); let health_port = config.storage.health_port; let health_state_clone = health_state.clone(); tokio::spawn(async move { if let Err(e) = http_health::start_health_server(health_port, health_state_clone).await { tracing::error!(error = %e, "health server failed"); } }); let servers = health::verify_all_servers(&config.servers.known).await; let healthy: Vec<_> = servers .values() .filter(|s| s.is_grasp_server()) .cloned() .collect(); tracing::info!( total = servers.len(), healthy = healthy.len(), "server verification complete" ); let healthy = Arc::new(healthy); let relay_urls = config.relay_urls(); let nostr_client = build_nostr_client(&relay_urls); nostr_client.connect().await; let mirror = git_mirror::GitMirror::new(&config.storage.mirror_dir); let nostr_mirror = nostr_mirror::NostrMirror::new(nostr_client.clone()); let nip46_client_ref: Option> = nip46_client.clone(); let mut interval = tokio::time::interval(std::time::Duration::from_secs( config.discovery.poll_interval_secs, )); tracing::info!( "daemon started, polling every {}s", config.discovery.poll_interval_secs ); let mut cycle_count: u64 = 0; loop { tokio::select! { _ = interval.tick() => { let result = mirror_cycle( &config, &db, &nostr_client, &mirror, &nostr_mirror, &healthy, nip46_client_ref.as_ref(), ).await; match &result { Ok(()) => tracing::info!("mirror cycle complete"), Err(e) => tracing::error!(error = %e, "mirror cycle failed"), } cycle_count += 1; let _ = cycle_count_tx.send(cycle_count); let _ = last_cycle_ok_tx.send(result.is_ok()); } _ = tokio::signal::ctrl_c() => { tracing::info!("shutting down"); break; } } } Ok(()) } async fn mirror_cycle( config: &Arc, db: &Arc, nostr_client: &nostr_sdk::Client, mirror: &git_mirror::GitMirror, nostr_mirror: &nostr_mirror::NostrMirror, servers: &Arc>, nip46_client: Option<&Arc>, ) -> Result<()> { tracing::info!("starting mirror cycle"); let relay_urls = config.relay_urls(); let repos = discovery::discover_repos_from_relays(nostr_client, &config.npubs, &relay_urls) .await?; tracing::info!(count = repos.len(), "discovered repos"); discovery::persist_discovered_repos(db, &repos).await?; let server_map: std::collections::HashMap = servers .iter() .map(|s| (s.domain.clone(), s.clone())) .collect(); for repo in &repos { let missing = discovery::identify_missing_servers(repo, &server_map); if missing.is_empty() { tracing::debug!(identifier = %repo.identifier, "repo already on all servers"); continue; } tracing::info!( identifier = %repo.identifier, missing = missing.iter().map(|s| s.domain.as_str()).collect::>().join(", "), "mirroring to missing servers" ); if let Err(e) = mirror.mirror_repo_to_servers(db, repo, &missing, nip46_client, nostr_client).await { tracing::error!(identifier = %repo.identifier, error = %e, "git mirror failed for repo, continuing"); } if let Err(e) = nostr_mirror.forward_repo_events(db, repo, servers).await { tracing::error!(identifier = %repo.identifier, error = %e, "nostr mirror failed for repo, continuing"); } } if let Err(e) = nostr_mirror .sync_all_events(db, &config.npubs, servers) .await { tracing::error!(error = %e, "event sync failed"); } tracing::info!("mirror cycle complete"); Ok(()) } async fn run_status(db: db::MirrorDb) -> Result<()> { let repos = db.get_all_repos().await?; let syncs = db.get_sync_summary().await?; println!("=== GRASP Mirror Status ===\n"); println!("Repos tracked: {}\n", repos.len()); for repo in &repos { println!(" {} ({})", repo.identifier, &repo.pubkey[..12]); } println!("\nSync records: {}\n", syncs.len()); for sync in &syncs { let status = if sync.git_synced && sync.nostr_synced { "OK" } else if sync.error.is_some() { "ERR" } else { "PENDING" }; println!( " repo:{} server:{} git:{} nostr:{} [{}]", sync.repo_id, sync.server_domain, if sync.git_synced { "Y" } else { "N" }, if sync.nostr_synced { "Y" } else { "N" }, status, ); if let Some(err) = &sync.error { println!(" error: {}", err); } } Ok(()) } async fn run_verify(config: config::ResolvedConfig) -> Result<()> { let servers = health::verify_all_servers(&config.servers.known).await; println!("=== GRASP Server Verification ===\n"); for (domain, server) in &servers { let grasp = server.is_grasp_server(); let version = server .nip11 .as_ref() .and_then(|i| i.version.clone()) .unwrap_or_else(|| "unknown".to_string()); let grasps = server .nip11 .as_ref() .and_then(|i| i.supported_grasps.clone()) .map(|g| g.join(", ")) .unwrap_or_else(|| "NONE".to_string()); println!( " {} [{}] v={} GRASP={}", domain, if grasp { "OK" } else { "SKIP" }, version, grasps, ); } let healthy = servers.values().filter(|s| s.is_grasp_server()).count(); println!( "\n{}/{} verified as GRASP servers", healthy, servers.len() ); Ok(()) } async fn run_mirror_once(config: config::ResolvedConfig, db: db::MirrorDb) -> Result<()> { let db = Arc::new(db); let config = Arc::new(config); let servers = health::verify_all_servers(&config.servers.known).await; let healthy: Vec<_> = servers .values() .filter(|s| s.is_grasp_server()) .cloned() .collect(); let relay_urls = config.relay_urls(); let nostr_client = build_nostr_client(&relay_urls); nostr_client.connect().await; let mirror = git_mirror::GitMirror::new(&config.storage.mirror_dir); let nostr_mirror = nostr_mirror::NostrMirror::new(nostr_client.clone()); let healthy = Arc::new(healthy); mirror_cycle( &config, &db, &nostr_client, &mirror, &nostr_mirror, &healthy, None, ) .await }