upleb.uk

Public git repos — served from a NIP-34 GRASP relay at git.upleb.uk

summaryrefslogtreecommitdiff
path: root/src/db.rs
diff options
context:
space:
mode:
Diffstat (limited to 'src/db.rs')
-rw-r--r--src/db.rs268
1 files changed, 268 insertions, 0 deletions
diff --git a/src/db.rs b/src/db.rs
new file mode 100644
index 0000000..bb1bf31
--- /dev/null
+++ b/src/db.rs
@@ -0,0 +1,268 @@
1use anyhow::{Context, Result};
2use sqlx::sqlite::{SqliteConnectOptions, SqlitePoolOptions};
3use sqlx::SqlitePool;
4use std::path::Path;
5
6pub struct MirrorDb {
7 pool: SqlitePool,
8}
9
10#[derive(Debug, sqlx::FromRow)]
11pub struct RepoRecord {
12 pub id: i64,
13 pub pubkey: String,
14 pub identifier: String,
15 pub announcement_event_id: Option<String>,
16 pub last_seen_at: i64,
17}
18
19#[derive(Debug, sqlx::FromRow)]
20pub struct ServerSyncRecord {
21 pub id: i64,
22 pub repo_id: i64,
23 pub server_domain: String,
24 pub git_synced: bool,
25 pub nostr_synced: bool,
26 pub last_sync_at: Option<i64>,
27 pub error: Option<String>,
28}
29
30#[derive(Debug, sqlx::FromRow)]
31pub struct EventRecord {
32 pub id: i64,
33 pub event_id: String,
34 pub first_seen_at: i64,
35}
36
37impl MirrorDb {
38 pub async fn open(db_path: &Path) -> Result<Self> {
39 let opts = SqliteConnectOptions::new()
40 .filename(db_path)
41 .create_if_missing(true)
42 .journal_mode(sqlx::sqlite::SqliteJournalMode::Wal);
43
44 let pool = SqlitePoolOptions::new()
45 .max_connections(5)
46 .connect_with(opts)
47 .await
48 .with_context(|| format!("failed to open database at {:?}", db_path))?;
49
50 let db = Self { pool };
51 db.run_migrations().await?;
52 Ok(db)
53 }
54
55 async fn run_migrations(&self) -> Result<()> {
56 sqlx::query(
57 r#"
58 CREATE TABLE IF NOT EXISTS repos (
59 id INTEGER PRIMARY KEY AUTOINCREMENT,
60 pubkey TEXT NOT NULL,
61 identifier TEXT NOT NULL,
62 announcement_event_id TEXT,
63 last_seen_at INTEGER NOT NULL,
64 UNIQUE(pubkey, identifier)
65 );
66
67 CREATE TABLE IF NOT EXISTS server_syncs (
68 id INTEGER PRIMARY KEY AUTOINCREMENT,
69 repo_id INTEGER NOT NULL REFERENCES repos(id),
70 server_domain TEXT NOT NULL,
71 git_synced INTEGER NOT NULL DEFAULT 0,
72 nostr_synced INTEGER NOT NULL DEFAULT 0,
73 last_sync_at INTEGER,
74 error TEXT,
75 UNIQUE(repo_id, server_domain)
76 );
77
78 CREATE TABLE IF NOT EXISTS seen_events (
79 id INTEGER PRIMARY KEY AUTOINCREMENT,
80 event_id TEXT NOT NULL UNIQUE,
81 first_seen_at INTEGER NOT NULL
82 );
83
84 CREATE INDEX IF NOT EXISTS idx_repos_pubkey ON repos(pubkey);
85 CREATE INDEX IF NOT EXISTS idx_server_syncs_repo ON server_syncs(repo_id);
86 CREATE INDEX IF NOT EXISTS idx_seen_events_id ON seen_events(event_id);
87 "#,
88 )
89 .execute(&self.pool)
90 .await
91 .context("failed to run migrations")?;
92
93 Ok(())
94 }
95
96 pub async fn upsert_repo(
97 &self,
98 pubkey: &str,
99 identifier: &str,
100 event_id: &str,
101 ) -> Result<i64> {
102 let now = chrono_now_secs();
103 let result = sqlx::query_as::<_, RepoRecord>(
104 "SELECT * FROM repos WHERE pubkey = ? AND identifier = ?",
105 )
106 .bind(pubkey)
107 .bind(identifier)
108 .fetch_optional(&self.pool)
109 .await?;
110
111 if let Some(existing) = result {
112 sqlx::query("UPDATE repos SET announcement_event_id = ?, last_seen_at = ? WHERE id = ?")
113 .bind(event_id)
114 .bind(now)
115 .bind(existing.id)
116 .execute(&self.pool)
117 .await?;
118 Ok(existing.id)
119 } else {
120 sqlx::query("INSERT INTO repos (pubkey, identifier, announcement_event_id, last_seen_at) VALUES (?, ?, ?, ?)")
121 .bind(pubkey)
122 .bind(identifier)
123 .bind(event_id)
124 .bind(now)
125 .execute(&self.pool)
126 .await?;
127
128 let row: (i64,) = sqlx::query_as("SELECT last_insert_rowid()")
129 .fetch_one(&self.pool)
130 .await?;
131 Ok(row.0)
132 }
133 }
134
135 pub async fn get_repos_needing_git_sync(&self, known_servers: &[String]) -> Result<Vec<(RepoRecord, Vec<String>)>> {
136 let repos = sqlx::query_as::<_, RepoRecord>("SELECT * FROM repos")
137 .fetch_all(&self.pool)
138 .await?;
139
140 let mut result = Vec::new();
141 for repo in repos {
142 let synced: Vec<String> = sqlx::query_scalar::<_, String>(
143 "SELECT server_domain FROM server_syncs WHERE repo_id = ? AND git_synced = 1",
144 )
145 .bind(repo.id)
146 .fetch_all(&self.pool)
147 .await?;
148
149 let missing: Vec<String> = known_servers
150 .iter()
151 .filter(|s| !synced.contains(s))
152 .cloned()
153 .collect();
154
155 if !missing.is_empty() {
156 result.push((repo, missing));
157 }
158 }
159
160 Ok(result)
161 }
162
163 pub async fn mark_git_synced(
164 &self,
165 repo_id: i64,
166 server_domain: &str,
167 ) -> Result<()> {
168 let now = chrono_now_secs();
169 sqlx::query(
170 r#"INSERT INTO server_syncs (repo_id, server_domain, git_synced, nostr_synced, last_sync_at)
171 VALUES (?, ?, 1, 0, ?)
172 ON CONFLICT(repo_id, server_domain) DO UPDATE SET git_synced = 1, last_sync_at = ?, error = NULL"#,
173 )
174 .bind(repo_id)
175 .bind(server_domain)
176 .bind(now)
177 .bind(now)
178 .execute(&self.pool)
179 .await?;
180 Ok(())
181 }
182
183 pub async fn mark_nostr_synced(
184 &self,
185 repo_id: i64,
186 server_domain: &str,
187 ) -> Result<()> {
188 let now = chrono_now_secs();
189 sqlx::query(
190 r#"INSERT INTO server_syncs (repo_id, server_domain, git_synced, nostr_synced, last_sync_at)
191 VALUES (?, ?, 0, 1, ?)
192 ON CONFLICT(repo_id, server_domain) DO UPDATE SET nostr_synced = 1, last_sync_at = ?, error = NULL"#,
193 )
194 .bind(repo_id)
195 .bind(server_domain)
196 .bind(now)
197 .bind(now)
198 .execute(&self.pool)
199 .await?;
200 Ok(())
201 }
202
203 pub async fn mark_sync_error(
204 &self,
205 repo_id: i64,
206 server_domain: &str,
207 error: &str,
208 ) -> Result<()> {
209 let now = chrono_now_secs();
210 sqlx::query(
211 r#"INSERT INTO server_syncs (repo_id, server_domain, git_synced, nostr_synced, last_sync_at, error)
212 VALUES (?, ?, 0, 0, ?, ?)
213 ON CONFLICT(repo_id, server_domain) DO UPDATE SET error = ?, last_sync_at = ?"#,
214 )
215 .bind(repo_id)
216 .bind(server_domain)
217 .bind(now)
218 .bind(error)
219 .bind(error)
220 .bind(now)
221 .execute(&self.pool)
222 .await?;
223 Ok(())
224 }
225
226 pub async fn have_seen_event(&self, event_id: &str) -> Result<bool> {
227 let result: Option<(i64,)> = sqlx::query_as(
228 "SELECT id FROM seen_events WHERE event_id = ?",
229 )
230 .bind(event_id)
231 .fetch_optional(&self.pool)
232 .await?;
233 Ok(result.is_some())
234 }
235
236 pub async fn record_event(&self, event_id: &str) -> Result<()> {
237 let now = chrono_now_secs();
238 sqlx::query("INSERT OR IGNORE INTO seen_events (event_id, first_seen_at) VALUES (?, ?)")
239 .bind(event_id)
240 .bind(now)
241 .execute(&self.pool)
242 .await?;
243 Ok(())
244 }
245
246 pub async fn get_all_repos(&self) -> Result<Vec<RepoRecord>> {
247 let repos = sqlx::query_as::<_, RepoRecord>("SELECT * FROM repos ORDER BY last_seen_at DESC")
248 .fetch_all(&self.pool)
249 .await?;
250 Ok(repos)
251 }
252
253 pub async fn get_sync_summary(&self) -> Result<Vec<ServerSyncRecord>> {
254 let records = sqlx::query_as::<_, ServerSyncRecord>(
255 "SELECT * FROM server_syncs ORDER BY last_sync_at DESC NULLS LAST",
256 )
257 .fetch_all(&self.pool)
258 .await?;
259 Ok(records)
260 }
261}
262
263fn chrono_now_secs() -> i64 {
264 std::time::SystemTime::now()
265 .duration_since(std::time::UNIX_EPOCH)
266 .unwrap_or_default()
267 .as_secs() as i64
268}