upleb.uk

Public git repos — served from a NIP-34 GRASP relay at git.upleb.uk

summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorYour Name <you@example.com>2026-05-26 16:11:05 +0530
committerYour Name <you@example.com>2026-05-26 16:11:05 +0530
commit8816a192c95cf539b65975469a2d61aed46f0414 (patch)
tree7590318244a56fabbfa6919ef6d0fab5be529134 /src
feat: initial implementation of grasp-mirror daemon
GRASP mirror daemon that discovers repos from watched npubs and mirrors git data + Nostr events across all known GRASP servers for redundancy. Features: - Configurable npub watch list via .env (MIRROR_NPUBS) - TOML config for GRASP server list, index relays, storage paths - NIP-11 verification of GRASP servers on startup - Discovery of repos via kind:30617 announcements on index relays - Git mirroring (bare clone + push --mirror) to missing GRASP servers - Nostr event forwarding to all GRASP server embedded relays - SQLite state tracking for sync status and event dedup - Optional signing key for updating announcements with new clone URLs - CLI subcommands: daemon, status, verify, mirror-once Architecture: config.rs - TOML + .env config loading db.rs - SQLite state tracking health.rs - NIP-11 GRASP server verification discovery.rs - Relay subscription, kind:30617 parsing git_mirror.rs - Bare clone + push to GRASP servers nostr_mirror.rs - Event forwarding to all GRASP relays signing.rs - Optional announcement updates main.rs - CLI entry point, daemon loop
Diffstat (limited to 'src')
-rw-r--r--src/config.rs129
-rw-r--r--src/db.rs268
-rw-r--r--src/discovery.rs142
-rw-r--r--src/git_mirror.rs159
-rw-r--r--src/health.rs136
-rw-r--r--src/main.rs279
-rw-r--r--src/nostr_mirror.rs139
-rw-r--r--src/signing.rs76
8 files changed, 1328 insertions, 0 deletions
diff --git a/src/config.rs b/src/config.rs
new file mode 100644
index 0000000..ceff44d
--- /dev/null
+++ b/src/config.rs
@@ -0,0 +1,129 @@
1use anyhow::{Context, Result};
2use nostr::{FromBech32, PublicKey, ToBech32};
3use serde::Deserialize;
4use std::path::PathBuf;
5
6#[derive(Debug, Deserialize)]
7pub struct AppConfig {
8 pub discovery: DiscoveryConfig,
9 pub servers: ServersConfig,
10 pub storage: StorageConfig,
11 pub signing: Option<SigningConfig>,
12}
13
14#[derive(Debug, Deserialize)]
15pub struct DiscoveryConfig {
16 pub index_relays: Vec<String>,
17 #[serde(default = "default_poll_interval")]
18 pub poll_interval_secs: u64,
19}
20
21fn default_poll_interval() -> u64 {
22 300
23}
24
25#[derive(Debug, Deserialize)]
26pub struct ServersConfig {
27 pub known: Vec<String>,
28}
29
30#[derive(Debug, Deserialize)]
31pub struct StorageConfig {
32 #[serde(default = "default_mirror_dir")]
33 pub mirror_dir: PathBuf,
34 #[serde(default = "default_database")]
35 pub database: PathBuf,
36}
37
38fn default_mirror_dir() -> PathBuf {
39 dirs::data_dir()
40 .unwrap_or_else(|| PathBuf::from("."))
41 .join("grasp-mirror")
42 .join("repos")
43}
44
45fn default_database() -> PathBuf {
46 dirs::data_dir()
47 .unwrap_or_else(|| PathBuf::from("."))
48 .join("grasp-mirror")
49 .join("mirror.db")
50}
51
52#[derive(Debug, Deserialize)]
53pub struct SigningConfig {
54 pub key_file: PathBuf,
55}
56
57pub struct ResolvedConfig {
58 pub discovery: DiscoveryConfig,
59 pub servers: ServersConfig,
60 pub storage: StorageConfig,
61 pub signing: Option<SigningConfig>,
62 pub npubs: Vec<PublicKey>,
63}
64
65impl ResolvedConfig {
66 pub fn load(config_path: &PathBuf) -> Result<Self> {
67 let _ = dotenvy::dotenv();
68
69 let config_str = std::fs::read_to_string(config_path)
70 .with_context(|| format!("failed to read config from {:?}", config_path))?;
71 let app: AppConfig = toml::from_str(&config_str).context("failed to parse config.toml")?;
72
73 let npubs = Self::load_npubs()?;
74
75 std::fs::create_dir_all(&app.storage.mirror_dir)
76 .context("failed to create mirror directory")?;
77 if let Some(parent) = app.storage.database.parent() {
78 std::fs::create_dir_all(parent).context("failed to create database directory")?;
79 }
80
81 Ok(Self {
82 discovery: app.discovery,
83 servers: app.servers,
84 storage: app.storage,
85 signing: app.signing,
86 npubs,
87 })
88 }
89
90 fn load_npubs() -> Result<Vec<PublicKey>> {
91 let raw = std::env::var("MIRROR_NPUBS").unwrap_or_default();
92 let mut npubs = Vec::new();
93
94 for entry in raw.split(',').map(|s| s.trim()).filter(|s| !s.is_empty()) {
95 let pk = if entry.starts_with("npub1") {
96 PublicKey::from_bech32(entry)
97 .with_context(|| format!("invalid npub: {}", entry))?
98 } else {
99 let bytes = hex::decode(entry)
100 .with_context(|| format!("invalid hex pubkey: {}", entry))?;
101 PublicKey::from_slice(&bytes)
102 .with_context(|| format!("invalid pubkey bytes: {}", entry))?
103 };
104 npubs.push(pk);
105 }
106
107 if npubs.is_empty() {
108 tracing::warn!("MIRROR_NPUBS is empty — no pubkeys to mirror");
109 }
110
111 Ok(npubs)
112 }
113
114 pub fn relay_urls(&self) -> Vec<String> {
115 let mut relays = self.discovery.index_relays.clone();
116 for server in &self.servers.known {
117 let relay = format!(
118 "wss://{}",
119 server
120 .trim_start_matches("https://")
121 .trim_start_matches("http://")
122 );
123 if !relays.contains(&relay) {
124 relays.push(relay);
125 }
126 }
127 relays
128 }
129}
diff --git a/src/db.rs b/src/db.rs
new file mode 100644
index 0000000..bb1bf31
--- /dev/null
+++ b/src/db.rs
@@ -0,0 +1,268 @@
1use anyhow::{Context, Result};
2use sqlx::sqlite::{SqliteConnectOptions, SqlitePoolOptions};
3use sqlx::SqlitePool;
4use std::path::Path;
5
6pub struct MirrorDb {
7 pool: SqlitePool,
8}
9
10#[derive(Debug, sqlx::FromRow)]
11pub struct RepoRecord {
12 pub id: i64,
13 pub pubkey: String,
14 pub identifier: String,
15 pub announcement_event_id: Option<String>,
16 pub last_seen_at: i64,
17}
18
19#[derive(Debug, sqlx::FromRow)]
20pub struct ServerSyncRecord {
21 pub id: i64,
22 pub repo_id: i64,
23 pub server_domain: String,
24 pub git_synced: bool,
25 pub nostr_synced: bool,
26 pub last_sync_at: Option<i64>,
27 pub error: Option<String>,
28}
29
30#[derive(Debug, sqlx::FromRow)]
31pub struct EventRecord {
32 pub id: i64,
33 pub event_id: String,
34 pub first_seen_at: i64,
35}
36
37impl MirrorDb {
38 pub async fn open(db_path: &Path) -> Result<Self> {
39 let opts = SqliteConnectOptions::new()
40 .filename(db_path)
41 .create_if_missing(true)
42 .journal_mode(sqlx::sqlite::SqliteJournalMode::Wal);
43
44 let pool = SqlitePoolOptions::new()
45 .max_connections(5)
46 .connect_with(opts)
47 .await
48 .with_context(|| format!("failed to open database at {:?}", db_path))?;
49
50 let db = Self { pool };
51 db.run_migrations().await?;
52 Ok(db)
53 }
54
55 async fn run_migrations(&self) -> Result<()> {
56 sqlx::query(
57 r#"
58 CREATE TABLE IF NOT EXISTS repos (
59 id INTEGER PRIMARY KEY AUTOINCREMENT,
60 pubkey TEXT NOT NULL,
61 identifier TEXT NOT NULL,
62 announcement_event_id TEXT,
63 last_seen_at INTEGER NOT NULL,
64 UNIQUE(pubkey, identifier)
65 );
66
67 CREATE TABLE IF NOT EXISTS server_syncs (
68 id INTEGER PRIMARY KEY AUTOINCREMENT,
69 repo_id INTEGER NOT NULL REFERENCES repos(id),
70 server_domain TEXT NOT NULL,
71 git_synced INTEGER NOT NULL DEFAULT 0,
72 nostr_synced INTEGER NOT NULL DEFAULT 0,
73 last_sync_at INTEGER,
74 error TEXT,
75 UNIQUE(repo_id, server_domain)
76 );
77
78 CREATE TABLE IF NOT EXISTS seen_events (
79 id INTEGER PRIMARY KEY AUTOINCREMENT,
80 event_id TEXT NOT NULL UNIQUE,
81 first_seen_at INTEGER NOT NULL
82 );
83
84 CREATE INDEX IF NOT EXISTS idx_repos_pubkey ON repos(pubkey);
85 CREATE INDEX IF NOT EXISTS idx_server_syncs_repo ON server_syncs(repo_id);
86 CREATE INDEX IF NOT EXISTS idx_seen_events_id ON seen_events(event_id);
87 "#,
88 )
89 .execute(&self.pool)
90 .await
91 .context("failed to run migrations")?;
92
93 Ok(())
94 }
95
96 pub async fn upsert_repo(
97 &self,
98 pubkey: &str,
99 identifier: &str,
100 event_id: &str,
101 ) -> Result<i64> {
102 let now = chrono_now_secs();
103 let result = sqlx::query_as::<_, RepoRecord>(
104 "SELECT * FROM repos WHERE pubkey = ? AND identifier = ?",
105 )
106 .bind(pubkey)
107 .bind(identifier)
108 .fetch_optional(&self.pool)
109 .await?;
110
111 if let Some(existing) = result {
112 sqlx::query("UPDATE repos SET announcement_event_id = ?, last_seen_at = ? WHERE id = ?")
113 .bind(event_id)
114 .bind(now)
115 .bind(existing.id)
116 .execute(&self.pool)
117 .await?;
118 Ok(existing.id)
119 } else {
120 sqlx::query("INSERT INTO repos (pubkey, identifier, announcement_event_id, last_seen_at) VALUES (?, ?, ?, ?)")
121 .bind(pubkey)
122 .bind(identifier)
123 .bind(event_id)
124 .bind(now)
125 .execute(&self.pool)
126 .await?;
127
128 let row: (i64,) = sqlx::query_as("SELECT last_insert_rowid()")
129 .fetch_one(&self.pool)
130 .await?;
131 Ok(row.0)
132 }
133 }
134
135 pub async fn get_repos_needing_git_sync(&self, known_servers: &[String]) -> Result<Vec<(RepoRecord, Vec<String>)>> {
136 let repos = sqlx::query_as::<_, RepoRecord>("SELECT * FROM repos")
137 .fetch_all(&self.pool)
138 .await?;
139
140 let mut result = Vec::new();
141 for repo in repos {
142 let synced: Vec<String> = sqlx::query_scalar::<_, String>(
143 "SELECT server_domain FROM server_syncs WHERE repo_id = ? AND git_synced = 1",
144 )
145 .bind(repo.id)
146 .fetch_all(&self.pool)
147 .await?;
148
149 let missing: Vec<String> = known_servers
150 .iter()
151 .filter(|s| !synced.contains(s))
152 .cloned()
153 .collect();
154
155 if !missing.is_empty() {
156 result.push((repo, missing));
157 }
158 }
159
160 Ok(result)
161 }
162
163 pub async fn mark_git_synced(
164 &self,
165 repo_id: i64,
166 server_domain: &str,
167 ) -> Result<()> {
168 let now = chrono_now_secs();
169 sqlx::query(
170 r#"INSERT INTO server_syncs (repo_id, server_domain, git_synced, nostr_synced, last_sync_at)
171 VALUES (?, ?, 1, 0, ?)
172 ON CONFLICT(repo_id, server_domain) DO UPDATE SET git_synced = 1, last_sync_at = ?, error = NULL"#,
173 )
174 .bind(repo_id)
175 .bind(server_domain)
176 .bind(now)
177 .bind(now)
178 .execute(&self.pool)
179 .await?;
180 Ok(())
181 }
182
183 pub async fn mark_nostr_synced(
184 &self,
185 repo_id: i64,
186 server_domain: &str,
187 ) -> Result<()> {
188 let now = chrono_now_secs();
189 sqlx::query(
190 r#"INSERT INTO server_syncs (repo_id, server_domain, git_synced, nostr_synced, last_sync_at)
191 VALUES (?, ?, 0, 1, ?)
192 ON CONFLICT(repo_id, server_domain) DO UPDATE SET nostr_synced = 1, last_sync_at = ?, error = NULL"#,
193 )
194 .bind(repo_id)
195 .bind(server_domain)
196 .bind(now)
197 .bind(now)
198 .execute(&self.pool)
199 .await?;
200 Ok(())
201 }
202
203 pub async fn mark_sync_error(
204 &self,
205 repo_id: i64,
206 server_domain: &str,
207 error: &str,
208 ) -> Result<()> {
209 let now = chrono_now_secs();
210 sqlx::query(
211 r#"INSERT INTO server_syncs (repo_id, server_domain, git_synced, nostr_synced, last_sync_at, error)
212 VALUES (?, ?, 0, 0, ?, ?)
213 ON CONFLICT(repo_id, server_domain) DO UPDATE SET error = ?, last_sync_at = ?"#,
214 )
215 .bind(repo_id)
216 .bind(server_domain)
217 .bind(now)
218 .bind(error)
219 .bind(error)
220 .bind(now)
221 .execute(&self.pool)
222 .await?;
223 Ok(())
224 }
225
226 pub async fn have_seen_event(&self, event_id: &str) -> Result<bool> {
227 let result: Option<(i64,)> = sqlx::query_as(
228 "SELECT id FROM seen_events WHERE event_id = ?",
229 )
230 .bind(event_id)
231 .fetch_optional(&self.pool)
232 .await?;
233 Ok(result.is_some())
234 }
235
236 pub async fn record_event(&self, event_id: &str) -> Result<()> {
237 let now = chrono_now_secs();
238 sqlx::query("INSERT OR IGNORE INTO seen_events (event_id, first_seen_at) VALUES (?, ?)")
239 .bind(event_id)
240 .bind(now)
241 .execute(&self.pool)
242 .await?;
243 Ok(())
244 }
245
246 pub async fn get_all_repos(&self) -> Result<Vec<RepoRecord>> {
247 let repos = sqlx::query_as::<_, RepoRecord>("SELECT * FROM repos ORDER BY last_seen_at DESC")
248 .fetch_all(&self.pool)
249 .await?;
250 Ok(repos)
251 }
252
253 pub async fn get_sync_summary(&self) -> Result<Vec<ServerSyncRecord>> {
254 let records = sqlx::query_as::<_, ServerSyncRecord>(
255 "SELECT * FROM server_syncs ORDER BY last_sync_at DESC NULLS LAST",
256 )
257 .fetch_all(&self.pool)
258 .await?;
259 Ok(records)
260 }
261}
262
263fn chrono_now_secs() -> i64 {
264 std::time::SystemTime::now()
265 .duration_since(std::time::UNIX_EPOCH)
266 .unwrap_or_default()
267 .as_secs() as i64
268}
diff --git a/src/discovery.rs b/src/discovery.rs
new file mode 100644
index 0000000..59b9f2c
--- /dev/null
+++ b/src/discovery.rs
@@ -0,0 +1,142 @@
1use crate::db::MirrorDb;
2use crate::health::GraspServer;
3use anyhow::{Context, Result};
4use nostr::Kind;
5use nostr_sdk::prelude::*;
6use std::collections::HashMap;
7
8#[derive(Debug, Clone)]
9pub struct DiscoveredRepo {
10 pub pubkey: PublicKey,
11 pub identifier: String,
12 pub event_id: EventId,
13 pub clone_urls: Vec<String>,
14 pub relay_urls: Vec<String>,
15}
16
17pub async fn discover_repos_from_relays(
18 client: &nostr_sdk::Client,
19 npubs: &[PublicKey],
20 relay_urls: &[String],
21) -> Result<Vec<DiscoveredRepo>> {
22 for url in relay_urls {
23 if let Err(e) = client.add_relay(url).await {
24 tracing::warn!(relay = %url, error = %e, "failed to add relay");
25 continue;
26 }
27 }
28 client.connect().await;
29
30 let mut repos = Vec::new();
31
32 for pk in npubs {
33 let filter = Filter::new()
34 .kind(Kind::Custom(30617))
35 .author(*pk)
36 .limit(50);
37
38 let events = client
39 .fetch_events(filter, std::time::Duration::from_secs(30))
40 .await
41 .context("failed to fetch events from relays")?;
42
43 for event in events.into_iter() {
44 if let Some(repo) = parse_announcement(&event) {
45 repos.push(repo);
46 }
47 }
48 }
49
50 tracing::info!(count = repos.len(), "discovered repos from relays");
51 Ok(repos)
52}
53
54fn parse_announcement(event: &Event) -> Option<DiscoveredRepo> {
55 let mut identifier = None;
56 let mut clone_urls = Vec::new();
57 let mut relay_urls = Vec::new();
58
59 for tag in event.tags.iter() {
60 let tag_vec = tag.clone().to_vec();
61 if tag_vec.len() < 2 {
62 continue;
63 }
64 match tag_vec[0].as_str() {
65 "d" => identifier = Some(tag_vec[1].clone()),
66 "clone" => clone_urls.push(tag_vec[1].clone()),
67 "relays" => relay_urls.push(tag_vec[1].clone()),
68 _ => {}
69 }
70 }
71
72 let identifier = identifier?;
73 if clone_urls.is_empty() {
74 tracing::warn!(
75 identifier = %identifier,
76 "repo announcement has no clone URLs"
77 );
78 }
79
80 Some(DiscoveredRepo {
81 pubkey: event.pubkey,
82 identifier,
83 event_id: event.id,
84 clone_urls,
85 relay_urls,
86 })
87}
88
89pub async fn persist_discovered_repos(
90 db: &MirrorDb,
91 repos: &[DiscoveredRepo],
92) -> Result<HashMap<String, i64>> {
93 let mut repo_ids = HashMap::new();
94 for repo in repos {
95 let pk_hex = repo.pubkey.to_hex();
96 match db
97 .upsert_repo(&pk_hex, &repo.identifier, &repo.event_id.to_hex())
98 .await
99 {
100 Ok(id) => {
101 tracing::debug!(
102 identifier = %repo.identifier,
103 repo_id = id,
104 "persisted repo"
105 );
106 repo_ids.insert(format!("{}:{}", pk_hex, repo.identifier), id);
107 }
108 Err(e) => {
109 tracing::error!(
110 identifier = %repo.identifier,
111 error = %e,
112 "failed to persist repo"
113 );
114 }
115 }
116 }
117 Ok(repo_ids)
118}
119
120pub fn identify_missing_servers(
121 repo: &DiscoveredRepo,
122 servers: &HashMap<String, GraspServer>,
123) -> Vec<GraspServer> {
124 let mut missing = Vec::new();
125
126 for (_domain, server) in servers {
127 if !server.is_grasp_server() {
128 continue;
129 }
130
131 let has_clone = repo
132 .clone_urls
133 .iter()
134 .any(|url| url.contains(&server.domain));
135
136 if !has_clone {
137 missing.push(server.clone());
138 }
139 }
140
141 missing
142}
diff --git a/src/git_mirror.rs b/src/git_mirror.rs
new file mode 100644
index 0000000..47c0442
--- /dev/null
+++ b/src/git_mirror.rs
@@ -0,0 +1,159 @@
1use crate::db::MirrorDb;
2use crate::discovery::DiscoveredRepo;
3use crate::health::GraspServer;
4use anyhow::{Context, Result};
5use git2::RemoteCallbacks;
6use std::path::Path;
7
8pub struct GitMirror {
9 mirror_dir: std::path::PathBuf,
10}
11
12impl GitMirror {
13 pub fn new(mirror_dir: &Path) -> Self {
14 Self {
15 mirror_dir: mirror_dir.to_path_buf(),
16 }
17 }
18
19 fn repo_path(&self, pubkey: &str, identifier: &str) -> std::path::PathBuf {
20 self.mirror_dir
21 .join(pubkey)
22 .join(format!("{}.git", identifier))
23 }
24
25 pub async fn mirror_repo_to_servers(
26 &self,
27 db: &MirrorDb,
28 repo: &DiscoveredRepo,
29 target_servers: &[GraspServer],
30 ) -> Result<()> {
31 if target_servers.is_empty() {
32 tracing::debug!(
33 identifier = %repo.identifier,
34 "no missing servers to mirror to"
35 );
36 return Ok(());
37 }
38
39 let pk_hex = repo.pubkey.to_hex();
40 let repo_path = self.repo_path(&pk_hex, &repo.identifier);
41
42 if !repo_path.exists() {
43 self.clone_bare(&repo_path, &repo.clone_urls)?;
44 }
45
46 for server in target_servers {
47 let target_url = server.clone_url(&pk_hex, &repo.identifier);
48
49 tracing::info!(
50 identifier = %repo.identifier,
51 server = %server.domain,
52 target = %target_url,
53 "mirroring git data"
54 );
55
56 let repo_id = db.get_all_repos().await.ok().and_then(|repos| {
57 repos
58 .iter()
59 .find(|r| r.pubkey == pk_hex && r.identifier == repo.identifier)
60 .map(|r| r.id)
61 });
62
63 match self.push_mirror(&repo_path, &target_url) {
64 Ok(()) => {
65 tracing::info!(
66 identifier = %repo.identifier,
67 server = %server.domain,
68 "git mirror succeeded"
69 );
70 if let Some(id) = repo_id {
71 let _ = db.mark_git_synced(id, &server.domain).await;
72 }
73 }
74 Err(e) => {
75 tracing::error!(
76 identifier = %repo.identifier,
77 server = %server.domain,
78 error = %e,
79 "git mirror failed"
80 );
81 if let Some(id) = repo_id {
82 let _ = db.mark_sync_error(id, &server.domain, &e.to_string()).await;
83 }
84 }
85 }
86 }
87
88 Ok(())
89 }
90
91 fn clone_bare(&self, repo_path: &Path, clone_urls: &[String]) -> Result<()> {
92 if let Some(parent) = repo_path.parent() {
93 std::fs::create_dir_all(parent)
94 .with_context(|| format!("failed to create {:?}", parent))?;
95 }
96
97 let mut last_error = None;
98
99 for url in clone_urls {
100 if url.is_empty() {
101 continue;
102 }
103 tracing::info!(url = %url, path = ?repo_path, "cloning bare repo");
104
105 let mut callbacks = RemoteCallbacks::new();
106 callbacks.credentials(|_url, _username, _allowed| git2::Cred::default());
107
108 let mut fetch_opts = git2::FetchOptions::new();
109 fetch_opts.remote_callbacks(callbacks);
110
111 let mut builder = git2::build::RepoBuilder::new();
112 builder.bare(true).fetch_options(fetch_opts);
113
114 match builder.clone(url, repo_path) {
115 Ok(_) => {
116 tracing::info!(url = %url, "bare clone succeeded");
117 return Ok(());
118 }
119 Err(e) => {
120 tracing::warn!(url = %url, error = %e, "clone failed, trying next URL");
121 last_error = Some(e);
122 if repo_path.exists() {
123 let _ = std::fs::remove_dir_all(repo_path);
124 }
125 }
126 }
127 }
128
129 let err = last_error.unwrap_or_else(|| git2::Error::from_str("no clone URLs available"));
130 Err(err).with_context(|| format!("all clone attempts failed for {:?}", repo_path))
131 }
132
133 fn push_mirror(&self, repo_path: &Path, target_url: &str) -> Result<()> {
134 let repo = git2::Repository::open(repo_path)
135 .with_context(|| format!("failed to open bare repo at {:?}", repo_path))?;
136
137 let mut remote = repo.remote("push_target", target_url)?;
138
139 let mut callbacks = RemoteCallbacks::new();
140 callbacks.credentials(|_url, _username, _allowed| git2::Cred::default());
141 callbacks.push_update_reference(|_refname, status| {
142 if let Some(s) = status {
143 tracing::warn!(status = %s, "push rejected");
144 }
145 Ok(())
146 });
147
148 let mut push_opts = git2::PushOptions::new();
149 push_opts.remote_callbacks(callbacks);
150
151 let refspecs = ["+refs/*:refs/*"];
152
153 remote
154 .push(&refspecs, Some(&mut push_opts))
155 .with_context(|| format!("failed to push mirror to {}", target_url))?;
156
157 Ok(())
158 }
159}
diff --git a/src/health.rs b/src/health.rs
new file mode 100644
index 0000000..2853239
--- /dev/null
+++ b/src/health.rs
@@ -0,0 +1,136 @@
1use anyhow::{Context, Result};
2use serde::{Deserialize, Serialize};
3use std::collections::HashMap;
4
5#[derive(Debug, Clone, Serialize, Deserialize)]
6pub struct Nip11Info {
7 pub name: Option<String>,
8 pub description: Option<String>,
9 pub supported_nips: Option<Vec<u64>>,
10 pub supported_grasps: Option<Vec<String>>,
11 pub software: Option<String>,
12 pub version: Option<String>,
13}
14
15#[derive(Debug, Clone)]
16pub struct GraspServer {
17 pub domain: String,
18 pub relay_url: String,
19 pub clone_url_prefix: String,
20 pub nip11: Option<Nip11Info>,
21 pub healthy: bool,
22}
23
24impl GraspServer {
25 pub fn from_domain(domain: &str) -> Self {
26 let clean = domain
27 .trim_start_matches("https://")
28 .trim_start_matches("http://")
29 .trim_end_matches('/')
30 .to_string();
31 Self {
32 relay_url: format!("wss://{}", clean),
33 clone_url_prefix: format!("https://{}", clean),
34 nip11: None,
35 healthy: false,
36 domain: clean,
37 }
38 }
39
40 pub fn clone_url(&self, npub_hex: &str, identifier: &str) -> String {
41 format!("{}/{}/{}.git", self.clone_url_prefix, npub_hex, identifier)
42 }
43
44 pub fn is_grasp_server(&self) -> bool {
45 self.nip11
46 .as_ref()
47 .map(|info| info.supported_grasps.is_some())
48 .unwrap_or(false)
49 }
50}
51
52pub async fn verify_grasp_server(domain: &str) -> Result<GraspServer> {
53 let mut server = GraspServer::from_domain(domain);
54 let nip11_url = format!("https://{}", server.domain);
55
56 let client = reqwest::Client::builder()
57 .timeout(std::time::Duration::from_secs(10))
58 .build()?;
59
60 let resp = client
61 .get(&nip11_url)
62 .header("Accept", "application/nostr+json")
63 .send()
64 .await;
65
66 match resp {
67 Ok(resp) if resp.status().is_success() => {
68 match resp.json::<Nip11Info>().await {
69 Ok(info) => {
70 let is_grasp = info.supported_grasps.is_some();
71 if is_grasp {
72 tracing::info!(
73 domain = %server.domain,
74 grasps = ?info.supported_grasps,
75 version = ?info.version,
76 "verified GRASP server"
77 );
78 } else {
79 tracing::warn!(
80 domain = %server.domain,
81 "server responded to NIP-11 but has no supported_grasps"
82 );
83 }
84 server.healthy = is_grasp;
85 server.nip11 = Some(info);
86 }
87 Err(e) => {
88 tracing::warn!(domain = %server.domain, error = %e, "failed to parse NIP-11 response");
89 }
90 }
91 }
92 Ok(resp) => {
93 tracing::warn!(
94 domain = %server.domain,
95 status = %resp.status(),
96 "NIP-11 check returned non-success"
97 );
98 }
99 Err(e) => {
100 tracing::warn!(domain = %server.domain, error = %e, "NIP-11 check failed");
101 }
102 }
103
104 Ok(server)
105}
106
107pub async fn verify_all_servers(domains: &[String]) -> HashMap<String, GraspServer> {
108 let mut servers = HashMap::new();
109 let mut set = tokio::task::JoinSet::new();
110
111 for d in domains {
112 let domain = d.clone();
113 set.spawn(async move {
114 let result = verify_grasp_server(&domain).await;
115 (domain, result)
116 });
117 }
118
119 while let Some(res) = set.join_next().await {
120 match res {
121 Ok((domain, result)) => match result {
122 Ok(server) => {
123 servers.insert(domain, server);
124 }
125 Err(e) => {
126 tracing::error!(domain = %domain, error = %e, "failed to verify server");
127 }
128 },
129 Err(e) => {
130 tracing::error!(error = %e, "task panicked during server verification");
131 }
132 }
133 }
134
135 servers
136}
diff --git a/src/main.rs b/src/main.rs
new file mode 100644
index 0000000..b709d44
--- /dev/null
+++ b/src/main.rs
@@ -0,0 +1,279 @@
1mod config;
2mod db;
3mod discovery;
4mod git_mirror;
5mod health;
6mod nostr_mirror;
7mod signing;
8
9use anyhow::Result;
10use clap::Parser;
11use std::path::PathBuf;
12use std::sync::Arc;
13
14#[derive(Parser, Debug)]
15#[command(name = "grasp-mirror", about = "GRASP mirror daemon")]
16struct Cli {
17 #[arg(short, long, default_value = "config.toml")]
18 config: PathBuf,
19
20 #[command(subcommand)]
21 command: Option<Command>,
22}
23
24#[derive(clap::Subcommand, Debug)]
25enum Command {
26 Daemon,
27 Status,
28 Verify,
29 MirrorOnce,
30}
31
32#[tokio::main]
33async fn main() -> Result<()> {
34 tracing_subscriber::fmt()
35 .with_env_filter(
36 tracing_subscriber::EnvFilter::try_from_default_env()
37 .unwrap_or_else(|_| tracing_subscriber::EnvFilter::new("info")),
38 )
39 .init();
40
41 let cli = Cli::parse();
42 let config = config::ResolvedConfig::load(&cli.config)?;
43
44 tracing::info!(
45 npubs = config.npubs.len(),
46 servers = config.servers.known.len(),
47 index_relays = config.discovery.index_relays.len(),
48 "starting grasp-mirror"
49 );
50
51 let db = db::MirrorDb::open(&config.storage.database).await?;
52
53 match cli.command.unwrap_or(Command::Daemon) {
54 Command::Daemon => run_daemon(config, db).await,
55 Command::Status => run_status(db).await,
56 Command::Verify => run_verify(config).await,
57 Command::MirrorOnce => run_mirror_once(config, db).await,
58 }
59}
60
61fn build_nostr_client(relay_urls: &[String]) -> nostr_sdk::Client {
62 let client = nostr_sdk::Client::default();
63 for url in relay_urls {
64 let _ = client.add_relay(url);
65 }
66 client
67}
68
69async fn run_daemon(config: config::ResolvedConfig, db: db::MirrorDb) -> Result<()> {
70 let db = Arc::new(db);
71 let config = Arc::new(config);
72
73 let servers = health::verify_all_servers(&config.servers.known).await;
74 let healthy: Vec<_> = servers
75 .values()
76 .filter(|s| s.is_grasp_server())
77 .cloned()
78 .collect();
79
80 tracing::info!(
81 total = servers.len(),
82 healthy = healthy.len(),
83 "server verification complete"
84 );
85
86 let healthy = Arc::new(healthy);
87
88 let relay_urls = config.relay_urls();
89 let nostr_client = build_nostr_client(&relay_urls);
90 nostr_client.connect().await;
91
92 let mirror = git_mirror::GitMirror::new(&config.storage.mirror_dir);
93 let nostr_mirror = nostr_mirror::NostrMirror::new(nostr_client.clone());
94
95 let mut interval = tokio::time::interval(std::time::Duration::from_secs(
96 config.discovery.poll_interval_secs,
97 ));
98
99 tracing::info!(
100 "daemon started, polling every {}s",
101 config.discovery.poll_interval_secs
102 );
103
104 loop {
105 tokio::select! {
106 _ = interval.tick() => {
107 if let Err(e) = mirror_cycle(
108 &config,
109 &db,
110 &nostr_client,
111 &mirror,
112 &nostr_mirror,
113 &healthy,
114 ).await {
115 tracing::error!(error = %e, "mirror cycle failed");
116 }
117 }
118 _ = tokio::signal::ctrl_c() => {
119 tracing::info!("shutting down");
120 break;
121 }
122 }
123 }
124
125 Ok(())
126}
127
128async fn mirror_cycle(
129 config: &Arc<config::ResolvedConfig>,
130 db: &Arc<db::MirrorDb>,
131 nostr_client: &nostr_sdk::Client,
132 mirror: &git_mirror::GitMirror,
133 nostr_mirror: &nostr_mirror::NostrMirror,
134 servers: &Arc<Vec<health::GraspServer>>,
135) -> Result<()> {
136 tracing::info!("starting mirror cycle");
137
138 let relay_urls = config.relay_urls();
139 let repos = discovery::discover_repos_from_relays(nostr_client, &config.npubs, &relay_urls)
140 .await?;
141
142 tracing::info!(count = repos.len(), "discovered repos");
143
144 discovery::persist_discovered_repos(db, &repos).await?;
145
146 let server_map: std::collections::HashMap<String, health::GraspServer> = servers
147 .iter()
148 .map(|s| (s.domain.clone(), s.clone()))
149 .collect();
150
151 for repo in &repos {
152 let missing = discovery::identify_missing_servers(repo, &server_map);
153
154 if missing.is_empty() {
155 tracing::debug!(identifier = %repo.identifier, "repo already on all servers");
156 continue;
157 }
158
159 tracing::info!(
160 identifier = %repo.identifier,
161 missing = missing.iter().map(|s| s.domain.as_str()).collect::<Vec<_>>().join(", "),
162 "mirroring to missing servers"
163 );
164
165 mirror.mirror_repo_to_servers(db, repo, &missing).await?;
166 nostr_mirror.forward_repo_events(db, repo, servers).await?;
167 }
168
169 nostr_mirror
170 .sync_all_events(db, &config.npubs, servers)
171 .await?;
172
173 tracing::info!("mirror cycle complete");
174 Ok(())
175}
176
177async fn run_status(db: db::MirrorDb) -> Result<()> {
178 let repos = db.get_all_repos().await?;
179 let syncs = db.get_sync_summary().await?;
180
181 println!("=== GRASP Mirror Status ===\n");
182 println!("Repos tracked: {}\n", repos.len());
183
184 for repo in &repos {
185 println!(" {} ({})", repo.identifier, &repo.pubkey[..12]);
186 }
187
188 println!("\nSync records: {}\n", syncs.len());
189 for sync in &syncs {
190 let status = if sync.git_synced && sync.nostr_synced {
191 "OK"
192 } else if sync.error.is_some() {
193 "ERR"
194 } else {
195 "PENDING"
196 };
197 println!(
198 " repo:{} server:{} git:{} nostr:{} [{}]",
199 sync.repo_id,
200 sync.server_domain,
201 if sync.git_synced { "Y" } else { "N" },
202 if sync.nostr_synced { "Y" } else { "N" },
203 status,
204 );
205 if let Some(err) = &sync.error {
206 println!(" error: {}", err);
207 }
208 }
209
210 Ok(())
211}
212
213async fn run_verify(config: config::ResolvedConfig) -> Result<()> {
214 let servers = health::verify_all_servers(&config.servers.known).await;
215
216 println!("=== GRASP Server Verification ===\n");
217
218 for (domain, server) in &servers {
219 let grasp = server.is_grasp_server();
220 let version = server
221 .nip11
222 .as_ref()
223 .and_then(|i| i.version.clone())
224 .unwrap_or_else(|| "unknown".to_string());
225 let grasps = server
226 .nip11
227 .as_ref()
228 .and_then(|i| i.supported_grasps.clone())
229 .map(|g| g.join(", "))
230 .unwrap_or_else(|| "NONE".to_string());
231
232 println!(
233 " {} [{}] v={} GRASP={}",
234 domain,
235 if grasp { "OK" } else { "SKIP" },
236 version,
237 grasps,
238 );
239 }
240
241 let healthy = servers.values().filter(|s| s.is_grasp_server()).count();
242 println!(
243 "\n{}/{} verified as GRASP servers",
244 healthy,
245 servers.len()
246 );
247
248 Ok(())
249}
250
251async fn run_mirror_once(config: config::ResolvedConfig, db: db::MirrorDb) -> Result<()> {
252 let db = Arc::new(db);
253 let config = Arc::new(config);
254
255 let servers = health::verify_all_servers(&config.servers.known).await;
256 let healthy: Vec<_> = servers
257 .values()
258 .filter(|s| s.is_grasp_server())
259 .cloned()
260 .collect();
261
262 let relay_urls = config.relay_urls();
263 let nostr_client = build_nostr_client(&relay_urls);
264 nostr_client.connect().await;
265
266 let mirror = git_mirror::GitMirror::new(&config.storage.mirror_dir);
267 let nostr_mirror = nostr_mirror::NostrMirror::new(nostr_client.clone());
268 let healthy = Arc::new(healthy);
269
270 mirror_cycle(
271 &config,
272 &db,
273 &nostr_client,
274 &mirror,
275 &nostr_mirror,
276 &healthy,
277 )
278 .await
279}
diff --git a/src/nostr_mirror.rs b/src/nostr_mirror.rs
new file mode 100644
index 0000000..76f66d0
--- /dev/null
+++ b/src/nostr_mirror.rs
@@ -0,0 +1,139 @@
1use crate::db::MirrorDb;
2use crate::discovery::DiscoveredRepo;
3use crate::health::GraspServer;
4use anyhow::Result;
5use nostr::Kind;
6use nostr_sdk::prelude::*;
7
8pub struct NostrMirror {
9 client: nostr_sdk::Client,
10}
11
12impl NostrMirror {
13 pub fn new(client: nostr_sdk::Client) -> Self {
14 Self { client }
15 }
16
17 pub async fn forward_events_to_servers(
18 &self,
19 db: &MirrorDb,
20 events: &[Event],
21 servers: &[GraspServer],
22 ) -> Result<()> {
23 for event in events {
24 if db.have_seen_event(&event.id.to_hex()).await? {
25 continue;
26 }
27
28 for server in servers {
29 if !server.is_grasp_server() {
30 continue;
31 }
32
33 tracing::debug!(
34 event_id = %event.id.to_hex(),
35 kind = event.kind.as_u16(),
36 server = %server.domain,
37 "forwarding event"
38 );
39
40 let url: RelayUrl = RelayUrl::parse(&server.relay_url)?;
41 let urls = vec![url];
42
43 match self.client.send_event_to(urls, event.clone()).await {
44 Ok(_) => {
45 tracing::debug!(
46 event_id = %event.id.to_hex(),
47 server = %server.domain,
48 "event forwarded"
49 );
50 }
51 Err(e) => {
52 tracing::warn!(
53 event_id = %event.id.to_hex(),
54 server = %server.domain,
55 error = %e,
56 "failed to forward event"
57 );
58 }
59 }
60 }
61
62 let _ = db.record_event(&event.id.to_hex()).await;
63 }
64
65 Ok(())
66 }
67
68 pub async fn forward_repo_events(
69 &self,
70 db: &MirrorDb,
71 repo: &DiscoveredRepo,
72 servers: &[GraspServer],
73 ) -> Result<()> {
74 let filters = vec![
75 Filter::new()
76 .kind(Kind::Custom(30617))
77 .author(repo.pubkey)
78 .identifier(&repo.identifier),
79 Filter::new()
80 .kind(Kind::Custom(30618))
81 .author(repo.pubkey)
82 .identifier(&repo.identifier),
83 ];
84
85 let mut all_events = Vec::new();
86 for filter in filters {
87 let events = self
88 .client
89 .fetch_events(filter, std::time::Duration::from_secs(15))
90 .await?;
91 all_events.extend(events);
92 }
93
94 if all_events.is_empty() {
95 tracing::debug!(identifier = %repo.identifier, "no events to forward");
96 return Ok(());
97 }
98
99 tracing::info!(
100 identifier = %repo.identifier,
101 count = all_events.len(),
102 "forwarding repo events"
103 );
104
105 self.forward_events_to_servers(db, &all_events, servers).await
106 }
107
108 pub async fn sync_all_events(
109 &self,
110 db: &MirrorDb,
111 npubs: &[PublicKey],
112 servers: &[GraspServer],
113 ) -> Result<()> {
114 let git_kinds = [
115 Kind::Custom(30617),
116 Kind::Custom(30618),
117 Kind::Custom(1631),
118 Kind::Custom(1642),
119 Kind::EventDeletion,
120 ];
121
122 let mut all_events = Vec::new();
123
124 for pk in npubs {
125 for kind in &git_kinds {
126 let filter = Filter::new().kind(*kind).author(*pk).limit(100);
127 let events = self
128 .client
129 .fetch_events(filter, std::time::Duration::from_secs(30))
130 .await?;
131 all_events.extend(events);
132 }
133 }
134
135 tracing::info!(count = all_events.len(), "fetched events for forwarding");
136
137 self.forward_events_to_servers(db, &all_events, servers).await
138 }
139}
diff --git a/src/signing.rs b/src/signing.rs
new file mode 100644
index 0000000..7ec796a
--- /dev/null
+++ b/src/signing.rs
@@ -0,0 +1,76 @@
1use crate::discovery::DiscoveredRepo;
2use crate::health::GraspServer;
3use anyhow::{Context, Result};
4use nostr_sdk::prelude::*;
5
6pub struct OptionalSigner {
7 keys: Option<Keys>,
8}
9
10impl OptionalSigner {
11 pub fn none() -> Self {
12 Self { keys: None }
13 }
14
15 pub fn from_nsec(nsec: &str) -> Result<Self> {
16 let keys = Keys::parse(nsec).context("failed to parse nsec")?;
17 Ok(Self { keys: Some(keys) })
18 }
19
20 pub fn is_available(&self) -> bool {
21 self.keys.is_some()
22 }
23
24 pub async fn build_updated_announcement(
25 &self,
26 original: &DiscoveredRepo,
27 additional_servers: &[GraspServer],
28 ) -> Result<Option<Event>> {
29 let keys = match &self.keys {
30 Some(k) => k,
31 None => {
32 tracing::debug!("no signer available, skipping announcement update");
33 return Ok(None);
34 }
35 };
36
37 if additional_servers.is_empty() {
38 return Ok(None);
39 }
40
41 let pk_hex = original.pubkey.to_hex();
42
43 let mut new_clone_urls: Vec<String> = original.clone_urls.clone();
44 for server in additional_servers {
45 let url = server.clone_url(&pk_hex, &original.identifier);
46 if !new_clone_urls.contains(&url) {
47 new_clone_urls.push(url);
48 }
49 }
50
51 let mut tags: Vec<Tag> = vec![
52 Tag::custom(TagKind::Custom("d".into()), [&original.identifier]),
53 ];
54
55 for url in &new_clone_urls {
56 tags.push(Tag::custom(TagKind::Custom("clone".into()), [url.as_str()]));
57 }
58 for url in &original.relay_urls {
59 tags.push(Tag::custom(
60 TagKind::Custom("relays".into()),
61 [url.as_str()],
62 ));
63 }
64
65 let builder = EventBuilder::new(Kind::Custom(30617), "").tags(tags);
66 let event = builder.sign_with_keys(keys)?;
67
68 tracing::info!(
69 identifier = %original.identifier,
70 added = additional_servers.len(),
71 "built updated announcement with additional clone URLs"
72 );
73
74 Ok(Some(event))
75 }
76}