diff options
| author | Your Name <you@example.com> | 2026-05-26 16:11:05 +0530 |
|---|---|---|
| committer | Your Name <you@example.com> | 2026-05-26 16:11:05 +0530 |
| commit | 8816a192c95cf539b65975469a2d61aed46f0414 (patch) | |
| tree | 7590318244a56fabbfa6919ef6d0fab5be529134 /src/db.rs | |
feat: initial implementation of grasp-mirror daemon
GRASP mirror daemon that discovers repos from watched npubs and mirrors
git data + Nostr events across all known GRASP servers for redundancy.
Features:
- Configurable npub watch list via .env (MIRROR_NPUBS)
- TOML config for GRASP server list, index relays, storage paths
- NIP-11 verification of GRASP servers on startup
- Discovery of repos via kind:30617 announcements on index relays
- Git mirroring (bare clone + push --mirror) to missing GRASP servers
- Nostr event forwarding to all GRASP server embedded relays
- SQLite state tracking for sync status and event dedup
- Optional signing key for updating announcements with new clone URLs
- CLI subcommands: daemon, status, verify, mirror-once
Architecture:
config.rs - TOML + .env config loading
db.rs - SQLite state tracking
health.rs - NIP-11 GRASP server verification
discovery.rs - Relay subscription, kind:30617 parsing
git_mirror.rs - Bare clone + push to GRASP servers
nostr_mirror.rs - Event forwarding to all GRASP relays
signing.rs - Optional announcement updates
main.rs - CLI entry point, daemon loop
Diffstat (limited to 'src/db.rs')
| -rw-r--r-- | src/db.rs | 268 |
1 files changed, 268 insertions, 0 deletions
diff --git a/src/db.rs b/src/db.rs new file mode 100644 index 0000000..bb1bf31 --- /dev/null +++ b/src/db.rs | |||
| @@ -0,0 +1,268 @@ | |||
| 1 | use anyhow::{Context, Result}; | ||
| 2 | use sqlx::sqlite::{SqliteConnectOptions, SqlitePoolOptions}; | ||
| 3 | use sqlx::SqlitePool; | ||
| 4 | use std::path::Path; | ||
| 5 | |||
| 6 | pub struct MirrorDb { | ||
| 7 | pool: SqlitePool, | ||
| 8 | } | ||
| 9 | |||
| 10 | #[derive(Debug, sqlx::FromRow)] | ||
| 11 | pub struct RepoRecord { | ||
| 12 | pub id: i64, | ||
| 13 | pub pubkey: String, | ||
| 14 | pub identifier: String, | ||
| 15 | pub announcement_event_id: Option<String>, | ||
| 16 | pub last_seen_at: i64, | ||
| 17 | } | ||
| 18 | |||
| 19 | #[derive(Debug, sqlx::FromRow)] | ||
| 20 | pub struct ServerSyncRecord { | ||
| 21 | pub id: i64, | ||
| 22 | pub repo_id: i64, | ||
| 23 | pub server_domain: String, | ||
| 24 | pub git_synced: bool, | ||
| 25 | pub nostr_synced: bool, | ||
| 26 | pub last_sync_at: Option<i64>, | ||
| 27 | pub error: Option<String>, | ||
| 28 | } | ||
| 29 | |||
| 30 | #[derive(Debug, sqlx::FromRow)] | ||
| 31 | pub struct EventRecord { | ||
| 32 | pub id: i64, | ||
| 33 | pub event_id: String, | ||
| 34 | pub first_seen_at: i64, | ||
| 35 | } | ||
| 36 | |||
| 37 | impl MirrorDb { | ||
| 38 | pub async fn open(db_path: &Path) -> Result<Self> { | ||
| 39 | let opts = SqliteConnectOptions::new() | ||
| 40 | .filename(db_path) | ||
| 41 | .create_if_missing(true) | ||
| 42 | .journal_mode(sqlx::sqlite::SqliteJournalMode::Wal); | ||
| 43 | |||
| 44 | let pool = SqlitePoolOptions::new() | ||
| 45 | .max_connections(5) | ||
| 46 | .connect_with(opts) | ||
| 47 | .await | ||
| 48 | .with_context(|| format!("failed to open database at {:?}", db_path))?; | ||
| 49 | |||
| 50 | let db = Self { pool }; | ||
| 51 | db.run_migrations().await?; | ||
| 52 | Ok(db) | ||
| 53 | } | ||
| 54 | |||
| 55 | async fn run_migrations(&self) -> Result<()> { | ||
| 56 | sqlx::query( | ||
| 57 | r#" | ||
| 58 | CREATE TABLE IF NOT EXISTS repos ( | ||
| 59 | id INTEGER PRIMARY KEY AUTOINCREMENT, | ||
| 60 | pubkey TEXT NOT NULL, | ||
| 61 | identifier TEXT NOT NULL, | ||
| 62 | announcement_event_id TEXT, | ||
| 63 | last_seen_at INTEGER NOT NULL, | ||
| 64 | UNIQUE(pubkey, identifier) | ||
| 65 | ); | ||
| 66 | |||
| 67 | CREATE TABLE IF NOT EXISTS server_syncs ( | ||
| 68 | id INTEGER PRIMARY KEY AUTOINCREMENT, | ||
| 69 | repo_id INTEGER NOT NULL REFERENCES repos(id), | ||
| 70 | server_domain TEXT NOT NULL, | ||
| 71 | git_synced INTEGER NOT NULL DEFAULT 0, | ||
| 72 | nostr_synced INTEGER NOT NULL DEFAULT 0, | ||
| 73 | last_sync_at INTEGER, | ||
| 74 | error TEXT, | ||
| 75 | UNIQUE(repo_id, server_domain) | ||
| 76 | ); | ||
| 77 | |||
| 78 | CREATE TABLE IF NOT EXISTS seen_events ( | ||
| 79 | id INTEGER PRIMARY KEY AUTOINCREMENT, | ||
| 80 | event_id TEXT NOT NULL UNIQUE, | ||
| 81 | first_seen_at INTEGER NOT NULL | ||
| 82 | ); | ||
| 83 | |||
| 84 | CREATE INDEX IF NOT EXISTS idx_repos_pubkey ON repos(pubkey); | ||
| 85 | CREATE INDEX IF NOT EXISTS idx_server_syncs_repo ON server_syncs(repo_id); | ||
| 86 | CREATE INDEX IF NOT EXISTS idx_seen_events_id ON seen_events(event_id); | ||
| 87 | "#, | ||
| 88 | ) | ||
| 89 | .execute(&self.pool) | ||
| 90 | .await | ||
| 91 | .context("failed to run migrations")?; | ||
| 92 | |||
| 93 | Ok(()) | ||
| 94 | } | ||
| 95 | |||
| 96 | pub async fn upsert_repo( | ||
| 97 | &self, | ||
| 98 | pubkey: &str, | ||
| 99 | identifier: &str, | ||
| 100 | event_id: &str, | ||
| 101 | ) -> Result<i64> { | ||
| 102 | let now = chrono_now_secs(); | ||
| 103 | let result = sqlx::query_as::<_, RepoRecord>( | ||
| 104 | "SELECT * FROM repos WHERE pubkey = ? AND identifier = ?", | ||
| 105 | ) | ||
| 106 | .bind(pubkey) | ||
| 107 | .bind(identifier) | ||
| 108 | .fetch_optional(&self.pool) | ||
| 109 | .await?; | ||
| 110 | |||
| 111 | if let Some(existing) = result { | ||
| 112 | sqlx::query("UPDATE repos SET announcement_event_id = ?, last_seen_at = ? WHERE id = ?") | ||
| 113 | .bind(event_id) | ||
| 114 | .bind(now) | ||
| 115 | .bind(existing.id) | ||
| 116 | .execute(&self.pool) | ||
| 117 | .await?; | ||
| 118 | Ok(existing.id) | ||
| 119 | } else { | ||
| 120 | sqlx::query("INSERT INTO repos (pubkey, identifier, announcement_event_id, last_seen_at) VALUES (?, ?, ?, ?)") | ||
| 121 | .bind(pubkey) | ||
| 122 | .bind(identifier) | ||
| 123 | .bind(event_id) | ||
| 124 | .bind(now) | ||
| 125 | .execute(&self.pool) | ||
| 126 | .await?; | ||
| 127 | |||
| 128 | let row: (i64,) = sqlx::query_as("SELECT last_insert_rowid()") | ||
| 129 | .fetch_one(&self.pool) | ||
| 130 | .await?; | ||
| 131 | Ok(row.0) | ||
| 132 | } | ||
| 133 | } | ||
| 134 | |||
| 135 | pub async fn get_repos_needing_git_sync(&self, known_servers: &[String]) -> Result<Vec<(RepoRecord, Vec<String>)>> { | ||
| 136 | let repos = sqlx::query_as::<_, RepoRecord>("SELECT * FROM repos") | ||
| 137 | .fetch_all(&self.pool) | ||
| 138 | .await?; | ||
| 139 | |||
| 140 | let mut result = Vec::new(); | ||
| 141 | for repo in repos { | ||
| 142 | let synced: Vec<String> = sqlx::query_scalar::<_, String>( | ||
| 143 | "SELECT server_domain FROM server_syncs WHERE repo_id = ? AND git_synced = 1", | ||
| 144 | ) | ||
| 145 | .bind(repo.id) | ||
| 146 | .fetch_all(&self.pool) | ||
| 147 | .await?; | ||
| 148 | |||
| 149 | let missing: Vec<String> = known_servers | ||
| 150 | .iter() | ||
| 151 | .filter(|s| !synced.contains(s)) | ||
| 152 | .cloned() | ||
| 153 | .collect(); | ||
| 154 | |||
| 155 | if !missing.is_empty() { | ||
| 156 | result.push((repo, missing)); | ||
| 157 | } | ||
| 158 | } | ||
| 159 | |||
| 160 | Ok(result) | ||
| 161 | } | ||
| 162 | |||
| 163 | pub async fn mark_git_synced( | ||
| 164 | &self, | ||
| 165 | repo_id: i64, | ||
| 166 | server_domain: &str, | ||
| 167 | ) -> Result<()> { | ||
| 168 | let now = chrono_now_secs(); | ||
| 169 | sqlx::query( | ||
| 170 | r#"INSERT INTO server_syncs (repo_id, server_domain, git_synced, nostr_synced, last_sync_at) | ||
| 171 | VALUES (?, ?, 1, 0, ?) | ||
| 172 | ON CONFLICT(repo_id, server_domain) DO UPDATE SET git_synced = 1, last_sync_at = ?, error = NULL"#, | ||
| 173 | ) | ||
| 174 | .bind(repo_id) | ||
| 175 | .bind(server_domain) | ||
| 176 | .bind(now) | ||
| 177 | .bind(now) | ||
| 178 | .execute(&self.pool) | ||
| 179 | .await?; | ||
| 180 | Ok(()) | ||
| 181 | } | ||
| 182 | |||
| 183 | pub async fn mark_nostr_synced( | ||
| 184 | &self, | ||
| 185 | repo_id: i64, | ||
| 186 | server_domain: &str, | ||
| 187 | ) -> Result<()> { | ||
| 188 | let now = chrono_now_secs(); | ||
| 189 | sqlx::query( | ||
| 190 | r#"INSERT INTO server_syncs (repo_id, server_domain, git_synced, nostr_synced, last_sync_at) | ||
| 191 | VALUES (?, ?, 0, 1, ?) | ||
| 192 | ON CONFLICT(repo_id, server_domain) DO UPDATE SET nostr_synced = 1, last_sync_at = ?, error = NULL"#, | ||
| 193 | ) | ||
| 194 | .bind(repo_id) | ||
| 195 | .bind(server_domain) | ||
| 196 | .bind(now) | ||
| 197 | .bind(now) | ||
| 198 | .execute(&self.pool) | ||
| 199 | .await?; | ||
| 200 | Ok(()) | ||
| 201 | } | ||
| 202 | |||
| 203 | pub async fn mark_sync_error( | ||
| 204 | &self, | ||
| 205 | repo_id: i64, | ||
| 206 | server_domain: &str, | ||
| 207 | error: &str, | ||
| 208 | ) -> Result<()> { | ||
| 209 | let now = chrono_now_secs(); | ||
| 210 | sqlx::query( | ||
| 211 | r#"INSERT INTO server_syncs (repo_id, server_domain, git_synced, nostr_synced, last_sync_at, error) | ||
| 212 | VALUES (?, ?, 0, 0, ?, ?) | ||
| 213 | ON CONFLICT(repo_id, server_domain) DO UPDATE SET error = ?, last_sync_at = ?"#, | ||
| 214 | ) | ||
| 215 | .bind(repo_id) | ||
| 216 | .bind(server_domain) | ||
| 217 | .bind(now) | ||
| 218 | .bind(error) | ||
| 219 | .bind(error) | ||
| 220 | .bind(now) | ||
| 221 | .execute(&self.pool) | ||
| 222 | .await?; | ||
| 223 | Ok(()) | ||
| 224 | } | ||
| 225 | |||
| 226 | pub async fn have_seen_event(&self, event_id: &str) -> Result<bool> { | ||
| 227 | let result: Option<(i64,)> = sqlx::query_as( | ||
| 228 | "SELECT id FROM seen_events WHERE event_id = ?", | ||
| 229 | ) | ||
| 230 | .bind(event_id) | ||
| 231 | .fetch_optional(&self.pool) | ||
| 232 | .await?; | ||
| 233 | Ok(result.is_some()) | ||
| 234 | } | ||
| 235 | |||
| 236 | pub async fn record_event(&self, event_id: &str) -> Result<()> { | ||
| 237 | let now = chrono_now_secs(); | ||
| 238 | sqlx::query("INSERT OR IGNORE INTO seen_events (event_id, first_seen_at) VALUES (?, ?)") | ||
| 239 | .bind(event_id) | ||
| 240 | .bind(now) | ||
| 241 | .execute(&self.pool) | ||
| 242 | .await?; | ||
| 243 | Ok(()) | ||
| 244 | } | ||
| 245 | |||
| 246 | pub async fn get_all_repos(&self) -> Result<Vec<RepoRecord>> { | ||
| 247 | let repos = sqlx::query_as::<_, RepoRecord>("SELECT * FROM repos ORDER BY last_seen_at DESC") | ||
| 248 | .fetch_all(&self.pool) | ||
| 249 | .await?; | ||
| 250 | Ok(repos) | ||
| 251 | } | ||
| 252 | |||
| 253 | pub async fn get_sync_summary(&self) -> Result<Vec<ServerSyncRecord>> { | ||
| 254 | let records = sqlx::query_as::<_, ServerSyncRecord>( | ||
| 255 | "SELECT * FROM server_syncs ORDER BY last_sync_at DESC NULLS LAST", | ||
| 256 | ) | ||
| 257 | .fetch_all(&self.pool) | ||
| 258 | .await?; | ||
| 259 | Ok(records) | ||
| 260 | } | ||
| 261 | } | ||
| 262 | |||
| 263 | fn chrono_now_secs() -> i64 { | ||
| 264 | std::time::SystemTime::now() | ||
| 265 | .duration_since(std::time::UNIX_EPOCH) | ||
| 266 | .unwrap_or_default() | ||
| 267 | .as_secs() as i64 | ||
| 268 | } | ||