use anyhow::{Context, Result}; use sqlx::sqlite::{SqliteConnectOptions, SqlitePoolOptions}; use sqlx::SqlitePool; use std::path::Path; pub struct MirrorDb { pool: SqlitePool, } #[derive(Debug, sqlx::FromRow)] pub struct RepoRecord { pub id: i64, pub pubkey: String, pub identifier: String, pub announcement_event_id: Option, pub last_seen_at: i64, } #[derive(Debug, sqlx::FromRow)] pub struct ServerSyncRecord { pub id: i64, pub repo_id: i64, pub server_domain: String, pub git_synced: bool, pub nostr_synced: bool, pub last_sync_at: Option, pub error: Option, } #[derive(Debug, sqlx::FromRow)] pub struct EventRecord { pub id: i64, pub event_id: String, pub first_seen_at: i64, } impl MirrorDb { pub async fn open(db_path: &Path) -> Result { let opts = SqliteConnectOptions::new() .filename(db_path) .create_if_missing(true) .journal_mode(sqlx::sqlite::SqliteJournalMode::Wal); let pool = SqlitePoolOptions::new() .max_connections(5) .connect_with(opts) .await .with_context(|| format!("failed to open database at {:?}", db_path))?; let db = Self { pool }; db.run_migrations().await?; Ok(db) } async fn run_migrations(&self) -> Result<()> { sqlx::query( r#" CREATE TABLE IF NOT EXISTS repos ( id INTEGER PRIMARY KEY AUTOINCREMENT, pubkey TEXT NOT NULL, identifier TEXT NOT NULL, announcement_event_id TEXT, last_seen_at INTEGER NOT NULL, UNIQUE(pubkey, identifier) ); CREATE TABLE IF NOT EXISTS server_syncs ( id INTEGER PRIMARY KEY AUTOINCREMENT, repo_id INTEGER NOT NULL REFERENCES repos(id), server_domain TEXT NOT NULL, git_synced INTEGER NOT NULL DEFAULT 0, nostr_synced INTEGER NOT NULL DEFAULT 0, last_sync_at INTEGER, error TEXT, UNIQUE(repo_id, server_domain) ); CREATE TABLE IF NOT EXISTS seen_events ( id INTEGER PRIMARY KEY AUTOINCREMENT, event_id TEXT NOT NULL UNIQUE, first_seen_at INTEGER NOT NULL ); CREATE INDEX IF NOT EXISTS idx_repos_pubkey ON repos(pubkey); CREATE INDEX IF NOT EXISTS idx_server_syncs_repo ON server_syncs(repo_id); CREATE INDEX IF NOT EXISTS idx_seen_events_id ON seen_events(event_id); CREATE TABLE IF NOT EXISTS nip46_sessions ( npub TEXT PRIMARY KEY, client_secret TEXT NOT NULL, signer_pubkey TEXT, connected INTEGER NOT NULL DEFAULT 0 ); CREATE TABLE IF NOT EXISTS signing_queue ( id INTEGER PRIMARY KEY AUTOINCREMENT, npub TEXT NOT NULL, repo_identifier TEXT NOT NULL, state_event_json TEXT NOT NULL, status TEXT NOT NULL DEFAULT 'pending', created_at INTEGER NOT NULL, signed_at INTEGER, error TEXT ); "#, ) .execute(&self.pool) .await .context("failed to run migrations")?; Ok(()) } pub async fn upsert_repo( &self, pubkey: &str, identifier: &str, event_id: &str, ) -> Result { let now = chrono_now_secs(); let result = sqlx::query_as::<_, RepoRecord>( "SELECT * FROM repos WHERE pubkey = ? AND identifier = ?", ) .bind(pubkey) .bind(identifier) .fetch_optional(&self.pool) .await?; if let Some(existing) = result { sqlx::query("UPDATE repos SET announcement_event_id = ?, last_seen_at = ? WHERE id = ?") .bind(event_id) .bind(now) .bind(existing.id) .execute(&self.pool) .await?; Ok(existing.id) } else { sqlx::query("INSERT INTO repos (pubkey, identifier, announcement_event_id, last_seen_at) VALUES (?, ?, ?, ?)") .bind(pubkey) .bind(identifier) .bind(event_id) .bind(now) .execute(&self.pool) .await?; let row: (i64,) = sqlx::query_as("SELECT last_insert_rowid()") .fetch_one(&self.pool) .await?; Ok(row.0) } } pub async fn get_repos_needing_git_sync(&self, known_servers: &[String]) -> Result)>> { let repos = sqlx::query_as::<_, RepoRecord>("SELECT * FROM repos") .fetch_all(&self.pool) .await?; let mut result = Vec::new(); for repo in repos { let synced: Vec = sqlx::query_scalar::<_, String>( "SELECT server_domain FROM server_syncs WHERE repo_id = ? AND git_synced = 1", ) .bind(repo.id) .fetch_all(&self.pool) .await?; let missing: Vec = known_servers .iter() .filter(|s| !synced.contains(s)) .cloned() .collect(); if !missing.is_empty() { result.push((repo, missing)); } } Ok(result) } pub async fn mark_git_synced( &self, repo_id: i64, server_domain: &str, ) -> Result<()> { let now = chrono_now_secs(); sqlx::query( r#"INSERT INTO server_syncs (repo_id, server_domain, git_synced, nostr_synced, last_sync_at) VALUES (?, ?, 1, 0, ?) ON CONFLICT(repo_id, server_domain) DO UPDATE SET git_synced = 1, last_sync_at = ?, error = NULL"#, ) .bind(repo_id) .bind(server_domain) .bind(now) .bind(now) .execute(&self.pool) .await?; Ok(()) } pub async fn mark_nostr_synced( &self, repo_id: i64, server_domain: &str, ) -> Result<()> { let now = chrono_now_secs(); sqlx::query( r#"INSERT INTO server_syncs (repo_id, server_domain, git_synced, nostr_synced, last_sync_at) VALUES (?, ?, 0, 1, ?) ON CONFLICT(repo_id, server_domain) DO UPDATE SET nostr_synced = 1, last_sync_at = ?, error = NULL"#, ) .bind(repo_id) .bind(server_domain) .bind(now) .bind(now) .execute(&self.pool) .await?; Ok(()) } pub async fn mark_sync_error( &self, repo_id: i64, server_domain: &str, error: &str, ) -> Result<()> { let now = chrono_now_secs(); sqlx::query( r#"INSERT INTO server_syncs (repo_id, server_domain, git_synced, nostr_synced, last_sync_at, error) VALUES (?, ?, 0, 0, ?, ?) ON CONFLICT(repo_id, server_domain) DO UPDATE SET error = ?, last_sync_at = ?"#, ) .bind(repo_id) .bind(server_domain) .bind(now) .bind(error) .bind(error) .bind(now) .execute(&self.pool) .await?; Ok(()) } pub async fn have_seen_event(&self, event_id: &str) -> Result { let result: Option<(i64,)> = sqlx::query_as( "SELECT id FROM seen_events WHERE event_id = ?", ) .bind(event_id) .fetch_optional(&self.pool) .await?; Ok(result.is_some()) } pub async fn record_event(&self, event_id: &str) -> Result<()> { let now = chrono_now_secs(); sqlx::query("INSERT OR IGNORE INTO seen_events (event_id, first_seen_at) VALUES (?, ?)") .bind(event_id) .bind(now) .execute(&self.pool) .await?; Ok(()) } pub async fn get_all_repos(&self) -> Result> { let repos = sqlx::query_as::<_, RepoRecord>("SELECT * FROM repos ORDER BY last_seen_at DESC") .fetch_all(&self.pool) .await?; Ok(repos) } pub async fn get_sync_summary(&self) -> Result> { let records = sqlx::query_as::<_, ServerSyncRecord>( "SELECT * FROM server_syncs ORDER BY last_sync_at DESC NULLS LAST", ) .fetch_all(&self.pool) .await?; Ok(records) } pub async fn get_nip46_session(&self, npub: &str) -> Result> { let result = sqlx::query_as::<_, Nip46SessionRecord>( "SELECT * FROM nip46_sessions WHERE npub = ?", ) .bind(npub) .fetch_optional(&self.pool) .await?; Ok(result) } pub async fn upsert_nip46_session( &self, npub: &str, client_secret: &str, signer_pubkey: Option<&str>, connected: bool, ) -> Result<()> { sqlx::query( r#"INSERT INTO nip46_sessions (npub, client_secret, signer_pubkey, connected) VALUES (?, ?, ?, ?) ON CONFLICT(npub) DO UPDATE SET client_secret = ?, signer_pubkey = ?, connected = ?"#, ) .bind(npub) .bind(client_secret) .bind(signer_pubkey) .bind(connected as i32) .bind(client_secret) .bind(signer_pubkey) .bind(connected as i32) .execute(&self.pool) .await?; Ok(()) } pub async fn get_all_nip46_sessions(&self) -> Result> { let records = sqlx::query_as::<_, Nip46SessionRecord>("SELECT * FROM nip46_sessions") .fetch_all(&self.pool) .await?; Ok(records) } } #[derive(Debug, sqlx::FromRow)] pub struct Nip46SessionRecord { pub npub: String, pub client_secret: String, pub signer_pubkey: Option, pub connected: bool, } fn chrono_now_secs() -> i64 { std::time::SystemTime::now() .duration_since(std::time::UNIX_EPOCH) .unwrap_or_default() .as_secs() as i64 }