upleb.uk

Public git repos — served from a NIP-34 GRASP relay at git.upleb.uk

summaryrefslogtreecommitdiff
path: root/components/wisp_relay/storage_engine.c
diff options
context:
space:
mode:
Diffstat (limited to 'components/wisp_relay/storage_engine.c')
-rw-r--r--components/wisp_relay/storage_engine.c402
1 files changed, 402 insertions, 0 deletions
diff --git a/components/wisp_relay/storage_engine.c b/components/wisp_relay/storage_engine.c
new file mode 100644
index 0000000..d26705b
--- /dev/null
+++ b/components/wisp_relay/storage_engine.c
@@ -0,0 +1,402 @@
1#include "storage_engine.h"
2#include "esp_littlefs.h"
3#include "esp_log.h"
4#include "nvs_flash.h"
5#include "nvs.h"
6#include <inttypes.h>
7#include <string.h>
8#include <stdio.h>
9#include <sys/stat.h>
10#include <time.h>
11#include <unistd.h>
12
13static const char *TAG = "storage";
14
15#define INDEX_NVS_NAMESPACE "nostr_idx"
16#define EVENTS_DIR "/littlefs/events"
17
18static void get_event_path(const uint8_t event_id[32], uint32_t file_index,
19 char *path, size_t len)
20{
21 char id_hex[33];
22 for (int i = 0; i < 16; i++) sprintf(id_hex + i * 2, "%02x", event_id[i]);
23 snprintf(path, len, EVENTS_DIR "/%02x/%s_%08" PRIx32 ".json",
24 event_id[0], id_hex, file_index);
25}
26
27static int save_index_to_nvs(storage_engine_t *engine)
28{
29 nvs_handle_t nvs;
30 esp_err_t err = nvs_open(INDEX_NVS_NAMESPACE, NVS_READWRITE, &nvs);
31 if (err != ESP_OK) return STORAGE_ERR_IO;
32
33 nvs_set_u16(nvs, "count", engine->index_count);
34 nvs_set_u32(nvs, "next_idx", engine->next_file_index);
35
36 const uint16_t chunk_size = 50;
37 for (uint16_t i = 0; i < engine->index_count; i += chunk_size) {
38 char key[16];
39 snprintf(key, sizeof(key), "idx_%u", i / chunk_size);
40 uint16_t entries = engine->index_count - i;
41 if (entries > chunk_size) entries = chunk_size;
42 nvs_set_blob(nvs, key, &engine->index[i], entries * sizeof(storage_index_entry_t));
43 }
44 nvs_commit(nvs);
45 nvs_close(nvs);
46 return STORAGE_OK;
47}
48
49static int load_index_from_nvs(storage_engine_t *engine)
50{
51 nvs_handle_t nvs;
52 esp_err_t err = nvs_open(INDEX_NVS_NAMESPACE, NVS_READONLY, &nvs);
53 if (err == ESP_ERR_NVS_NOT_FOUND) return STORAGE_OK;
54 if (err != ESP_OK) return STORAGE_ERR_IO;
55
56 err = nvs_get_u16(nvs, "count", &engine->index_count);
57 if (err != ESP_OK) { nvs_close(nvs); return STORAGE_ERR_IO; }
58 if (engine->index_count > engine->max_index_entries) engine->index_count = engine->max_index_entries;
59
60 err = nvs_get_u32(nvs, "next_idx", &engine->next_file_index);
61 if (err != ESP_OK) { nvs_close(nvs); return STORAGE_ERR_IO; }
62
63 const uint16_t chunk_size = 50;
64 for (uint16_t i = 0; i < engine->index_count; i += chunk_size) {
65 char key[16];
66 snprintf(key, sizeof(key), "idx_%u", i / chunk_size);
67 uint16_t entries = engine->index_count - i;
68 if (entries > chunk_size) entries = chunk_size;
69 size_t len = entries * sizeof(storage_index_entry_t);
70 nvs_get_blob(nvs, key, &engine->index[i], &len);
71 }
72 nvs_close(nvs);
73 return STORAGE_OK;
74}
75
76static storage_index_entry_t *find_index_entry(storage_engine_t *engine,
77 const uint8_t event_id[32])
78{
79 for (uint16_t i = 0; i < engine->index_count; i++) {
80 if (memcmp(engine->index[i].event_id, event_id, 32) == 0 &&
81 !(engine->index[i].flags & STORAGE_FLAG_DELETED)) {
82 return &engine->index[i];
83 }
84 }
85 return NULL;
86}
87
88static void parse_event_meta(const char *json, size_t len,
89 uint8_t *id_out, uint8_t *pubkey_out,
90 uint64_t *created_at_out, int *kind_out)
91{
92 extern int relay_hex_to_bytes(const char *hex, size_t hex_len, uint8_t *out, size_t out_len);
93 extern void relay_bytes_to_hex(const uint8_t *bytes, size_t len, char *hex);
94
95 id_out[0] = 0; pubkey_out[0] = 0; *created_at_out = 0; *kind_out = 0;
96
97 const char *p;
98 p = strstr(json, "\"id\":\"");
99 if (p) relay_hex_to_bytes(p + 6, 64, id_out, 32);
100 p = strstr(json, "\"pubkey\":\"");
101 if (p) relay_hex_to_bytes(p + 10, 64, pubkey_out, 32);
102 p = strstr(json, "\"created_at\":");
103 if (p) *created_at_out = strtoull(p + 13, NULL, 10);
104 p = strstr(json, "\"kind\":");
105 if (p) *kind_out = atoi(p + 7);
106}
107
108esp_err_t storage_init(storage_engine_t *engine, uint32_t default_ttl_sec)
109{
110 memset(engine, 0, sizeof(storage_engine_t));
111 engine->default_ttl_sec = default_ttl_sec;
112 strcpy(engine->mount_point, "/littlefs");
113
114 engine->lock = xSemaphoreCreateMutex();
115 if (!engine->lock) return ESP_ERR_NO_MEM;
116
117 engine->max_index_entries = STORAGE_INDEX_ENTRIES;
118 engine->index = heap_caps_calloc(engine->max_index_entries,
119 sizeof(storage_index_entry_t),
120 MALLOC_CAP_SPIRAM | MALLOC_CAP_8BIT);
121 if (!engine->index) {
122 engine->max_index_entries = 1000;
123 engine->index = calloc(engine->max_index_entries, sizeof(storage_index_entry_t));
124 if (!engine->index) { vSemaphoreDelete(engine->lock); return ESP_ERR_NO_MEM; }
125 }
126
127 esp_vfs_littlefs_conf_t conf = {
128 .base_path = "/littlefs",
129 .partition_label = STORAGE_PARTITION_LABEL,
130 .format_if_mount_failed = true,
131 .dont_mount = false,
132 };
133
134 esp_err_t ret = esp_vfs_littlefs_register(&conf);
135 if (ret != ESP_OK) {
136 ESP_LOGE(TAG, "Failed to mount LittleFS: %s", esp_err_to_name(ret));
137 free(engine->index);
138 vSemaphoreDelete(engine->lock);
139 return ret;
140 }
141
142 mkdir(EVENTS_DIR, 0755);
143 for (int i = 0; i < 256; i++) {
144 char subdir[64];
145 snprintf(subdir, sizeof(subdir), EVENTS_DIR "/%02x", i);
146 mkdir(subdir, 0755);
147 }
148
149 int load_err = load_index_from_nvs(engine);
150 if (load_err != STORAGE_OK) {
151 ESP_LOGW(TAG, "Failed to load index, starting fresh");
152 engine->index_count = 0;
153 engine->next_file_index = 0;
154 }
155
156 engine->initialized = true;
157
158 size_t total, used;
159 esp_littlefs_info(STORAGE_PARTITION_LABEL, &total, &used);
160 ESP_LOGI(TAG, "Storage initialized: %" PRIu16 " events, %zu/%zu bytes used",
161 engine->index_count, used, total);
162 return ESP_OK;
163}
164
165void storage_destroy(storage_engine_t *engine)
166{
167 if (!engine->initialized) return;
168 if (engine->cleanup_task) {
169 engine->cleanup_stop = true;
170 while (engine->cleanup_task != NULL) vTaskDelay(pdMS_TO_TICKS(100));
171 }
172 save_index_to_nvs(engine);
173 esp_vfs_littlefs_unregister(STORAGE_PARTITION_LABEL);
174 if (engine->index) { free(engine->index); engine->index = NULL; }
175 if (engine->lock) { vSemaphoreDelete(engine->lock); engine->lock = NULL; }
176 engine->initialized = false;
177}
178
179storage_error_t storage_save_event_json(storage_engine_t *engine,
180 const char *event_json,
181 size_t event_json_len)
182{
183 if (!engine->initialized) return STORAGE_ERR_NOT_INITIALIZED;
184
185 uint8_t id[32] = {0}, pubkey[32] = {0};
186 uint64_t created_at = 0;
187 int kind = 0;
188 parse_event_meta(event_json, event_json_len, id, pubkey, &created_at, &kind);
189
190 xSemaphoreTake(engine->lock, portMAX_DELAY);
191
192 if (find_index_entry(engine, id)) {
193 xSemaphoreGive(engine->lock);
194 return STORAGE_ERR_DUPLICATE;
195 }
196 if (engine->index_count >= engine->max_index_entries) {
197 xSemaphoreGive(engine->lock);
198 return STORAGE_ERR_FULL;
199 }
200
201 char path[128];
202 get_event_path(id, engine->next_file_index, path, sizeof(path));
203 FILE *f = fopen(path, "wb");
204 if (!f) {
205 char dir[64];
206 snprintf(dir, sizeof(dir), EVENTS_DIR "/%02x", id[0]);
207 mkdir(dir, 0755);
208 f = fopen(path, "wb");
209 }
210 if (!f) { xSemaphoreGive(engine->lock); return STORAGE_ERR_IO; }
211
212 fwrite(event_json, 1, event_json_len, f);
213 fclose(f);
214
215 storage_index_entry_t *entry = &engine->index[engine->index_count];
216 memcpy(entry->event_id, id, 32);
217 entry->created_at = (uint32_t)created_at;
218 entry->kind = kind;
219 memcpy(entry->pubkey_prefix, pubkey, 4);
220 entry->file_index = engine->next_file_index;
221 entry->flags = 0;
222 entry->expires_at = (uint32_t)time(NULL) + engine->default_ttl_sec;
223
224 engine->index_count++;
225 engine->next_file_index++;
226 if (engine->index_count % 10 == 0) save_index_to_nvs(engine);
227
228 xSemaphoreGive(engine->lock);
229 return STORAGE_OK;
230}
231
232bool storage_event_exists(storage_engine_t *engine, const uint8_t event_id[32])
233{
234 if (!engine->initialized) return false;
235 xSemaphoreTake(engine->lock, portMAX_DELAY);
236 bool exists = (find_index_entry(engine, event_id) != NULL);
237 xSemaphoreGive(engine->lock);
238 return exists;
239}
240
241storage_error_t storage_query_events_json(storage_engine_t *engine,
242 int kind,
243 const char *author_hex,
244 int limit,
245 char ***results,
246 uint16_t *count)
247{
248 if (!engine->initialized) return STORAGE_ERR_NOT_INITIALIZED;
249 *results = NULL;
250 *count = 0;
251 if (limit > 500) limit = 500;
252 if (limit <= 0) limit = 100;
253
254 char **out = calloc(limit, sizeof(char *));
255 if (!out) return STORAGE_ERR_NO_MEM;
256
257 xSemaphoreTake(engine->lock, portMAX_DELAY);
258 uint32_t now = (uint32_t)time(NULL);
259 uint16_t found = 0;
260
261 uint8_t author_prefix[4] = {0};
262 int have_author = 0;
263 if (author_hex && strlen(author_hex) >= 8) {
264 extern int relay_hex_to_bytes(const char *, size_t, uint8_t *, size_t);
265 relay_hex_to_bytes(author_hex, 8, author_prefix, 4);
266 have_author = 1;
267 }
268
269 for (int i = engine->index_count - 1; i >= 0 && found < limit; i--) {
270 storage_index_entry_t *e = &engine->index[i];
271 if (e->flags & STORAGE_FLAG_DELETED) continue;
272 if (e->expires_at > 0 && e->expires_at < now) continue;
273 if (kind > 0 && e->kind != kind) continue;
274 if (have_author && memcmp(e->pubkey_prefix, author_prefix, 4) != 0) continue;
275
276 char path[128];
277 get_event_path(e->event_id, e->file_index, path, sizeof(path));
278 FILE *f = fopen(path, "rb");
279 if (!f) continue;
280 fseek(f, 0, SEEK_END);
281 long sz = ftell(f);
282 fseek(f, 0, SEEK_SET);
283 if (sz <= 0 || sz > STORAGE_MAX_EVENT_SIZE) { fclose(f); continue; }
284 char *buf = malloc(sz + 1);
285 fread(buf, 1, sz, f);
286 buf[sz] = '\0';
287 fclose(f);
288 out[found++] = buf;
289 }
290
291 xSemaphoreGive(engine->lock);
292 *results = out;
293 *count = found;
294 return STORAGE_OK;
295}
296
297void storage_free_query_results(char **results, uint16_t count)
298{
299 if (!results) return;
300 for (uint16_t i = 0; i < count; i++) free(results[i]);
301 free(results);
302}
303
304storage_error_t storage_delete_event(storage_engine_t *engine, const uint8_t event_id[32])
305{
306 if (!engine->initialized) return STORAGE_ERR_NOT_INITIALIZED;
307 xSemaphoreTake(engine->lock, portMAX_DELAY);
308 storage_index_entry_t *e = find_index_entry(engine, event_id);
309 if (!e) { xSemaphoreGive(engine->lock); return STORAGE_ERR_NOT_FOUND; }
310 char path[128];
311 get_event_path(e->event_id, e->file_index, path, sizeof(path));
312 unlink(path);
313 e->flags |= STORAGE_FLAG_DELETED;
314 save_index_to_nvs(engine);
315 xSemaphoreGive(engine->lock);
316 return STORAGE_OK;
317}
318
319int storage_purge_expired(storage_engine_t *engine)
320{
321 if (!engine->initialized) return 0;
322 xSemaphoreTake(engine->lock, portMAX_DELAY);
323 uint32_t now = (uint32_t)time(NULL);
324 int purged = 0;
325 for (uint16_t i = 0; i < engine->index_count; i++) {
326 if (engine->index[i].flags & STORAGE_FLAG_DELETED) continue;
327 if (engine->index[i].expires_at > 0 && engine->index[i].expires_at < now) {
328 char path[128];
329 get_event_path(engine->index[i].event_id, engine->index[i].file_index, path, sizeof(path));
330 unlink(path);
331 engine->index[i].flags |= STORAGE_FLAG_DELETED;
332 purged++;
333 }
334 }
335 if (purged > 0) { save_index_to_nvs(engine); ESP_LOGI(TAG, "Purged %d expired events", purged); }
336 xSemaphoreGive(engine->lock);
337 return purged;
338}
339
340int storage_compact_index(storage_engine_t *engine)
341{
342 if (!engine->initialized) return 0;
343 xSemaphoreTake(engine->lock, portMAX_DELAY);
344 uint16_t write_idx = 0;
345 int compacted = 0;
346 for (uint16_t read_idx = 0; read_idx < engine->index_count; read_idx++) {
347 if (!(engine->index[read_idx].flags & STORAGE_FLAG_DELETED)) {
348 if (write_idx != read_idx)
349 memcpy(&engine->index[write_idx], &engine->index[read_idx], sizeof(storage_index_entry_t));
350 write_idx++;
351 } else {
352 compacted++;
353 }
354 }
355 if (compacted > 0) {
356 engine->index_count = write_idx;
357 save_index_to_nvs(engine);
358 ESP_LOGI(TAG, "Compacted: removed %d, %" PRIu16 " remaining", compacted, engine->index_count);
359 }
360 xSemaphoreGive(engine->lock);
361 return compacted;
362}
363
364void storage_get_stats(storage_engine_t *engine, storage_stats_t *stats)
365{
366 memset(stats, 0, sizeof(storage_stats_t));
367 if (!engine->initialized) return;
368 xSemaphoreTake(engine->lock, portMAX_DELAY);
369 uint32_t now = (uint32_t)time(NULL);
370 for (uint16_t i = 0; i < engine->index_count; i++) {
371 if (engine->index[i].flags & STORAGE_FLAG_DELETED) continue;
372 if (engine->index[i].expires_at > 0 && engine->index[i].expires_at < now) continue;
373 stats->total_events++;
374 }
375 size_t total, used;
376 esp_littlefs_info(STORAGE_PARTITION_LABEL, &total, &used);
377 stats->total_bytes = total;
378 stats->free_bytes = total - used;
379 xSemaphoreGive(engine->lock);
380}
381
382static void storage_cleanup_task(void *arg)
383{
384 storage_engine_t *engine = (storage_engine_t *)arg;
385 int cycles = 0;
386 while (!engine->cleanup_stop) {
387 for (int i = 0; i < 60 && !engine->cleanup_stop; i++) vTaskDelay(pdMS_TO_TICKS(1000));
388 if (engine->cleanup_stop) break;
389 storage_purge_expired(engine);
390 if (++cycles >= 10) { storage_compact_index(engine); cycles = 0; }
391 }
392 engine->cleanup_task = NULL;
393 vTaskDelete(NULL);
394}
395
396esp_err_t storage_start_cleanup_task(storage_engine_t *engine)
397{
398 engine->cleanup_stop = false;
399 BaseType_t ret = xTaskCreate(storage_cleanup_task, "relay_cleanup", 4096, engine, 2, &engine->cleanup_task);
400 if (ret != pdPASS) { engine->cleanup_task = NULL; return ESP_ERR_NO_MEM; }
401 return ESP_OK;
402}