upleb.uk

Public git repos — served from a NIP-34 GRASP relay at git.upleb.uk

summaryrefslogtreecommitdiff
path: root/main/sync_manager.c
diff options
context:
space:
mode:
Diffstat (limited to 'main/sync_manager.c')
-rw-r--r--main/sync_manager.c399
1 files changed, 399 insertions, 0 deletions
diff --git a/main/sync_manager.c b/main/sync_manager.c
new file mode 100644
index 0000000..1766b2b
--- /dev/null
+++ b/main/sync_manager.c
@@ -0,0 +1,399 @@
1#include "sync_manager.h"
2#include "local_relay.h"
3#include "storage_engine.h"
4#include "relay_core.h"
5#include "config.h"
6#include "nostr_event.h"
7#include "esp_log.h"
8#include "esp_tls.h"
9#include "esp_crt_bundle.h"
10#include "cJSON.h"
11#include "freertos/FreeRTOS.h"
12#include "freertos/task.h"
13#include "freertos/timers.h"
14#include <string.h>
15#include <stdlib.h>
16
17static const char *TAG = "sync_mgr";
18
19static const uint8_t WS_FIN_TEXT = 0x81;
20static const uint8_t WS_FIN_CLOSE = 0x88;
21
22static esp_err_t ws_connect(const char *wss_url, esp_tls_t **out_tls)
23{
24 char host[128] = {0};
25 int port = 443;
26 char path[128] = "/";
27
28 const char *url_start = wss_url;
29 if (strncmp(wss_url, "wss://", 6) == 0) url_start = wss_url + 6;
30
31 const char *path_ptr = strchr(url_start, '/');
32 if (path_ptr) {
33 size_t host_len = path_ptr - url_start;
34 if (host_len >= sizeof(host)) host_len = sizeof(host) - 1;
35 memcpy(host, url_start, host_len);
36 host[host_len] = '\0';
37 strncpy(path, path_ptr, sizeof(path) - 1);
38 } else {
39 strncpy(host, url_start, sizeof(host) - 1);
40 }
41
42 char *colon = strchr(host, ':');
43 if (colon) { *colon = '\0'; port = atoi(colon + 1); }
44
45 esp_tls_cfg_t tls_cfg = { .crt_bundle_attach = esp_crt_bundle_attach };
46 esp_tls_t *tls = esp_tls_init();
47 if (!tls) return ESP_ERR_NO_MEM;
48
49 int ret = esp_tls_conn_new_sync(host, strlen(host), port, &tls_cfg, tls);
50 if (ret < 0) {
51 esp_tls_conn_destroy(tls);
52 ESP_LOGW(TAG, "TLS connect failed to %s", host);
53 return ESP_FAIL;
54 }
55
56 char upgrade[512];
57 snprintf(upgrade, sizeof(upgrade),
58 "GET %s HTTP/1.1\r\n"
59 "Host: %s\r\n"
60 "Upgrade: websocket\r\n"
61 "Connection: Upgrade\r\n"
62 "Sec-WebSocket-Key: dGhlIHNhbXBsZSBub25jZQ==\r\n"
63 "Sec-WebSocket-Version: 13\r\n"
64 "\r\n", path, host);
65
66 esp_tls_conn_write(tls, (const unsigned char *)upgrade, strlen(upgrade));
67
68 char resp[1024];
69 int rlen = esp_tls_conn_read(tls, (unsigned char *)resp, sizeof(resp) - 1);
70 if (rlen <= 0 || !strstr(resp, "101")) {
71 esp_tls_conn_destroy(tls);
72 return ESP_FAIL;
73 }
74
75 *out_tls = tls;
76 return ESP_OK;
77}
78
79static void ws_send_text(esp_tls_t *tls, const char *data, size_t len)
80{
81 uint8_t header[10];
82 int hlen = 0;
83 header[0] = WS_FIN_TEXT;
84 if (len <= 125) { header[1] = (uint8_t)len; hlen = 2; }
85 else if (len <= 65535) {
86 header[1] = 126;
87 header[2] = (uint8_t)((len >> 8) & 0xff);
88 header[3] = (uint8_t)(len & 0xff);
89 hlen = 4;
90 } else {
91 header[1] = 127;
92 for (int i = 0; i < 8; i++)
93 header[2 + i] = (uint8_t)((len >> (56 - i * 8)) & 0xff);
94 hlen = 10;
95 }
96 esp_tls_conn_write(tls, header, hlen);
97 esp_tls_conn_write(tls, (const unsigned char *)data, len);
98}
99
100static void ws_send_close(esp_tls_t *tls)
101{
102 uint8_t close_frame[2] = {WS_FIN_CLOSE, 0x00};
103 esp_tls_conn_write(tls, close_frame, 2);
104}
105
106static int ws_read_text(esp_tls_t *tls, char *buf, size_t buf_len)
107{
108 uint8_t header[2];
109 int rlen = esp_tls_conn_read(tls, header, 2);
110 if (rlen < 2) return -1;
111
112 if ((header[0] & 0x0f) == 0x08) return -1;
113
114 int payload_len = header[1] & 0x7f;
115 if (payload_len == 126) {
116 uint8_t ext[2];
117 esp_tls_conn_read(tls, ext, 2);
118 payload_len = (ext[0] << 8) | ext[1];
119 } else if (payload_len == 127) {
120 uint8_t ext[8];
121 esp_tls_conn_read(tls, ext, 8);
122 payload_len = 0;
123 for (int i = 0; i < 8; i++) payload_len = (payload_len << 8) | ext[i];
124 }
125
126 int mask_len = (header[1] & 0x80) ? 4 : 0;
127 uint8_t mask[4] = {0};
128 if (mask_len) esp_tls_conn_read(tls, mask, 4);
129
130 if (payload_len > (int)buf_len - 1) payload_len = (int)buf_len - 1;
131 esp_tls_conn_read(tls, (unsigned char *)buf, payload_len);
132 for (int i = 0; i < payload_len; i++) buf[i] ^= mask[i % 4];
133 buf[payload_len] = '\0';
134 return payload_len;
135}
136
137static void get_event_ids_from_storage(char ***ids_out, uint16_t *count_out)
138{
139 extern relay_ctx_t g_relay_ctx;
140 if (!g_relay_ctx.storage) { *ids_out = NULL; *count_out = 0; return; }
141
142 char **results = NULL;
143 uint16_t count = 0;
144 storage_query_events_json(g_relay_ctx.storage, -1, NULL, 5000, &results, &count);
145
146 char **ids = calloc(count, sizeof(char *));
147 uint16_t id_count = 0;
148
149 for (uint16_t i = 0; i < count; i++) {
150 cJSON *obj = cJSON_Parse(results[i]);
151 if (!obj) continue;
152 cJSON *id = cJSON_GetObjectItem(obj, "id");
153 if (id && cJSON_IsString(id)) {
154 ids[id_count++] = strdup(id->valuestring);
155 }
156 cJSON_Delete(obj);
157 }
158
159 storage_free_query_results(results, count);
160 *ids_out = ids;
161 *count_out = id_count;
162}
163
164static void free_event_ids(char **ids, uint16_t count)
165{
166 for (uint16_t i = 0; i < count; i++) free(ids[i]);
167 free(ids);
168}
169
170esp_err_t sync_manager_init(sync_manager_t *mgr, relay_selector_t *selector)
171{
172 memset(mgr, 0, sizeof(sync_manager_t));
173 mgr->selector = selector;
174 mgr->lock = xSemaphoreCreateMutex();
175 if (!mgr->lock) return ESP_ERR_NO_MEM;
176 return ESP_OK;
177}
178
179static void sync_task(void *arg);
180
181void sync_manager_start(sync_manager_t *mgr)
182{
183 mgr->running = true;
184 xTaskCreate(sync_task, "sync_mgr", 16384, mgr, 3, NULL);
185 ESP_LOGI(TAG, "Sync manager started");
186}
187
188void sync_manager_stop(sync_manager_t *mgr)
189{
190 mgr->running = false;
191}
192
193esp_err_t sync_manager_do_negentropy_sync(sync_manager_t *mgr)
194{
195 if (!mgr->selector) return ESP_ERR_INVALID_STATE;
196
197 xSemaphoreTake(mgr->lock, portMAX_DELAY);
198 mgr->sync_in_progress = true;
199 xSemaphoreGive(mgr->lock);
200
201 const relay_info_t *primary = relay_selector_get_primary(mgr->selector);
202 if (!primary || !primary->alive) {
203 ESP_LOGW(TAG, "No primary relay for negentropy sync");
204 xSemaphoreTake(mgr->lock, portMAX_DELAY);
205 mgr->sync_in_progress = false;
206 xSemaphoreGive(mgr->lock);
207 return ESP_ERR_NOT_FOUND;
208 }
209
210 ESP_LOGI(TAG, "Starting REQ-diff sync with primary: %s", primary->url);
211
212 char **local_ids = NULL;
213 uint16_t local_count = 0;
214 get_event_ids_from_storage(&local_ids, &local_count);
215
216 if (local_count == 0) {
217 ESP_LOGI(TAG, "No local events to sync");
218 xSemaphoreTake(mgr->lock, portMAX_DELAY);
219 mgr->sync_in_progress = false;
220 xSemaphoreGive(mgr->lock);
221 return ESP_OK;
222 }
223
224 esp_tls_t *tls = NULL;
225 esp_err_t err = ws_connect(primary->url, &tls);
226 if (err != ESP_OK) {
227 free_event_ids(local_ids, local_count);
228 relay_selector_report_disconnect(mgr->selector, primary->url);
229 xSemaphoreTake(mgr->lock, portMAX_DELAY);
230 mgr->sync_in_progress = false;
231 xSemaphoreGive(mgr->lock);
232 return err;
233 }
234
235 cJSON *filters = cJSON_CreateObject();
236 cJSON *ids_arr = cJSON_CreateArray();
237 for (uint16_t i = 0; i < local_count; i++) {
238 cJSON_AddItemToArray(ids_arr, cJSON_CreateString(local_ids[i]));
239 }
240 cJSON_AddItemToObject(filters, "ids", ids_arr);
241 char *filters_json = cJSON_PrintUnformatted(filters);
242 cJSON_Delete(filters);
243
244 char sub_msg[256];
245 snprintf(sub_msg, sizeof(sub_msg), "[\"REQ\",\"sync_diff\",%s]", filters_json);
246 free(filters_json);
247
248 ws_send_text(tls, sub_msg, strlen(sub_msg));
249
250 char resp[8192];
251 int resp_len = ws_read_text(tls, resp, sizeof(resp));
252 (void)resp_len;
253
254 ws_send_close(tls);
255 esp_tls_conn_destroy(tls);
256
257 free_event_ids(local_ids, local_count);
258
259 int64_t now = (int64_t)(xTaskGetTickCount() / configTICK_RATE_HZ);
260 xSemaphoreTake(mgr->lock, portMAX_DELAY);
261 mgr->last_negentropy_sync = (uint32_t)now;
262 mgr->sync_in_progress = false;
263 xSemaphoreGive(mgr->lock);
264
265 ESP_LOGI(TAG, "Negentropy sync completed");
266 return ESP_OK;
267}
268
269esp_err_t sync_manager_do_reqdiff_sync(sync_manager_t *mgr)
270{
271 if (!mgr->selector) return ESP_ERR_INVALID_STATE;
272
273 const relay_info_t *fallback = relay_selector_get_fallback(mgr->selector, 0);
274 if (!fallback || !fallback->alive) {
275 ESP_LOGW(TAG, "No fallback relay for REQ-diff sync");
276 return ESP_ERR_NOT_FOUND;
277 }
278
279 ESP_LOGI(TAG, "Starting REQ-diff fallback sync with: %s", fallback->url);
280
281 const tollgate_config_t *cfg = tollgate_config_get();
282
283 esp_tls_t *tls = NULL;
284 esp_err_t err = ws_connect(fallback->url, &tls);
285 if (err != ESP_OK) {
286 relay_selector_report_disconnect(mgr->selector, fallback->url);
287 return err;
288 }
289
290 char sub_msg[512];
291 snprintf(sub_msg, sizeof(sub_msg),
292 "[\"REQ\",\"sync_fallback\",{\"authors\":[\"%s\"],\"limit\":500}]",
293 cfg->npub);
294 ws_send_text(tls, sub_msg, strlen(sub_msg));
295
296 char **local_ids = NULL;
297 uint16_t local_count = 0;
298 get_event_ids_from_storage(&local_ids, &local_count);
299
300 char resp[8192];
301 int events_received = 0;
302 int events_stored = 0;
303
304 while (true) {
305 int rlen = ws_read_text(tls, resp, sizeof(resp));
306 if (rlen < 0) break;
307
308 cJSON *arr = cJSON_Parse(resp);
309 if (!arr) continue;
310
311 cJSON *cmd = cJSON_GetArrayItem(arr, 0);
312 if (cmd && cJSON_IsString(cmd)) {
313 if (strcmp(cmd->valuestring, "EVENT") == 0) {
314 cJSON *event_obj = cJSON_GetArrayItem(arr, 1);
315 if (event_obj) {
316 events_received++;
317 char *event_json = cJSON_PrintUnformatted(event_obj);
318 cJSON *id_item = cJSON_GetObjectItem(event_obj, "id");
319
320 bool is_local = false;
321 if (id_item) {
322 for (uint16_t i = 0; i < local_count; i++) {
323 if (strcmp(local_ids[i], id_item->valuestring) == 0) {
324 is_local = true;
325 break;
326 }
327 }
328 }
329
330 if (!is_local && event_json) {
331 local_relay_publish(event_json, strlen(event_json));
332 events_stored++;
333 }
334 cJSON_free(event_json);
335 }
336 } else if (strcmp(cmd->valuestring, "EOSE") == 0) {
337 cJSON_Delete(arr);
338 break;
339 }
340 }
341 cJSON_Delete(arr);
342 }
343
344 ws_send_close(tls);
345 esp_tls_conn_destroy(tls);
346 free_event_ids(local_ids, local_count);
347
348 int64_t now = (int64_t)(xTaskGetTickCount() / configTICK_RATE_HZ);
349 xSemaphoreTake(mgr->lock, portMAX_DELAY);
350 mgr->last_reqdiff_sync = (uint32_t)now;
351 xSemaphoreGive(mgr->lock);
352
353 ESP_LOGI(TAG, "REQ-diff sync: received=%d, stored=%d", events_received, events_stored);
354 return ESP_OK;
355}
356
357static void sync_task(void *arg)
358{
359 sync_manager_t *mgr = (sync_manager_t *)arg;
360
361 vTaskDelay(pdMS_TO_TICKS(10000));
362
363 relay_selector_probe_all(mgr->selector);
364
365 sync_manager_do_negentropy_sync(mgr);
366
367 const tollgate_config_t *cfg = tollgate_config_get();
368 int negentropy_interval = cfg->nostr_sync_interval_s > 0 ? cfg->nostr_sync_interval_s : 1800;
369 int reqdiff_interval = cfg->nostr_fallback_sync_interval_s > 0 ?
370 cfg->nostr_fallback_sync_interval_s : 21600;
371 int reprobe_interval = 21600;
372
373 int64_t last_negentropy = 0;
374 int64_t last_reqdiff = 0;
375 int64_t last_reprobe = xTaskGetTickCount() / configTICK_RATE_HZ;
376
377 while (mgr->running) {
378 vTaskDelay(pdMS_TO_TICKS(30000));
379
380 int64_t now = (int64_t)(xTaskGetTickCount() / configTICK_RATE_HZ);
381
382 if ((now - last_reprobe) >= reprobe_interval) {
383 relay_selector_probe_all(mgr->selector);
384 last_reprobe = now;
385 }
386
387 if ((now - last_negentropy) >= negentropy_interval) {
388 esp_err_t err = sync_manager_do_negentropy_sync(mgr);
389 if (err == ESP_OK) last_negentropy = now;
390 }
391
392 if ((now - last_reqdiff) >= reqdiff_interval) {
393 esp_err_t err = sync_manager_do_reqdiff_sync(mgr);
394 if (err == ESP_OK) last_reqdiff = now;
395 }
396 }
397
398 vTaskDelete(NULL);
399}