diff options
| author | Your Name <you@example.com> | 2026-05-26 16:11:05 +0530 |
|---|---|---|
| committer | Your Name <you@example.com> | 2026-05-26 16:11:05 +0530 |
| commit | 8816a192c95cf539b65975469a2d61aed46f0414 (patch) | |
| tree | 7590318244a56fabbfa6919ef6d0fab5be529134 /src | |
feat: initial implementation of grasp-mirror daemon
GRASP mirror daemon that discovers repos from watched npubs and mirrors
git data + Nostr events across all known GRASP servers for redundancy.
Features:
- Configurable npub watch list via .env (MIRROR_NPUBS)
- TOML config for GRASP server list, index relays, storage paths
- NIP-11 verification of GRASP servers on startup
- Discovery of repos via kind:30617 announcements on index relays
- Git mirroring (bare clone + push --mirror) to missing GRASP servers
- Nostr event forwarding to all GRASP server embedded relays
- SQLite state tracking for sync status and event dedup
- Optional signing key for updating announcements with new clone URLs
- CLI subcommands: daemon, status, verify, mirror-once
Architecture:
config.rs - TOML + .env config loading
db.rs - SQLite state tracking
health.rs - NIP-11 GRASP server verification
discovery.rs - Relay subscription, kind:30617 parsing
git_mirror.rs - Bare clone + push to GRASP servers
nostr_mirror.rs - Event forwarding to all GRASP relays
signing.rs - Optional announcement updates
main.rs - CLI entry point, daemon loop
Diffstat (limited to 'src')
| -rw-r--r-- | src/config.rs | 129 | ||||
| -rw-r--r-- | src/db.rs | 268 | ||||
| -rw-r--r-- | src/discovery.rs | 142 | ||||
| -rw-r--r-- | src/git_mirror.rs | 159 | ||||
| -rw-r--r-- | src/health.rs | 136 | ||||
| -rw-r--r-- | src/main.rs | 279 | ||||
| -rw-r--r-- | src/nostr_mirror.rs | 139 | ||||
| -rw-r--r-- | src/signing.rs | 76 |
8 files changed, 1328 insertions, 0 deletions
diff --git a/src/config.rs b/src/config.rs new file mode 100644 index 0000000..ceff44d --- /dev/null +++ b/src/config.rs | |||
| @@ -0,0 +1,129 @@ | |||
| 1 | use anyhow::{Context, Result}; | ||
| 2 | use nostr::{FromBech32, PublicKey, ToBech32}; | ||
| 3 | use serde::Deserialize; | ||
| 4 | use std::path::PathBuf; | ||
| 5 | |||
| 6 | #[derive(Debug, Deserialize)] | ||
| 7 | pub struct AppConfig { | ||
| 8 | pub discovery: DiscoveryConfig, | ||
| 9 | pub servers: ServersConfig, | ||
| 10 | pub storage: StorageConfig, | ||
| 11 | pub signing: Option<SigningConfig>, | ||
| 12 | } | ||
| 13 | |||
| 14 | #[derive(Debug, Deserialize)] | ||
| 15 | pub struct DiscoveryConfig { | ||
| 16 | pub index_relays: Vec<String>, | ||
| 17 | #[serde(default = "default_poll_interval")] | ||
| 18 | pub poll_interval_secs: u64, | ||
| 19 | } | ||
| 20 | |||
| 21 | fn default_poll_interval() -> u64 { | ||
| 22 | 300 | ||
| 23 | } | ||
| 24 | |||
| 25 | #[derive(Debug, Deserialize)] | ||
| 26 | pub struct ServersConfig { | ||
| 27 | pub known: Vec<String>, | ||
| 28 | } | ||
| 29 | |||
| 30 | #[derive(Debug, Deserialize)] | ||
| 31 | pub struct StorageConfig { | ||
| 32 | #[serde(default = "default_mirror_dir")] | ||
| 33 | pub mirror_dir: PathBuf, | ||
| 34 | #[serde(default = "default_database")] | ||
| 35 | pub database: PathBuf, | ||
| 36 | } | ||
| 37 | |||
| 38 | fn default_mirror_dir() -> PathBuf { | ||
| 39 | dirs::data_dir() | ||
| 40 | .unwrap_or_else(|| PathBuf::from(".")) | ||
| 41 | .join("grasp-mirror") | ||
| 42 | .join("repos") | ||
| 43 | } | ||
| 44 | |||
| 45 | fn default_database() -> PathBuf { | ||
| 46 | dirs::data_dir() | ||
| 47 | .unwrap_or_else(|| PathBuf::from(".")) | ||
| 48 | .join("grasp-mirror") | ||
| 49 | .join("mirror.db") | ||
| 50 | } | ||
| 51 | |||
| 52 | #[derive(Debug, Deserialize)] | ||
| 53 | pub struct SigningConfig { | ||
| 54 | pub key_file: PathBuf, | ||
| 55 | } | ||
| 56 | |||
| 57 | pub struct ResolvedConfig { | ||
| 58 | pub discovery: DiscoveryConfig, | ||
| 59 | pub servers: ServersConfig, | ||
| 60 | pub storage: StorageConfig, | ||
| 61 | pub signing: Option<SigningConfig>, | ||
| 62 | pub npubs: Vec<PublicKey>, | ||
| 63 | } | ||
| 64 | |||
| 65 | impl ResolvedConfig { | ||
| 66 | pub fn load(config_path: &PathBuf) -> Result<Self> { | ||
| 67 | let _ = dotenvy::dotenv(); | ||
| 68 | |||
| 69 | let config_str = std::fs::read_to_string(config_path) | ||
| 70 | .with_context(|| format!("failed to read config from {:?}", config_path))?; | ||
| 71 | let app: AppConfig = toml::from_str(&config_str).context("failed to parse config.toml")?; | ||
| 72 | |||
| 73 | let npubs = Self::load_npubs()?; | ||
| 74 | |||
| 75 | std::fs::create_dir_all(&app.storage.mirror_dir) | ||
| 76 | .context("failed to create mirror directory")?; | ||
| 77 | if let Some(parent) = app.storage.database.parent() { | ||
| 78 | std::fs::create_dir_all(parent).context("failed to create database directory")?; | ||
| 79 | } | ||
| 80 | |||
| 81 | Ok(Self { | ||
| 82 | discovery: app.discovery, | ||
| 83 | servers: app.servers, | ||
| 84 | storage: app.storage, | ||
| 85 | signing: app.signing, | ||
| 86 | npubs, | ||
| 87 | }) | ||
| 88 | } | ||
| 89 | |||
| 90 | fn load_npubs() -> Result<Vec<PublicKey>> { | ||
| 91 | let raw = std::env::var("MIRROR_NPUBS").unwrap_or_default(); | ||
| 92 | let mut npubs = Vec::new(); | ||
| 93 | |||
| 94 | for entry in raw.split(',').map(|s| s.trim()).filter(|s| !s.is_empty()) { | ||
| 95 | let pk = if entry.starts_with("npub1") { | ||
| 96 | PublicKey::from_bech32(entry) | ||
| 97 | .with_context(|| format!("invalid npub: {}", entry))? | ||
| 98 | } else { | ||
| 99 | let bytes = hex::decode(entry) | ||
| 100 | .with_context(|| format!("invalid hex pubkey: {}", entry))?; | ||
| 101 | PublicKey::from_slice(&bytes) | ||
| 102 | .with_context(|| format!("invalid pubkey bytes: {}", entry))? | ||
| 103 | }; | ||
| 104 | npubs.push(pk); | ||
| 105 | } | ||
| 106 | |||
| 107 | if npubs.is_empty() { | ||
| 108 | tracing::warn!("MIRROR_NPUBS is empty — no pubkeys to mirror"); | ||
| 109 | } | ||
| 110 | |||
| 111 | Ok(npubs) | ||
| 112 | } | ||
| 113 | |||
| 114 | pub fn relay_urls(&self) -> Vec<String> { | ||
| 115 | let mut relays = self.discovery.index_relays.clone(); | ||
| 116 | for server in &self.servers.known { | ||
| 117 | let relay = format!( | ||
| 118 | "wss://{}", | ||
| 119 | server | ||
| 120 | .trim_start_matches("https://") | ||
| 121 | .trim_start_matches("http://") | ||
| 122 | ); | ||
| 123 | if !relays.contains(&relay) { | ||
| 124 | relays.push(relay); | ||
| 125 | } | ||
| 126 | } | ||
| 127 | relays | ||
| 128 | } | ||
| 129 | } | ||
diff --git a/src/db.rs b/src/db.rs new file mode 100644 index 0000000..bb1bf31 --- /dev/null +++ b/src/db.rs | |||
| @@ -0,0 +1,268 @@ | |||
| 1 | use anyhow::{Context, Result}; | ||
| 2 | use sqlx::sqlite::{SqliteConnectOptions, SqlitePoolOptions}; | ||
| 3 | use sqlx::SqlitePool; | ||
| 4 | use std::path::Path; | ||
| 5 | |||
| 6 | pub struct MirrorDb { | ||
| 7 | pool: SqlitePool, | ||
| 8 | } | ||
| 9 | |||
| 10 | #[derive(Debug, sqlx::FromRow)] | ||
| 11 | pub struct RepoRecord { | ||
| 12 | pub id: i64, | ||
| 13 | pub pubkey: String, | ||
| 14 | pub identifier: String, | ||
| 15 | pub announcement_event_id: Option<String>, | ||
| 16 | pub last_seen_at: i64, | ||
| 17 | } | ||
| 18 | |||
| 19 | #[derive(Debug, sqlx::FromRow)] | ||
| 20 | pub struct ServerSyncRecord { | ||
| 21 | pub id: i64, | ||
| 22 | pub repo_id: i64, | ||
| 23 | pub server_domain: String, | ||
| 24 | pub git_synced: bool, | ||
| 25 | pub nostr_synced: bool, | ||
| 26 | pub last_sync_at: Option<i64>, | ||
| 27 | pub error: Option<String>, | ||
| 28 | } | ||
| 29 | |||
| 30 | #[derive(Debug, sqlx::FromRow)] | ||
| 31 | pub struct EventRecord { | ||
| 32 | pub id: i64, | ||
| 33 | pub event_id: String, | ||
| 34 | pub first_seen_at: i64, | ||
| 35 | } | ||
| 36 | |||
| 37 | impl MirrorDb { | ||
| 38 | pub async fn open(db_path: &Path) -> Result<Self> { | ||
| 39 | let opts = SqliteConnectOptions::new() | ||
| 40 | .filename(db_path) | ||
| 41 | .create_if_missing(true) | ||
| 42 | .journal_mode(sqlx::sqlite::SqliteJournalMode::Wal); | ||
| 43 | |||
| 44 | let pool = SqlitePoolOptions::new() | ||
| 45 | .max_connections(5) | ||
| 46 | .connect_with(opts) | ||
| 47 | .await | ||
| 48 | .with_context(|| format!("failed to open database at {:?}", db_path))?; | ||
| 49 | |||
| 50 | let db = Self { pool }; | ||
| 51 | db.run_migrations().await?; | ||
| 52 | Ok(db) | ||
| 53 | } | ||
| 54 | |||
| 55 | async fn run_migrations(&self) -> Result<()> { | ||
| 56 | sqlx::query( | ||
| 57 | r#" | ||
| 58 | CREATE TABLE IF NOT EXISTS repos ( | ||
| 59 | id INTEGER PRIMARY KEY AUTOINCREMENT, | ||
| 60 | pubkey TEXT NOT NULL, | ||
| 61 | identifier TEXT NOT NULL, | ||
| 62 | announcement_event_id TEXT, | ||
| 63 | last_seen_at INTEGER NOT NULL, | ||
| 64 | UNIQUE(pubkey, identifier) | ||
| 65 | ); | ||
| 66 | |||
| 67 | CREATE TABLE IF NOT EXISTS server_syncs ( | ||
| 68 | id INTEGER PRIMARY KEY AUTOINCREMENT, | ||
| 69 | repo_id INTEGER NOT NULL REFERENCES repos(id), | ||
| 70 | server_domain TEXT NOT NULL, | ||
| 71 | git_synced INTEGER NOT NULL DEFAULT 0, | ||
| 72 | nostr_synced INTEGER NOT NULL DEFAULT 0, | ||
| 73 | last_sync_at INTEGER, | ||
| 74 | error TEXT, | ||
| 75 | UNIQUE(repo_id, server_domain) | ||
| 76 | ); | ||
| 77 | |||
| 78 | CREATE TABLE IF NOT EXISTS seen_events ( | ||
| 79 | id INTEGER PRIMARY KEY AUTOINCREMENT, | ||
| 80 | event_id TEXT NOT NULL UNIQUE, | ||
| 81 | first_seen_at INTEGER NOT NULL | ||
| 82 | ); | ||
| 83 | |||
| 84 | CREATE INDEX IF NOT EXISTS idx_repos_pubkey ON repos(pubkey); | ||
| 85 | CREATE INDEX IF NOT EXISTS idx_server_syncs_repo ON server_syncs(repo_id); | ||
| 86 | CREATE INDEX IF NOT EXISTS idx_seen_events_id ON seen_events(event_id); | ||
| 87 | "#, | ||
| 88 | ) | ||
| 89 | .execute(&self.pool) | ||
| 90 | .await | ||
| 91 | .context("failed to run migrations")?; | ||
| 92 | |||
| 93 | Ok(()) | ||
| 94 | } | ||
| 95 | |||
| 96 | pub async fn upsert_repo( | ||
| 97 | &self, | ||
| 98 | pubkey: &str, | ||
| 99 | identifier: &str, | ||
| 100 | event_id: &str, | ||
| 101 | ) -> Result<i64> { | ||
| 102 | let now = chrono_now_secs(); | ||
| 103 | let result = sqlx::query_as::<_, RepoRecord>( | ||
| 104 | "SELECT * FROM repos WHERE pubkey = ? AND identifier = ?", | ||
| 105 | ) | ||
| 106 | .bind(pubkey) | ||
| 107 | .bind(identifier) | ||
| 108 | .fetch_optional(&self.pool) | ||
| 109 | .await?; | ||
| 110 | |||
| 111 | if let Some(existing) = result { | ||
| 112 | sqlx::query("UPDATE repos SET announcement_event_id = ?, last_seen_at = ? WHERE id = ?") | ||
| 113 | .bind(event_id) | ||
| 114 | .bind(now) | ||
| 115 | .bind(existing.id) | ||
| 116 | .execute(&self.pool) | ||
| 117 | .await?; | ||
| 118 | Ok(existing.id) | ||
| 119 | } else { | ||
| 120 | sqlx::query("INSERT INTO repos (pubkey, identifier, announcement_event_id, last_seen_at) VALUES (?, ?, ?, ?)") | ||
| 121 | .bind(pubkey) | ||
| 122 | .bind(identifier) | ||
| 123 | .bind(event_id) | ||
| 124 | .bind(now) | ||
| 125 | .execute(&self.pool) | ||
| 126 | .await?; | ||
| 127 | |||
| 128 | let row: (i64,) = sqlx::query_as("SELECT last_insert_rowid()") | ||
| 129 | .fetch_one(&self.pool) | ||
| 130 | .await?; | ||
| 131 | Ok(row.0) | ||
| 132 | } | ||
| 133 | } | ||
| 134 | |||
| 135 | pub async fn get_repos_needing_git_sync(&self, known_servers: &[String]) -> Result<Vec<(RepoRecord, Vec<String>)>> { | ||
| 136 | let repos = sqlx::query_as::<_, RepoRecord>("SELECT * FROM repos") | ||
| 137 | .fetch_all(&self.pool) | ||
| 138 | .await?; | ||
| 139 | |||
| 140 | let mut result = Vec::new(); | ||
| 141 | for repo in repos { | ||
| 142 | let synced: Vec<String> = sqlx::query_scalar::<_, String>( | ||
| 143 | "SELECT server_domain FROM server_syncs WHERE repo_id = ? AND git_synced = 1", | ||
| 144 | ) | ||
| 145 | .bind(repo.id) | ||
| 146 | .fetch_all(&self.pool) | ||
| 147 | .await?; | ||
| 148 | |||
| 149 | let missing: Vec<String> = known_servers | ||
| 150 | .iter() | ||
| 151 | .filter(|s| !synced.contains(s)) | ||
| 152 | .cloned() | ||
| 153 | .collect(); | ||
| 154 | |||
| 155 | if !missing.is_empty() { | ||
| 156 | result.push((repo, missing)); | ||
| 157 | } | ||
| 158 | } | ||
| 159 | |||
| 160 | Ok(result) | ||
| 161 | } | ||
| 162 | |||
| 163 | pub async fn mark_git_synced( | ||
| 164 | &self, | ||
| 165 | repo_id: i64, | ||
| 166 | server_domain: &str, | ||
| 167 | ) -> Result<()> { | ||
| 168 | let now = chrono_now_secs(); | ||
| 169 | sqlx::query( | ||
| 170 | r#"INSERT INTO server_syncs (repo_id, server_domain, git_synced, nostr_synced, last_sync_at) | ||
| 171 | VALUES (?, ?, 1, 0, ?) | ||
| 172 | ON CONFLICT(repo_id, server_domain) DO UPDATE SET git_synced = 1, last_sync_at = ?, error = NULL"#, | ||
| 173 | ) | ||
| 174 | .bind(repo_id) | ||
| 175 | .bind(server_domain) | ||
| 176 | .bind(now) | ||
| 177 | .bind(now) | ||
| 178 | .execute(&self.pool) | ||
| 179 | .await?; | ||
| 180 | Ok(()) | ||
| 181 | } | ||
| 182 | |||
| 183 | pub async fn mark_nostr_synced( | ||
| 184 | &self, | ||
| 185 | repo_id: i64, | ||
| 186 | server_domain: &str, | ||
| 187 | ) -> Result<()> { | ||
| 188 | let now = chrono_now_secs(); | ||
| 189 | sqlx::query( | ||
| 190 | r#"INSERT INTO server_syncs (repo_id, server_domain, git_synced, nostr_synced, last_sync_at) | ||
| 191 | VALUES (?, ?, 0, 1, ?) | ||
| 192 | ON CONFLICT(repo_id, server_domain) DO UPDATE SET nostr_synced = 1, last_sync_at = ?, error = NULL"#, | ||
| 193 | ) | ||
| 194 | .bind(repo_id) | ||
| 195 | .bind(server_domain) | ||
| 196 | .bind(now) | ||
| 197 | .bind(now) | ||
| 198 | .execute(&self.pool) | ||
| 199 | .await?; | ||
| 200 | Ok(()) | ||
| 201 | } | ||
| 202 | |||
| 203 | pub async fn mark_sync_error( | ||
| 204 | &self, | ||
| 205 | repo_id: i64, | ||
| 206 | server_domain: &str, | ||
| 207 | error: &str, | ||
| 208 | ) -> Result<()> { | ||
| 209 | let now = chrono_now_secs(); | ||
| 210 | sqlx::query( | ||
| 211 | r#"INSERT INTO server_syncs (repo_id, server_domain, git_synced, nostr_synced, last_sync_at, error) | ||
| 212 | VALUES (?, ?, 0, 0, ?, ?) | ||
| 213 | ON CONFLICT(repo_id, server_domain) DO UPDATE SET error = ?, last_sync_at = ?"#, | ||
| 214 | ) | ||
| 215 | .bind(repo_id) | ||
| 216 | .bind(server_domain) | ||
| 217 | .bind(now) | ||
| 218 | .bind(error) | ||
| 219 | .bind(error) | ||
| 220 | .bind(now) | ||
| 221 | .execute(&self.pool) | ||
| 222 | .await?; | ||
| 223 | Ok(()) | ||
| 224 | } | ||
| 225 | |||
| 226 | pub async fn have_seen_event(&self, event_id: &str) -> Result<bool> { | ||
| 227 | let result: Option<(i64,)> = sqlx::query_as( | ||
| 228 | "SELECT id FROM seen_events WHERE event_id = ?", | ||
| 229 | ) | ||
| 230 | .bind(event_id) | ||
| 231 | .fetch_optional(&self.pool) | ||
| 232 | .await?; | ||
| 233 | Ok(result.is_some()) | ||
| 234 | } | ||
| 235 | |||
| 236 | pub async fn record_event(&self, event_id: &str) -> Result<()> { | ||
| 237 | let now = chrono_now_secs(); | ||
| 238 | sqlx::query("INSERT OR IGNORE INTO seen_events (event_id, first_seen_at) VALUES (?, ?)") | ||
| 239 | .bind(event_id) | ||
| 240 | .bind(now) | ||
| 241 | .execute(&self.pool) | ||
| 242 | .await?; | ||
| 243 | Ok(()) | ||
| 244 | } | ||
| 245 | |||
| 246 | pub async fn get_all_repos(&self) -> Result<Vec<RepoRecord>> { | ||
| 247 | let repos = sqlx::query_as::<_, RepoRecord>("SELECT * FROM repos ORDER BY last_seen_at DESC") | ||
| 248 | .fetch_all(&self.pool) | ||
| 249 | .await?; | ||
| 250 | Ok(repos) | ||
| 251 | } | ||
| 252 | |||
| 253 | pub async fn get_sync_summary(&self) -> Result<Vec<ServerSyncRecord>> { | ||
| 254 | let records = sqlx::query_as::<_, ServerSyncRecord>( | ||
| 255 | "SELECT * FROM server_syncs ORDER BY last_sync_at DESC NULLS LAST", | ||
| 256 | ) | ||
| 257 | .fetch_all(&self.pool) | ||
| 258 | .await?; | ||
| 259 | Ok(records) | ||
| 260 | } | ||
| 261 | } | ||
| 262 | |||
| 263 | fn chrono_now_secs() -> i64 { | ||
| 264 | std::time::SystemTime::now() | ||
| 265 | .duration_since(std::time::UNIX_EPOCH) | ||
| 266 | .unwrap_or_default() | ||
| 267 | .as_secs() as i64 | ||
| 268 | } | ||
diff --git a/src/discovery.rs b/src/discovery.rs new file mode 100644 index 0000000..59b9f2c --- /dev/null +++ b/src/discovery.rs | |||
| @@ -0,0 +1,142 @@ | |||
| 1 | use crate::db::MirrorDb; | ||
| 2 | use crate::health::GraspServer; | ||
| 3 | use anyhow::{Context, Result}; | ||
| 4 | use nostr::Kind; | ||
| 5 | use nostr_sdk::prelude::*; | ||
| 6 | use std::collections::HashMap; | ||
| 7 | |||
| 8 | #[derive(Debug, Clone)] | ||
| 9 | pub struct DiscoveredRepo { | ||
| 10 | pub pubkey: PublicKey, | ||
| 11 | pub identifier: String, | ||
| 12 | pub event_id: EventId, | ||
| 13 | pub clone_urls: Vec<String>, | ||
| 14 | pub relay_urls: Vec<String>, | ||
| 15 | } | ||
| 16 | |||
| 17 | pub async fn discover_repos_from_relays( | ||
| 18 | client: &nostr_sdk::Client, | ||
| 19 | npubs: &[PublicKey], | ||
| 20 | relay_urls: &[String], | ||
| 21 | ) -> Result<Vec<DiscoveredRepo>> { | ||
| 22 | for url in relay_urls { | ||
| 23 | if let Err(e) = client.add_relay(url).await { | ||
| 24 | tracing::warn!(relay = %url, error = %e, "failed to add relay"); | ||
| 25 | continue; | ||
| 26 | } | ||
| 27 | } | ||
| 28 | client.connect().await; | ||
| 29 | |||
| 30 | let mut repos = Vec::new(); | ||
| 31 | |||
| 32 | for pk in npubs { | ||
| 33 | let filter = Filter::new() | ||
| 34 | .kind(Kind::Custom(30617)) | ||
| 35 | .author(*pk) | ||
| 36 | .limit(50); | ||
| 37 | |||
| 38 | let events = client | ||
| 39 | .fetch_events(filter, std::time::Duration::from_secs(30)) | ||
| 40 | .await | ||
| 41 | .context("failed to fetch events from relays")?; | ||
| 42 | |||
| 43 | for event in events.into_iter() { | ||
| 44 | if let Some(repo) = parse_announcement(&event) { | ||
| 45 | repos.push(repo); | ||
| 46 | } | ||
| 47 | } | ||
| 48 | } | ||
| 49 | |||
| 50 | tracing::info!(count = repos.len(), "discovered repos from relays"); | ||
| 51 | Ok(repos) | ||
| 52 | } | ||
| 53 | |||
| 54 | fn parse_announcement(event: &Event) -> Option<DiscoveredRepo> { | ||
| 55 | let mut identifier = None; | ||
| 56 | let mut clone_urls = Vec::new(); | ||
| 57 | let mut relay_urls = Vec::new(); | ||
| 58 | |||
| 59 | for tag in event.tags.iter() { | ||
| 60 | let tag_vec = tag.clone().to_vec(); | ||
| 61 | if tag_vec.len() < 2 { | ||
| 62 | continue; | ||
| 63 | } | ||
| 64 | match tag_vec[0].as_str() { | ||
| 65 | "d" => identifier = Some(tag_vec[1].clone()), | ||
| 66 | "clone" => clone_urls.push(tag_vec[1].clone()), | ||
| 67 | "relays" => relay_urls.push(tag_vec[1].clone()), | ||
| 68 | _ => {} | ||
| 69 | } | ||
| 70 | } | ||
| 71 | |||
| 72 | let identifier = identifier?; | ||
| 73 | if clone_urls.is_empty() { | ||
| 74 | tracing::warn!( | ||
| 75 | identifier = %identifier, | ||
| 76 | "repo announcement has no clone URLs" | ||
| 77 | ); | ||
| 78 | } | ||
| 79 | |||
| 80 | Some(DiscoveredRepo { | ||
| 81 | pubkey: event.pubkey, | ||
| 82 | identifier, | ||
| 83 | event_id: event.id, | ||
| 84 | clone_urls, | ||
| 85 | relay_urls, | ||
| 86 | }) | ||
| 87 | } | ||
| 88 | |||
| 89 | pub async fn persist_discovered_repos( | ||
| 90 | db: &MirrorDb, | ||
| 91 | repos: &[DiscoveredRepo], | ||
| 92 | ) -> Result<HashMap<String, i64>> { | ||
| 93 | let mut repo_ids = HashMap::new(); | ||
| 94 | for repo in repos { | ||
| 95 | let pk_hex = repo.pubkey.to_hex(); | ||
| 96 | match db | ||
| 97 | .upsert_repo(&pk_hex, &repo.identifier, &repo.event_id.to_hex()) | ||
| 98 | .await | ||
| 99 | { | ||
| 100 | Ok(id) => { | ||
| 101 | tracing::debug!( | ||
| 102 | identifier = %repo.identifier, | ||
| 103 | repo_id = id, | ||
| 104 | "persisted repo" | ||
| 105 | ); | ||
| 106 | repo_ids.insert(format!("{}:{}", pk_hex, repo.identifier), id); | ||
| 107 | } | ||
| 108 | Err(e) => { | ||
| 109 | tracing::error!( | ||
| 110 | identifier = %repo.identifier, | ||
| 111 | error = %e, | ||
| 112 | "failed to persist repo" | ||
| 113 | ); | ||
| 114 | } | ||
| 115 | } | ||
| 116 | } | ||
| 117 | Ok(repo_ids) | ||
| 118 | } | ||
| 119 | |||
| 120 | pub fn identify_missing_servers( | ||
| 121 | repo: &DiscoveredRepo, | ||
| 122 | servers: &HashMap<String, GraspServer>, | ||
| 123 | ) -> Vec<GraspServer> { | ||
| 124 | let mut missing = Vec::new(); | ||
| 125 | |||
| 126 | for (_domain, server) in servers { | ||
| 127 | if !server.is_grasp_server() { | ||
| 128 | continue; | ||
| 129 | } | ||
| 130 | |||
| 131 | let has_clone = repo | ||
| 132 | .clone_urls | ||
| 133 | .iter() | ||
| 134 | .any(|url| url.contains(&server.domain)); | ||
| 135 | |||
| 136 | if !has_clone { | ||
| 137 | missing.push(server.clone()); | ||
| 138 | } | ||
| 139 | } | ||
| 140 | |||
| 141 | missing | ||
| 142 | } | ||
diff --git a/src/git_mirror.rs b/src/git_mirror.rs new file mode 100644 index 0000000..47c0442 --- /dev/null +++ b/src/git_mirror.rs | |||
| @@ -0,0 +1,159 @@ | |||
| 1 | use crate::db::MirrorDb; | ||
| 2 | use crate::discovery::DiscoveredRepo; | ||
| 3 | use crate::health::GraspServer; | ||
| 4 | use anyhow::{Context, Result}; | ||
| 5 | use git2::RemoteCallbacks; | ||
| 6 | use std::path::Path; | ||
| 7 | |||
| 8 | pub struct GitMirror { | ||
| 9 | mirror_dir: std::path::PathBuf, | ||
| 10 | } | ||
| 11 | |||
| 12 | impl GitMirror { | ||
| 13 | pub fn new(mirror_dir: &Path) -> Self { | ||
| 14 | Self { | ||
| 15 | mirror_dir: mirror_dir.to_path_buf(), | ||
| 16 | } | ||
| 17 | } | ||
| 18 | |||
| 19 | fn repo_path(&self, pubkey: &str, identifier: &str) -> std::path::PathBuf { | ||
| 20 | self.mirror_dir | ||
| 21 | .join(pubkey) | ||
| 22 | .join(format!("{}.git", identifier)) | ||
| 23 | } | ||
| 24 | |||
| 25 | pub async fn mirror_repo_to_servers( | ||
| 26 | &self, | ||
| 27 | db: &MirrorDb, | ||
| 28 | repo: &DiscoveredRepo, | ||
| 29 | target_servers: &[GraspServer], | ||
| 30 | ) -> Result<()> { | ||
| 31 | if target_servers.is_empty() { | ||
| 32 | tracing::debug!( | ||
| 33 | identifier = %repo.identifier, | ||
| 34 | "no missing servers to mirror to" | ||
| 35 | ); | ||
| 36 | return Ok(()); | ||
| 37 | } | ||
| 38 | |||
| 39 | let pk_hex = repo.pubkey.to_hex(); | ||
| 40 | let repo_path = self.repo_path(&pk_hex, &repo.identifier); | ||
| 41 | |||
| 42 | if !repo_path.exists() { | ||
| 43 | self.clone_bare(&repo_path, &repo.clone_urls)?; | ||
| 44 | } | ||
| 45 | |||
| 46 | for server in target_servers { | ||
| 47 | let target_url = server.clone_url(&pk_hex, &repo.identifier); | ||
| 48 | |||
| 49 | tracing::info!( | ||
| 50 | identifier = %repo.identifier, | ||
| 51 | server = %server.domain, | ||
| 52 | target = %target_url, | ||
| 53 | "mirroring git data" | ||
| 54 | ); | ||
| 55 | |||
| 56 | let repo_id = db.get_all_repos().await.ok().and_then(|repos| { | ||
| 57 | repos | ||
| 58 | .iter() | ||
| 59 | .find(|r| r.pubkey == pk_hex && r.identifier == repo.identifier) | ||
| 60 | .map(|r| r.id) | ||
| 61 | }); | ||
| 62 | |||
| 63 | match self.push_mirror(&repo_path, &target_url) { | ||
| 64 | Ok(()) => { | ||
| 65 | tracing::info!( | ||
| 66 | identifier = %repo.identifier, | ||
| 67 | server = %server.domain, | ||
| 68 | "git mirror succeeded" | ||
| 69 | ); | ||
| 70 | if let Some(id) = repo_id { | ||
| 71 | let _ = db.mark_git_synced(id, &server.domain).await; | ||
| 72 | } | ||
| 73 | } | ||
| 74 | Err(e) => { | ||
| 75 | tracing::error!( | ||
| 76 | identifier = %repo.identifier, | ||
| 77 | server = %server.domain, | ||
| 78 | error = %e, | ||
| 79 | "git mirror failed" | ||
| 80 | ); | ||
| 81 | if let Some(id) = repo_id { | ||
| 82 | let _ = db.mark_sync_error(id, &server.domain, &e.to_string()).await; | ||
| 83 | } | ||
| 84 | } | ||
| 85 | } | ||
| 86 | } | ||
| 87 | |||
| 88 | Ok(()) | ||
| 89 | } | ||
| 90 | |||
| 91 | fn clone_bare(&self, repo_path: &Path, clone_urls: &[String]) -> Result<()> { | ||
| 92 | if let Some(parent) = repo_path.parent() { | ||
| 93 | std::fs::create_dir_all(parent) | ||
| 94 | .with_context(|| format!("failed to create {:?}", parent))?; | ||
| 95 | } | ||
| 96 | |||
| 97 | let mut last_error = None; | ||
| 98 | |||
| 99 | for url in clone_urls { | ||
| 100 | if url.is_empty() { | ||
| 101 | continue; | ||
| 102 | } | ||
| 103 | tracing::info!(url = %url, path = ?repo_path, "cloning bare repo"); | ||
| 104 | |||
| 105 | let mut callbacks = RemoteCallbacks::new(); | ||
| 106 | callbacks.credentials(|_url, _username, _allowed| git2::Cred::default()); | ||
| 107 | |||
| 108 | let mut fetch_opts = git2::FetchOptions::new(); | ||
| 109 | fetch_opts.remote_callbacks(callbacks); | ||
| 110 | |||
| 111 | let mut builder = git2::build::RepoBuilder::new(); | ||
| 112 | builder.bare(true).fetch_options(fetch_opts); | ||
| 113 | |||
| 114 | match builder.clone(url, repo_path) { | ||
| 115 | Ok(_) => { | ||
| 116 | tracing::info!(url = %url, "bare clone succeeded"); | ||
| 117 | return Ok(()); | ||
| 118 | } | ||
| 119 | Err(e) => { | ||
| 120 | tracing::warn!(url = %url, error = %e, "clone failed, trying next URL"); | ||
| 121 | last_error = Some(e); | ||
| 122 | if repo_path.exists() { | ||
| 123 | let _ = std::fs::remove_dir_all(repo_path); | ||
| 124 | } | ||
| 125 | } | ||
| 126 | } | ||
| 127 | } | ||
| 128 | |||
| 129 | let err = last_error.unwrap_or_else(|| git2::Error::from_str("no clone URLs available")); | ||
| 130 | Err(err).with_context(|| format!("all clone attempts failed for {:?}", repo_path)) | ||
| 131 | } | ||
| 132 | |||
| 133 | fn push_mirror(&self, repo_path: &Path, target_url: &str) -> Result<()> { | ||
| 134 | let repo = git2::Repository::open(repo_path) | ||
| 135 | .with_context(|| format!("failed to open bare repo at {:?}", repo_path))?; | ||
| 136 | |||
| 137 | let mut remote = repo.remote("push_target", target_url)?; | ||
| 138 | |||
| 139 | let mut callbacks = RemoteCallbacks::new(); | ||
| 140 | callbacks.credentials(|_url, _username, _allowed| git2::Cred::default()); | ||
| 141 | callbacks.push_update_reference(|_refname, status| { | ||
| 142 | if let Some(s) = status { | ||
| 143 | tracing::warn!(status = %s, "push rejected"); | ||
| 144 | } | ||
| 145 | Ok(()) | ||
| 146 | }); | ||
| 147 | |||
| 148 | let mut push_opts = git2::PushOptions::new(); | ||
| 149 | push_opts.remote_callbacks(callbacks); | ||
| 150 | |||
| 151 | let refspecs = ["+refs/*:refs/*"]; | ||
| 152 | |||
| 153 | remote | ||
| 154 | .push(&refspecs, Some(&mut push_opts)) | ||
| 155 | .with_context(|| format!("failed to push mirror to {}", target_url))?; | ||
| 156 | |||
| 157 | Ok(()) | ||
| 158 | } | ||
| 159 | } | ||
diff --git a/src/health.rs b/src/health.rs new file mode 100644 index 0000000..2853239 --- /dev/null +++ b/src/health.rs | |||
| @@ -0,0 +1,136 @@ | |||
| 1 | use anyhow::{Context, Result}; | ||
| 2 | use serde::{Deserialize, Serialize}; | ||
| 3 | use std::collections::HashMap; | ||
| 4 | |||
| 5 | #[derive(Debug, Clone, Serialize, Deserialize)] | ||
| 6 | pub struct Nip11Info { | ||
| 7 | pub name: Option<String>, | ||
| 8 | pub description: Option<String>, | ||
| 9 | pub supported_nips: Option<Vec<u64>>, | ||
| 10 | pub supported_grasps: Option<Vec<String>>, | ||
| 11 | pub software: Option<String>, | ||
| 12 | pub version: Option<String>, | ||
| 13 | } | ||
| 14 | |||
| 15 | #[derive(Debug, Clone)] | ||
| 16 | pub struct GraspServer { | ||
| 17 | pub domain: String, | ||
| 18 | pub relay_url: String, | ||
| 19 | pub clone_url_prefix: String, | ||
| 20 | pub nip11: Option<Nip11Info>, | ||
| 21 | pub healthy: bool, | ||
| 22 | } | ||
| 23 | |||
| 24 | impl GraspServer { | ||
| 25 | pub fn from_domain(domain: &str) -> Self { | ||
| 26 | let clean = domain | ||
| 27 | .trim_start_matches("https://") | ||
| 28 | .trim_start_matches("http://") | ||
| 29 | .trim_end_matches('/') | ||
| 30 | .to_string(); | ||
| 31 | Self { | ||
| 32 | relay_url: format!("wss://{}", clean), | ||
| 33 | clone_url_prefix: format!("https://{}", clean), | ||
| 34 | nip11: None, | ||
| 35 | healthy: false, | ||
| 36 | domain: clean, | ||
| 37 | } | ||
| 38 | } | ||
| 39 | |||
| 40 | pub fn clone_url(&self, npub_hex: &str, identifier: &str) -> String { | ||
| 41 | format!("{}/{}/{}.git", self.clone_url_prefix, npub_hex, identifier) | ||
| 42 | } | ||
| 43 | |||
| 44 | pub fn is_grasp_server(&self) -> bool { | ||
| 45 | self.nip11 | ||
| 46 | .as_ref() | ||
| 47 | .map(|info| info.supported_grasps.is_some()) | ||
| 48 | .unwrap_or(false) | ||
| 49 | } | ||
| 50 | } | ||
| 51 | |||
| 52 | pub async fn verify_grasp_server(domain: &str) -> Result<GraspServer> { | ||
| 53 | let mut server = GraspServer::from_domain(domain); | ||
| 54 | let nip11_url = format!("https://{}", server.domain); | ||
| 55 | |||
| 56 | let client = reqwest::Client::builder() | ||
| 57 | .timeout(std::time::Duration::from_secs(10)) | ||
| 58 | .build()?; | ||
| 59 | |||
| 60 | let resp = client | ||
| 61 | .get(&nip11_url) | ||
| 62 | .header("Accept", "application/nostr+json") | ||
| 63 | .send() | ||
| 64 | .await; | ||
| 65 | |||
| 66 | match resp { | ||
| 67 | Ok(resp) if resp.status().is_success() => { | ||
| 68 | match resp.json::<Nip11Info>().await { | ||
| 69 | Ok(info) => { | ||
| 70 | let is_grasp = info.supported_grasps.is_some(); | ||
| 71 | if is_grasp { | ||
| 72 | tracing::info!( | ||
| 73 | domain = %server.domain, | ||
| 74 | grasps = ?info.supported_grasps, | ||
| 75 | version = ?info.version, | ||
| 76 | "verified GRASP server" | ||
| 77 | ); | ||
| 78 | } else { | ||
| 79 | tracing::warn!( | ||
| 80 | domain = %server.domain, | ||
| 81 | "server responded to NIP-11 but has no supported_grasps" | ||
| 82 | ); | ||
| 83 | } | ||
| 84 | server.healthy = is_grasp; | ||
| 85 | server.nip11 = Some(info); | ||
| 86 | } | ||
| 87 | Err(e) => { | ||
| 88 | tracing::warn!(domain = %server.domain, error = %e, "failed to parse NIP-11 response"); | ||
| 89 | } | ||
| 90 | } | ||
| 91 | } | ||
| 92 | Ok(resp) => { | ||
| 93 | tracing::warn!( | ||
| 94 | domain = %server.domain, | ||
| 95 | status = %resp.status(), | ||
| 96 | "NIP-11 check returned non-success" | ||
| 97 | ); | ||
| 98 | } | ||
| 99 | Err(e) => { | ||
| 100 | tracing::warn!(domain = %server.domain, error = %e, "NIP-11 check failed"); | ||
| 101 | } | ||
| 102 | } | ||
| 103 | |||
| 104 | Ok(server) | ||
| 105 | } | ||
| 106 | |||
| 107 | pub async fn verify_all_servers(domains: &[String]) -> HashMap<String, GraspServer> { | ||
| 108 | let mut servers = HashMap::new(); | ||
| 109 | let mut set = tokio::task::JoinSet::new(); | ||
| 110 | |||
| 111 | for d in domains { | ||
| 112 | let domain = d.clone(); | ||
| 113 | set.spawn(async move { | ||
| 114 | let result = verify_grasp_server(&domain).await; | ||
| 115 | (domain, result) | ||
| 116 | }); | ||
| 117 | } | ||
| 118 | |||
| 119 | while let Some(res) = set.join_next().await { | ||
| 120 | match res { | ||
| 121 | Ok((domain, result)) => match result { | ||
| 122 | Ok(server) => { | ||
| 123 | servers.insert(domain, server); | ||
| 124 | } | ||
| 125 | Err(e) => { | ||
| 126 | tracing::error!(domain = %domain, error = %e, "failed to verify server"); | ||
| 127 | } | ||
| 128 | }, | ||
| 129 | Err(e) => { | ||
| 130 | tracing::error!(error = %e, "task panicked during server verification"); | ||
| 131 | } | ||
| 132 | } | ||
| 133 | } | ||
| 134 | |||
| 135 | servers | ||
| 136 | } | ||
diff --git a/src/main.rs b/src/main.rs new file mode 100644 index 0000000..b709d44 --- /dev/null +++ b/src/main.rs | |||
| @@ -0,0 +1,279 @@ | |||
| 1 | mod config; | ||
| 2 | mod db; | ||
| 3 | mod discovery; | ||
| 4 | mod git_mirror; | ||
| 5 | mod health; | ||
| 6 | mod nostr_mirror; | ||
| 7 | mod signing; | ||
| 8 | |||
| 9 | use anyhow::Result; | ||
| 10 | use clap::Parser; | ||
| 11 | use std::path::PathBuf; | ||
| 12 | use std::sync::Arc; | ||
| 13 | |||
| 14 | #[derive(Parser, Debug)] | ||
| 15 | #[command(name = "grasp-mirror", about = "GRASP mirror daemon")] | ||
| 16 | struct Cli { | ||
| 17 | #[arg(short, long, default_value = "config.toml")] | ||
| 18 | config: PathBuf, | ||
| 19 | |||
| 20 | #[command(subcommand)] | ||
| 21 | command: Option<Command>, | ||
| 22 | } | ||
| 23 | |||
| 24 | #[derive(clap::Subcommand, Debug)] | ||
| 25 | enum Command { | ||
| 26 | Daemon, | ||
| 27 | Status, | ||
| 28 | Verify, | ||
| 29 | MirrorOnce, | ||
| 30 | } | ||
| 31 | |||
| 32 | #[tokio::main] | ||
| 33 | async fn main() -> Result<()> { | ||
| 34 | tracing_subscriber::fmt() | ||
| 35 | .with_env_filter( | ||
| 36 | tracing_subscriber::EnvFilter::try_from_default_env() | ||
| 37 | .unwrap_or_else(|_| tracing_subscriber::EnvFilter::new("info")), | ||
| 38 | ) | ||
| 39 | .init(); | ||
| 40 | |||
| 41 | let cli = Cli::parse(); | ||
| 42 | let config = config::ResolvedConfig::load(&cli.config)?; | ||
| 43 | |||
| 44 | tracing::info!( | ||
| 45 | npubs = config.npubs.len(), | ||
| 46 | servers = config.servers.known.len(), | ||
| 47 | index_relays = config.discovery.index_relays.len(), | ||
| 48 | "starting grasp-mirror" | ||
| 49 | ); | ||
| 50 | |||
| 51 | let db = db::MirrorDb::open(&config.storage.database).await?; | ||
| 52 | |||
| 53 | match cli.command.unwrap_or(Command::Daemon) { | ||
| 54 | Command::Daemon => run_daemon(config, db).await, | ||
| 55 | Command::Status => run_status(db).await, | ||
| 56 | Command::Verify => run_verify(config).await, | ||
| 57 | Command::MirrorOnce => run_mirror_once(config, db).await, | ||
| 58 | } | ||
| 59 | } | ||
| 60 | |||
| 61 | fn build_nostr_client(relay_urls: &[String]) -> nostr_sdk::Client { | ||
| 62 | let client = nostr_sdk::Client::default(); | ||
| 63 | for url in relay_urls { | ||
| 64 | let _ = client.add_relay(url); | ||
| 65 | } | ||
| 66 | client | ||
| 67 | } | ||
| 68 | |||
| 69 | async fn run_daemon(config: config::ResolvedConfig, db: db::MirrorDb) -> Result<()> { | ||
| 70 | let db = Arc::new(db); | ||
| 71 | let config = Arc::new(config); | ||
| 72 | |||
| 73 | let servers = health::verify_all_servers(&config.servers.known).await; | ||
| 74 | let healthy: Vec<_> = servers | ||
| 75 | .values() | ||
| 76 | .filter(|s| s.is_grasp_server()) | ||
| 77 | .cloned() | ||
| 78 | .collect(); | ||
| 79 | |||
| 80 | tracing::info!( | ||
| 81 | total = servers.len(), | ||
| 82 | healthy = healthy.len(), | ||
| 83 | "server verification complete" | ||
| 84 | ); | ||
| 85 | |||
| 86 | let healthy = Arc::new(healthy); | ||
| 87 | |||
| 88 | let relay_urls = config.relay_urls(); | ||
| 89 | let nostr_client = build_nostr_client(&relay_urls); | ||
| 90 | nostr_client.connect().await; | ||
| 91 | |||
| 92 | let mirror = git_mirror::GitMirror::new(&config.storage.mirror_dir); | ||
| 93 | let nostr_mirror = nostr_mirror::NostrMirror::new(nostr_client.clone()); | ||
| 94 | |||
| 95 | let mut interval = tokio::time::interval(std::time::Duration::from_secs( | ||
| 96 | config.discovery.poll_interval_secs, | ||
| 97 | )); | ||
| 98 | |||
| 99 | tracing::info!( | ||
| 100 | "daemon started, polling every {}s", | ||
| 101 | config.discovery.poll_interval_secs | ||
| 102 | ); | ||
| 103 | |||
| 104 | loop { | ||
| 105 | tokio::select! { | ||
| 106 | _ = interval.tick() => { | ||
| 107 | if let Err(e) = mirror_cycle( | ||
| 108 | &config, | ||
| 109 | &db, | ||
| 110 | &nostr_client, | ||
| 111 | &mirror, | ||
| 112 | &nostr_mirror, | ||
| 113 | &healthy, | ||
| 114 | ).await { | ||
| 115 | tracing::error!(error = %e, "mirror cycle failed"); | ||
| 116 | } | ||
| 117 | } | ||
| 118 | _ = tokio::signal::ctrl_c() => { | ||
| 119 | tracing::info!("shutting down"); | ||
| 120 | break; | ||
| 121 | } | ||
| 122 | } | ||
| 123 | } | ||
| 124 | |||
| 125 | Ok(()) | ||
| 126 | } | ||
| 127 | |||
| 128 | async fn mirror_cycle( | ||
| 129 | config: &Arc<config::ResolvedConfig>, | ||
| 130 | db: &Arc<db::MirrorDb>, | ||
| 131 | nostr_client: &nostr_sdk::Client, | ||
| 132 | mirror: &git_mirror::GitMirror, | ||
| 133 | nostr_mirror: &nostr_mirror::NostrMirror, | ||
| 134 | servers: &Arc<Vec<health::GraspServer>>, | ||
| 135 | ) -> Result<()> { | ||
| 136 | tracing::info!("starting mirror cycle"); | ||
| 137 | |||
| 138 | let relay_urls = config.relay_urls(); | ||
| 139 | let repos = discovery::discover_repos_from_relays(nostr_client, &config.npubs, &relay_urls) | ||
| 140 | .await?; | ||
| 141 | |||
| 142 | tracing::info!(count = repos.len(), "discovered repos"); | ||
| 143 | |||
| 144 | discovery::persist_discovered_repos(db, &repos).await?; | ||
| 145 | |||
| 146 | let server_map: std::collections::HashMap<String, health::GraspServer> = servers | ||
| 147 | .iter() | ||
| 148 | .map(|s| (s.domain.clone(), s.clone())) | ||
| 149 | .collect(); | ||
| 150 | |||
| 151 | for repo in &repos { | ||
| 152 | let missing = discovery::identify_missing_servers(repo, &server_map); | ||
| 153 | |||
| 154 | if missing.is_empty() { | ||
| 155 | tracing::debug!(identifier = %repo.identifier, "repo already on all servers"); | ||
| 156 | continue; | ||
| 157 | } | ||
| 158 | |||
| 159 | tracing::info!( | ||
| 160 | identifier = %repo.identifier, | ||
| 161 | missing = missing.iter().map(|s| s.domain.as_str()).collect::<Vec<_>>().join(", "), | ||
| 162 | "mirroring to missing servers" | ||
| 163 | ); | ||
| 164 | |||
| 165 | mirror.mirror_repo_to_servers(db, repo, &missing).await?; | ||
| 166 | nostr_mirror.forward_repo_events(db, repo, servers).await?; | ||
| 167 | } | ||
| 168 | |||
| 169 | nostr_mirror | ||
| 170 | .sync_all_events(db, &config.npubs, servers) | ||
| 171 | .await?; | ||
| 172 | |||
| 173 | tracing::info!("mirror cycle complete"); | ||
| 174 | Ok(()) | ||
| 175 | } | ||
| 176 | |||
| 177 | async fn run_status(db: db::MirrorDb) -> Result<()> { | ||
| 178 | let repos = db.get_all_repos().await?; | ||
| 179 | let syncs = db.get_sync_summary().await?; | ||
| 180 | |||
| 181 | println!("=== GRASP Mirror Status ===\n"); | ||
| 182 | println!("Repos tracked: {}\n", repos.len()); | ||
| 183 | |||
| 184 | for repo in &repos { | ||
| 185 | println!(" {} ({})", repo.identifier, &repo.pubkey[..12]); | ||
| 186 | } | ||
| 187 | |||
| 188 | println!("\nSync records: {}\n", syncs.len()); | ||
| 189 | for sync in &syncs { | ||
| 190 | let status = if sync.git_synced && sync.nostr_synced { | ||
| 191 | "OK" | ||
| 192 | } else if sync.error.is_some() { | ||
| 193 | "ERR" | ||
| 194 | } else { | ||
| 195 | "PENDING" | ||
| 196 | }; | ||
| 197 | println!( | ||
| 198 | " repo:{} server:{} git:{} nostr:{} [{}]", | ||
| 199 | sync.repo_id, | ||
| 200 | sync.server_domain, | ||
| 201 | if sync.git_synced { "Y" } else { "N" }, | ||
| 202 | if sync.nostr_synced { "Y" } else { "N" }, | ||
| 203 | status, | ||
| 204 | ); | ||
| 205 | if let Some(err) = &sync.error { | ||
| 206 | println!(" error: {}", err); | ||
| 207 | } | ||
| 208 | } | ||
| 209 | |||
| 210 | Ok(()) | ||
| 211 | } | ||
| 212 | |||
| 213 | async fn run_verify(config: config::ResolvedConfig) -> Result<()> { | ||
| 214 | let servers = health::verify_all_servers(&config.servers.known).await; | ||
| 215 | |||
| 216 | println!("=== GRASP Server Verification ===\n"); | ||
| 217 | |||
| 218 | for (domain, server) in &servers { | ||
| 219 | let grasp = server.is_grasp_server(); | ||
| 220 | let version = server | ||
| 221 | .nip11 | ||
| 222 | .as_ref() | ||
| 223 | .and_then(|i| i.version.clone()) | ||
| 224 | .unwrap_or_else(|| "unknown".to_string()); | ||
| 225 | let grasps = server | ||
| 226 | .nip11 | ||
| 227 | .as_ref() | ||
| 228 | .and_then(|i| i.supported_grasps.clone()) | ||
| 229 | .map(|g| g.join(", ")) | ||
| 230 | .unwrap_or_else(|| "NONE".to_string()); | ||
| 231 | |||
| 232 | println!( | ||
| 233 | " {} [{}] v={} GRASP={}", | ||
| 234 | domain, | ||
| 235 | if grasp { "OK" } else { "SKIP" }, | ||
| 236 | version, | ||
| 237 | grasps, | ||
| 238 | ); | ||
| 239 | } | ||
| 240 | |||
| 241 | let healthy = servers.values().filter(|s| s.is_grasp_server()).count(); | ||
| 242 | println!( | ||
| 243 | "\n{}/{} verified as GRASP servers", | ||
| 244 | healthy, | ||
| 245 | servers.len() | ||
| 246 | ); | ||
| 247 | |||
| 248 | Ok(()) | ||
| 249 | } | ||
| 250 | |||
| 251 | async fn run_mirror_once(config: config::ResolvedConfig, db: db::MirrorDb) -> Result<()> { | ||
| 252 | let db = Arc::new(db); | ||
| 253 | let config = Arc::new(config); | ||
| 254 | |||
| 255 | let servers = health::verify_all_servers(&config.servers.known).await; | ||
| 256 | let healthy: Vec<_> = servers | ||
| 257 | .values() | ||
| 258 | .filter(|s| s.is_grasp_server()) | ||
| 259 | .cloned() | ||
| 260 | .collect(); | ||
| 261 | |||
| 262 | let relay_urls = config.relay_urls(); | ||
| 263 | let nostr_client = build_nostr_client(&relay_urls); | ||
| 264 | nostr_client.connect().await; | ||
| 265 | |||
| 266 | let mirror = git_mirror::GitMirror::new(&config.storage.mirror_dir); | ||
| 267 | let nostr_mirror = nostr_mirror::NostrMirror::new(nostr_client.clone()); | ||
| 268 | let healthy = Arc::new(healthy); | ||
| 269 | |||
| 270 | mirror_cycle( | ||
| 271 | &config, | ||
| 272 | &db, | ||
| 273 | &nostr_client, | ||
| 274 | &mirror, | ||
| 275 | &nostr_mirror, | ||
| 276 | &healthy, | ||
| 277 | ) | ||
| 278 | .await | ||
| 279 | } | ||
diff --git a/src/nostr_mirror.rs b/src/nostr_mirror.rs new file mode 100644 index 0000000..76f66d0 --- /dev/null +++ b/src/nostr_mirror.rs | |||
| @@ -0,0 +1,139 @@ | |||
| 1 | use crate::db::MirrorDb; | ||
| 2 | use crate::discovery::DiscoveredRepo; | ||
| 3 | use crate::health::GraspServer; | ||
| 4 | use anyhow::Result; | ||
| 5 | use nostr::Kind; | ||
| 6 | use nostr_sdk::prelude::*; | ||
| 7 | |||
| 8 | pub struct NostrMirror { | ||
| 9 | client: nostr_sdk::Client, | ||
| 10 | } | ||
| 11 | |||
| 12 | impl NostrMirror { | ||
| 13 | pub fn new(client: nostr_sdk::Client) -> Self { | ||
| 14 | Self { client } | ||
| 15 | } | ||
| 16 | |||
| 17 | pub async fn forward_events_to_servers( | ||
| 18 | &self, | ||
| 19 | db: &MirrorDb, | ||
| 20 | events: &[Event], | ||
| 21 | servers: &[GraspServer], | ||
| 22 | ) -> Result<()> { | ||
| 23 | for event in events { | ||
| 24 | if db.have_seen_event(&event.id.to_hex()).await? { | ||
| 25 | continue; | ||
| 26 | } | ||
| 27 | |||
| 28 | for server in servers { | ||
| 29 | if !server.is_grasp_server() { | ||
| 30 | continue; | ||
| 31 | } | ||
| 32 | |||
| 33 | tracing::debug!( | ||
| 34 | event_id = %event.id.to_hex(), | ||
| 35 | kind = event.kind.as_u16(), | ||
| 36 | server = %server.domain, | ||
| 37 | "forwarding event" | ||
| 38 | ); | ||
| 39 | |||
| 40 | let url: RelayUrl = RelayUrl::parse(&server.relay_url)?; | ||
| 41 | let urls = vec![url]; | ||
| 42 | |||
| 43 | match self.client.send_event_to(urls, event.clone()).await { | ||
| 44 | Ok(_) => { | ||
| 45 | tracing::debug!( | ||
| 46 | event_id = %event.id.to_hex(), | ||
| 47 | server = %server.domain, | ||
| 48 | "event forwarded" | ||
| 49 | ); | ||
| 50 | } | ||
| 51 | Err(e) => { | ||
| 52 | tracing::warn!( | ||
| 53 | event_id = %event.id.to_hex(), | ||
| 54 | server = %server.domain, | ||
| 55 | error = %e, | ||
| 56 | "failed to forward event" | ||
| 57 | ); | ||
| 58 | } | ||
| 59 | } | ||
| 60 | } | ||
| 61 | |||
| 62 | let _ = db.record_event(&event.id.to_hex()).await; | ||
| 63 | } | ||
| 64 | |||
| 65 | Ok(()) | ||
| 66 | } | ||
| 67 | |||
| 68 | pub async fn forward_repo_events( | ||
| 69 | &self, | ||
| 70 | db: &MirrorDb, | ||
| 71 | repo: &DiscoveredRepo, | ||
| 72 | servers: &[GraspServer], | ||
| 73 | ) -> Result<()> { | ||
| 74 | let filters = vec![ | ||
| 75 | Filter::new() | ||
| 76 | .kind(Kind::Custom(30617)) | ||
| 77 | .author(repo.pubkey) | ||
| 78 | .identifier(&repo.identifier), | ||
| 79 | Filter::new() | ||
| 80 | .kind(Kind::Custom(30618)) | ||
| 81 | .author(repo.pubkey) | ||
| 82 | .identifier(&repo.identifier), | ||
| 83 | ]; | ||
| 84 | |||
| 85 | let mut all_events = Vec::new(); | ||
| 86 | for filter in filters { | ||
| 87 | let events = self | ||
| 88 | .client | ||
| 89 | .fetch_events(filter, std::time::Duration::from_secs(15)) | ||
| 90 | .await?; | ||
| 91 | all_events.extend(events); | ||
| 92 | } | ||
| 93 | |||
| 94 | if all_events.is_empty() { | ||
| 95 | tracing::debug!(identifier = %repo.identifier, "no events to forward"); | ||
| 96 | return Ok(()); | ||
| 97 | } | ||
| 98 | |||
| 99 | tracing::info!( | ||
| 100 | identifier = %repo.identifier, | ||
| 101 | count = all_events.len(), | ||
| 102 | "forwarding repo events" | ||
| 103 | ); | ||
| 104 | |||
| 105 | self.forward_events_to_servers(db, &all_events, servers).await | ||
| 106 | } | ||
| 107 | |||
| 108 | pub async fn sync_all_events( | ||
| 109 | &self, | ||
| 110 | db: &MirrorDb, | ||
| 111 | npubs: &[PublicKey], | ||
| 112 | servers: &[GraspServer], | ||
| 113 | ) -> Result<()> { | ||
| 114 | let git_kinds = [ | ||
| 115 | Kind::Custom(30617), | ||
| 116 | Kind::Custom(30618), | ||
| 117 | Kind::Custom(1631), | ||
| 118 | Kind::Custom(1642), | ||
| 119 | Kind::EventDeletion, | ||
| 120 | ]; | ||
| 121 | |||
| 122 | let mut all_events = Vec::new(); | ||
| 123 | |||
| 124 | for pk in npubs { | ||
| 125 | for kind in &git_kinds { | ||
| 126 | let filter = Filter::new().kind(*kind).author(*pk).limit(100); | ||
| 127 | let events = self | ||
| 128 | .client | ||
| 129 | .fetch_events(filter, std::time::Duration::from_secs(30)) | ||
| 130 | .await?; | ||
| 131 | all_events.extend(events); | ||
| 132 | } | ||
| 133 | } | ||
| 134 | |||
| 135 | tracing::info!(count = all_events.len(), "fetched events for forwarding"); | ||
| 136 | |||
| 137 | self.forward_events_to_servers(db, &all_events, servers).await | ||
| 138 | } | ||
| 139 | } | ||
diff --git a/src/signing.rs b/src/signing.rs new file mode 100644 index 0000000..7ec796a --- /dev/null +++ b/src/signing.rs | |||
| @@ -0,0 +1,76 @@ | |||
| 1 | use crate::discovery::DiscoveredRepo; | ||
| 2 | use crate::health::GraspServer; | ||
| 3 | use anyhow::{Context, Result}; | ||
| 4 | use nostr_sdk::prelude::*; | ||
| 5 | |||
| 6 | pub struct OptionalSigner { | ||
| 7 | keys: Option<Keys>, | ||
| 8 | } | ||
| 9 | |||
| 10 | impl OptionalSigner { | ||
| 11 | pub fn none() -> Self { | ||
| 12 | Self { keys: None } | ||
| 13 | } | ||
| 14 | |||
| 15 | pub fn from_nsec(nsec: &str) -> Result<Self> { | ||
| 16 | let keys = Keys::parse(nsec).context("failed to parse nsec")?; | ||
| 17 | Ok(Self { keys: Some(keys) }) | ||
| 18 | } | ||
| 19 | |||
| 20 | pub fn is_available(&self) -> bool { | ||
| 21 | self.keys.is_some() | ||
| 22 | } | ||
| 23 | |||
| 24 | pub async fn build_updated_announcement( | ||
| 25 | &self, | ||
| 26 | original: &DiscoveredRepo, | ||
| 27 | additional_servers: &[GraspServer], | ||
| 28 | ) -> Result<Option<Event>> { | ||
| 29 | let keys = match &self.keys { | ||
| 30 | Some(k) => k, | ||
| 31 | None => { | ||
| 32 | tracing::debug!("no signer available, skipping announcement update"); | ||
| 33 | return Ok(None); | ||
| 34 | } | ||
| 35 | }; | ||
| 36 | |||
| 37 | if additional_servers.is_empty() { | ||
| 38 | return Ok(None); | ||
| 39 | } | ||
| 40 | |||
| 41 | let pk_hex = original.pubkey.to_hex(); | ||
| 42 | |||
| 43 | let mut new_clone_urls: Vec<String> = original.clone_urls.clone(); | ||
| 44 | for server in additional_servers { | ||
| 45 | let url = server.clone_url(&pk_hex, &original.identifier); | ||
| 46 | if !new_clone_urls.contains(&url) { | ||
| 47 | new_clone_urls.push(url); | ||
| 48 | } | ||
| 49 | } | ||
| 50 | |||
| 51 | let mut tags: Vec<Tag> = vec![ | ||
| 52 | Tag::custom(TagKind::Custom("d".into()), [&original.identifier]), | ||
| 53 | ]; | ||
| 54 | |||
| 55 | for url in &new_clone_urls { | ||
| 56 | tags.push(Tag::custom(TagKind::Custom("clone".into()), [url.as_str()])); | ||
| 57 | } | ||
| 58 | for url in &original.relay_urls { | ||
| 59 | tags.push(Tag::custom( | ||
| 60 | TagKind::Custom("relays".into()), | ||
| 61 | [url.as_str()], | ||
| 62 | )); | ||
| 63 | } | ||
| 64 | |||
| 65 | let builder = EventBuilder::new(Kind::Custom(30617), "").tags(tags); | ||
| 66 | let event = builder.sign_with_keys(keys)?; | ||
| 67 | |||
| 68 | tracing::info!( | ||
| 69 | identifier = %original.identifier, | ||
| 70 | added = additional_servers.len(), | ||
| 71 | "built updated announcement with additional clone URLs" | ||
| 72 | ); | ||
| 73 | |||
| 74 | Ok(Some(event)) | ||
| 75 | } | ||
| 76 | } | ||