use std::time::Duration; use std::{path::PathBuf, sync::Arc}; use anyhow::Result; use clap::Parser; use tokio::signal; use tracing::{error, info, warn}; use tracing_subscriber::{EnvFilter, FmtSubscriber}; use ngit_grasp::{ audit_cleanup, cleanup_empty_repos, config::{Config, DatabaseBackend}, git, http, metrics::Metrics, nostr, purgatory::{sync::RealSyncContext, sync::ThrottleManager, Purgatory}, sync::{naughty_list::NaughtyListTracker, SyncManager}, }; /// Top-level CLI dispatcher. /// /// With no subcommand the binary runs the relay (all relay flags apply). /// With a subcommand it runs the requested maintenance tool instead. #[derive(Debug, Parser)] #[command(author, version, about = "ngit-grasp GRASP relay", long_about = None)] #[command(propagate_version = true)] enum Cli { /// Run the GRASP relay server (default when no subcommand is given). #[command(name = "serve")] Serve(Config), /// Remove kind 30617/30618 events whose bare git repository is empty or missing. /// /// Runs in dry-run mode by default. Pass --execute to make changes. /// Stop the relay service before running with --execute. CleanupEmptyRepos(cleanup_empty_repos::CleanupArgs), } #[tokio::main] async fn main() -> Result<()> { // Load .env file before clap parses, so env vars are available. dotenvy::dotenv().ok(); // Peek at argv[1] to decide whether a subcommand was explicitly provided. // If not, prepend the implicit "serve" subcommand so that clap routes to Cli::Serve // and all relay flags are parsed normally (preserving backward compatibility). let mut args: Vec = std::env::args().collect(); let known_subcommands = ["serve", "cleanup-empty-repos", "help"]; let has_subcommand = args.get(1).map_or(false, |a| { known_subcommands.contains(&a.as_str()) || matches!(a.as_str(), "-h" | "--help" | "-V" | "--version") }); if !has_subcommand { args.insert(1, "serve".to_string()); } match Cli::parse_from(args) { Cli::CleanupEmptyRepos(cleanup_args) => { cleanup_empty_repos::run(&cleanup_args).await } Cli::Serve(mut config) => { // Finish initialising the Config (load relay owner key if not provided). if config.relay_owner_nsec.is_none() { config.relay_owner_nsec = Some(Config::load_or_generate_relay_owner_key()?); } else { config.relay_owner_nsec = config.relay_owner_nsec.take().map(|s| s.trim().to_string()); } run_relay(config).await } } } async fn run_relay(config: Config) -> Result<()> { // Initialize tracing with configured log level let subscriber = FmtSubscriber::builder() .with_env_filter(EnvFilter::new(&config.log_level)) .finish(); tracing::subscriber::set_global_default(subscriber)?; info!("Starting ngit-grasp with log level: {}", config.log_level); // Validate configuration and fail fast on fatal errors // Recoverable issues (e.g., malformed whitelist entries) are logged as warnings config.validate()?; info!( "Configuration loaded and validated: {}", config.bind_address ); info!("Domain: {}", config.domain); info!("Relay name: {}", config.relay_name()); info!("Git data directory: {}", config.effective_git_data_path()); if config.database_backend != DatabaseBackend::Memory { info!("Relay data directory: {}", config.relay_data_path); } info!("Database backend: {}", config.database_backend); // Initialize metrics if enabled let metrics = if config.metrics_enabled { info!("Metrics enabled on /metrics endpoint"); let m = Arc::new(Metrics::new( config.metrics_connection_per_ip_abuse_threshold, Some(config.effective_git_data_path()), )); info!("Repository count will be updated on each metrics request"); Some(m) } else { info!("Metrics disabled"); None }; // Create purgatory for event/git coordination let purgatory = Arc::new(Purgatory::new(PathBuf::from( config.effective_git_data_path(), ))); info!("Purgatory initialized for event coordination"); // Restore purgatory state from disk if available let purgatory_path = PathBuf::from(config.effective_git_data_path()).join("purgatory-state.json"); if purgatory_path.exists() { match purgatory.restore_from_disk(&purgatory_path) { Ok(()) => { info!("Restored purgatory state from disk"); // Re-queueing will happen later after sync system is created } Err(e) => { warn!("Failed to restore purgatory state: {}, starting empty", e); } } } // Create Nostr relay with NIP-34 validation // Returns both the relay and database for direct queries in handlers if let Ok(relay_with_db) = nostr::builder::create_relay(&config, purgatory.clone()).await { info!( "Relay created with NIP-34 validation for domain: {}", config.domain ); // Set the local relay on the write policy for purgatory notifications // This must be done after relay creation since the relay depends on the policy relay_with_db .write_policy .set_local_relay(relay_with_db.relay.clone()); // Start SyncManager for proactive sync (Phase 2: multi-relay support, Phase 3: health tracking) // Even without bootstrap relay, SyncManager discovers relays from stored announcements // Pass the already-registered sync metrics from Metrics to avoid duplicate registration let sync_manager = SyncManager::new( config.sync_bootstrap_relay_url.clone(), config.domain.clone(), relay_with_db.database.clone(), relay_with_db.write_policy.clone(), relay_with_db.relay.clone(), &config, PathBuf::from(config.effective_git_data_path()), metrics.as_ref().and_then(|m| m.sync_metrics().cloned()), ); if config.sync_bootstrap_relay_url.is_some() { info!( "Starting proactive sync with bootstrap relay: {:?}", config.sync_bootstrap_relay_url ); } else { info!("Proactive sync enabled (will discover relays from stored announcements)"); } // Re-queue all restored purgatory repos for sync let restored_identifiers = purgatory.get_all_identifiers(); if !restored_identifiers.is_empty() { info!( "Re-queueing {} restored repositories for sync", restored_identifiers.len() ); for identifier in restored_identifiers { purgatory.enqueue_sync_immediate(&identifier); } } // Get a reference to the rejected events index for shutdown persistence // and for the HTTP server's git push path (hot-cache re-processing) let shutdown_rejected_index = sync_manager.rejected_events_index(); let http_rejected_index = shutdown_rejected_index.clone(); tokio::spawn(async move { sync_manager.run().await; }); // Spawn background cleanup task for purgatory entries (60s interval) let cleanup_purgatory = purgatory.clone(); tokio::spawn(async move { let mut interval = tokio::time::interval(Duration::from_secs(60)); loop { interval.tick().await; let (announcement_removed, state_removed, pr_removed) = cleanup_purgatory.cleanup(); if announcement_removed > 0 || state_removed > 0 || pr_removed > 0 { info!( "Purgatory cleanup: removed {} announcements, {} state events, {} PR events", announcement_removed, state_removed, pr_removed ); } } }); info!("Purgatory cleanup task started (60s interval)"); // Spawn daily cleanup task for old expired event records (prevent unbounded growth) let expired_cleanup_purgatory = purgatory.clone(); tokio::spawn(async move { // Run immediately on startup, then every 24 hours let mut interval = tokio::time::interval(Duration::from_secs(24 * 3600)); loop { interval.tick().await; // Remove expired event records older than 7 days let removed = expired_cleanup_purgatory .cleanup_expired_events(Duration::from_secs(7 * 24 * 3600)); if removed > 0 { info!( "Expired event cleanup: removed {} old expired event records (>7 days)", removed ); } } }); info!("Expired event cleanup task started (24h interval, keeps 7 days)"); // Spawn audit event cleanup task (30m interval, removes events >2h old) let audit_db = relay_with_db.database.clone(); let audit_git_path = PathBuf::from(config.effective_git_data_path()); tokio::spawn(async move { audit_cleanup::run_audit_cleanup_loop(audit_db, audit_git_path).await; }); info!("Audit event cleanup task started (30m interval, removes events >2h old)"); // Start purgatory sync loop for background git data fetching // Create naughty list tracker for git remote domains with persistent errors (12h expiration) let git_naughty_list = Arc::new(NaughtyListTracker::with_defaults()); let sync_ctx = Arc::new(RealSyncContext::new( purgatory.clone(), relay_with_db.database.clone(), PathBuf::from(config.effective_git_data_path()), Some(config.domain.clone()), Some(relay_with_db.relay.clone()), git_naughty_list.clone(), )); // Create throttle manager for rate limiting remote git servers // Default: 5 concurrent requests per domain, 60 requests per minute per domain let throttle_manager = Arc::new(ThrottleManager::new(5, 60)); throttle_manager.set_context(sync_ctx.clone()); throttle_manager.set_git_naughty_list(git_naughty_list.clone()); // Start the sync loop let _sync_loop_handle = purgatory .clone() .start_sync_loop(sync_ctx, throttle_manager, git_naughty_list.clone()); info!("Purgatory sync loop started (1s interval)"); // Setup shutdown handler for purgatory cleanup let shutdown_purgatory = purgatory.clone(); let git_data_path = config.effective_git_data_path(); // Start HTTP server with integrated relay and database info!("Starting HTTP server on {}", config.bind_address); // Wrap write_policy in Arc for sharing between HTTP server connections let http_write_policy = Arc::new(relay_with_db.write_policy.clone()); // Run server until shutdown signal, then cleanup #[cfg(unix)] { use tokio::signal::unix::{signal, SignalKind}; let mut sigterm = signal(SignalKind::terminate())?; tokio::select! { result = http::run_server( config, relay_with_db.relay, relay_with_db.database, metrics, purgatory, http_write_policy, http_rejected_index, ) => { result? } _ = signal::ctrl_c() => { info!("Received SIGINT (Ctrl+C), cleaning up..."); } _ = sigterm.recv() => { info!("Received SIGTERM, cleaning up..."); } } } #[cfg(not(unix))] { tokio::select! { result = http::run_server( config, relay_with_db.relay, relay_with_db.database, metrics, purgatory, http_write_policy, http_rejected_index, ) => { result? } _ = signal::ctrl_c() => { info!("Received SIGINT (Ctrl+C), cleaning up..."); } } } // Save purgatory state to disk let purgatory_save_path = PathBuf::from(&git_data_path).join("purgatory-state.json"); if let Err(e) = shutdown_purgatory.save_to_disk(&purgatory_save_path) { error!("Failed to save purgatory state: {}", e); } else { info!("Purgatory state saved to disk"); } // Save rejected events cache to disk let rejected_cache_path = PathBuf::from(&git_data_path).join("rejected-events-cache.json"); if let Err(e) = shutdown_rejected_index.save_to_disk(&rejected_cache_path) { error!("Failed to save rejected events cache: {}", e); } else { info!("Rejected events cache saved to disk"); } // Cleanup placeholder refs on shutdown let placeholder_ids = shutdown_purgatory.get_placeholder_event_ids(); if !placeholder_ids.is_empty() { info!( "Cleaning up {} placeholder refs/nostr/ refs on shutdown", placeholder_ids.len() ); git::cleanup_placeholder_refs(&git_data_path, &placeholder_ids); } } Ok(()) }