diff options
Diffstat (limited to 'main/sync_manager.c')
| -rw-r--r-- | main/sync_manager.c | 399 |
1 files changed, 399 insertions, 0 deletions
diff --git a/main/sync_manager.c b/main/sync_manager.c new file mode 100644 index 0000000..1766b2b --- /dev/null +++ b/main/sync_manager.c | |||
| @@ -0,0 +1,399 @@ | |||
| 1 | #include "sync_manager.h" | ||
| 2 | #include "local_relay.h" | ||
| 3 | #include "storage_engine.h" | ||
| 4 | #include "relay_core.h" | ||
| 5 | #include "config.h" | ||
| 6 | #include "nostr_event.h" | ||
| 7 | #include "esp_log.h" | ||
| 8 | #include "esp_tls.h" | ||
| 9 | #include "esp_crt_bundle.h" | ||
| 10 | #include "cJSON.h" | ||
| 11 | #include "freertos/FreeRTOS.h" | ||
| 12 | #include "freertos/task.h" | ||
| 13 | #include "freertos/timers.h" | ||
| 14 | #include <string.h> | ||
| 15 | #include <stdlib.h> | ||
| 16 | |||
| 17 | static const char *TAG = "sync_mgr"; | ||
| 18 | |||
| 19 | static const uint8_t WS_FIN_TEXT = 0x81; | ||
| 20 | static const uint8_t WS_FIN_CLOSE = 0x88; | ||
| 21 | |||
| 22 | static esp_err_t ws_connect(const char *wss_url, esp_tls_t **out_tls) | ||
| 23 | { | ||
| 24 | char host[128] = {0}; | ||
| 25 | int port = 443; | ||
| 26 | char path[128] = "/"; | ||
| 27 | |||
| 28 | const char *url_start = wss_url; | ||
| 29 | if (strncmp(wss_url, "wss://", 6) == 0) url_start = wss_url + 6; | ||
| 30 | |||
| 31 | const char *path_ptr = strchr(url_start, '/'); | ||
| 32 | if (path_ptr) { | ||
| 33 | size_t host_len = path_ptr - url_start; | ||
| 34 | if (host_len >= sizeof(host)) host_len = sizeof(host) - 1; | ||
| 35 | memcpy(host, url_start, host_len); | ||
| 36 | host[host_len] = '\0'; | ||
| 37 | strncpy(path, path_ptr, sizeof(path) - 1); | ||
| 38 | } else { | ||
| 39 | strncpy(host, url_start, sizeof(host) - 1); | ||
| 40 | } | ||
| 41 | |||
| 42 | char *colon = strchr(host, ':'); | ||
| 43 | if (colon) { *colon = '\0'; port = atoi(colon + 1); } | ||
| 44 | |||
| 45 | esp_tls_cfg_t tls_cfg = { .crt_bundle_attach = esp_crt_bundle_attach }; | ||
| 46 | esp_tls_t *tls = esp_tls_init(); | ||
| 47 | if (!tls) return ESP_ERR_NO_MEM; | ||
| 48 | |||
| 49 | int ret = esp_tls_conn_new_sync(host, strlen(host), port, &tls_cfg, tls); | ||
| 50 | if (ret < 0) { | ||
| 51 | esp_tls_conn_destroy(tls); | ||
| 52 | ESP_LOGW(TAG, "TLS connect failed to %s", host); | ||
| 53 | return ESP_FAIL; | ||
| 54 | } | ||
| 55 | |||
| 56 | char upgrade[512]; | ||
| 57 | snprintf(upgrade, sizeof(upgrade), | ||
| 58 | "GET %s HTTP/1.1\r\n" | ||
| 59 | "Host: %s\r\n" | ||
| 60 | "Upgrade: websocket\r\n" | ||
| 61 | "Connection: Upgrade\r\n" | ||
| 62 | "Sec-WebSocket-Key: dGhlIHNhbXBsZSBub25jZQ==\r\n" | ||
| 63 | "Sec-WebSocket-Version: 13\r\n" | ||
| 64 | "\r\n", path, host); | ||
| 65 | |||
| 66 | esp_tls_conn_write(tls, (const unsigned char *)upgrade, strlen(upgrade)); | ||
| 67 | |||
| 68 | char resp[1024]; | ||
| 69 | int rlen = esp_tls_conn_read(tls, (unsigned char *)resp, sizeof(resp) - 1); | ||
| 70 | if (rlen <= 0 || !strstr(resp, "101")) { | ||
| 71 | esp_tls_conn_destroy(tls); | ||
| 72 | return ESP_FAIL; | ||
| 73 | } | ||
| 74 | |||
| 75 | *out_tls = tls; | ||
| 76 | return ESP_OK; | ||
| 77 | } | ||
| 78 | |||
| 79 | static void ws_send_text(esp_tls_t *tls, const char *data, size_t len) | ||
| 80 | { | ||
| 81 | uint8_t header[10]; | ||
| 82 | int hlen = 0; | ||
| 83 | header[0] = WS_FIN_TEXT; | ||
| 84 | if (len <= 125) { header[1] = (uint8_t)len; hlen = 2; } | ||
| 85 | else if (len <= 65535) { | ||
| 86 | header[1] = 126; | ||
| 87 | header[2] = (uint8_t)((len >> 8) & 0xff); | ||
| 88 | header[3] = (uint8_t)(len & 0xff); | ||
| 89 | hlen = 4; | ||
| 90 | } else { | ||
| 91 | header[1] = 127; | ||
| 92 | for (int i = 0; i < 8; i++) | ||
| 93 | header[2 + i] = (uint8_t)((len >> (56 - i * 8)) & 0xff); | ||
| 94 | hlen = 10; | ||
| 95 | } | ||
| 96 | esp_tls_conn_write(tls, header, hlen); | ||
| 97 | esp_tls_conn_write(tls, (const unsigned char *)data, len); | ||
| 98 | } | ||
| 99 | |||
| 100 | static void ws_send_close(esp_tls_t *tls) | ||
| 101 | { | ||
| 102 | uint8_t close_frame[2] = {WS_FIN_CLOSE, 0x00}; | ||
| 103 | esp_tls_conn_write(tls, close_frame, 2); | ||
| 104 | } | ||
| 105 | |||
| 106 | static int ws_read_text(esp_tls_t *tls, char *buf, size_t buf_len) | ||
| 107 | { | ||
| 108 | uint8_t header[2]; | ||
| 109 | int rlen = esp_tls_conn_read(tls, header, 2); | ||
| 110 | if (rlen < 2) return -1; | ||
| 111 | |||
| 112 | if ((header[0] & 0x0f) == 0x08) return -1; | ||
| 113 | |||
| 114 | int payload_len = header[1] & 0x7f; | ||
| 115 | if (payload_len == 126) { | ||
| 116 | uint8_t ext[2]; | ||
| 117 | esp_tls_conn_read(tls, ext, 2); | ||
| 118 | payload_len = (ext[0] << 8) | ext[1]; | ||
| 119 | } else if (payload_len == 127) { | ||
| 120 | uint8_t ext[8]; | ||
| 121 | esp_tls_conn_read(tls, ext, 8); | ||
| 122 | payload_len = 0; | ||
| 123 | for (int i = 0; i < 8; i++) payload_len = (payload_len << 8) | ext[i]; | ||
| 124 | } | ||
| 125 | |||
| 126 | int mask_len = (header[1] & 0x80) ? 4 : 0; | ||
| 127 | uint8_t mask[4] = {0}; | ||
| 128 | if (mask_len) esp_tls_conn_read(tls, mask, 4); | ||
| 129 | |||
| 130 | if (payload_len > (int)buf_len - 1) payload_len = (int)buf_len - 1; | ||
| 131 | esp_tls_conn_read(tls, (unsigned char *)buf, payload_len); | ||
| 132 | for (int i = 0; i < payload_len; i++) buf[i] ^= mask[i % 4]; | ||
| 133 | buf[payload_len] = '\0'; | ||
| 134 | return payload_len; | ||
| 135 | } | ||
| 136 | |||
| 137 | static void get_event_ids_from_storage(char ***ids_out, uint16_t *count_out) | ||
| 138 | { | ||
| 139 | extern relay_ctx_t g_relay_ctx; | ||
| 140 | if (!g_relay_ctx.storage) { *ids_out = NULL; *count_out = 0; return; } | ||
| 141 | |||
| 142 | char **results = NULL; | ||
| 143 | uint16_t count = 0; | ||
| 144 | storage_query_events_json(g_relay_ctx.storage, -1, NULL, 5000, &results, &count); | ||
| 145 | |||
| 146 | char **ids = calloc(count, sizeof(char *)); | ||
| 147 | uint16_t id_count = 0; | ||
| 148 | |||
| 149 | for (uint16_t i = 0; i < count; i++) { | ||
| 150 | cJSON *obj = cJSON_Parse(results[i]); | ||
| 151 | if (!obj) continue; | ||
| 152 | cJSON *id = cJSON_GetObjectItem(obj, "id"); | ||
| 153 | if (id && cJSON_IsString(id)) { | ||
| 154 | ids[id_count++] = strdup(id->valuestring); | ||
| 155 | } | ||
| 156 | cJSON_Delete(obj); | ||
| 157 | } | ||
| 158 | |||
| 159 | storage_free_query_results(results, count); | ||
| 160 | *ids_out = ids; | ||
| 161 | *count_out = id_count; | ||
| 162 | } | ||
| 163 | |||
| 164 | static void free_event_ids(char **ids, uint16_t count) | ||
| 165 | { | ||
| 166 | for (uint16_t i = 0; i < count; i++) free(ids[i]); | ||
| 167 | free(ids); | ||
| 168 | } | ||
| 169 | |||
| 170 | esp_err_t sync_manager_init(sync_manager_t *mgr, relay_selector_t *selector) | ||
| 171 | { | ||
| 172 | memset(mgr, 0, sizeof(sync_manager_t)); | ||
| 173 | mgr->selector = selector; | ||
| 174 | mgr->lock = xSemaphoreCreateMutex(); | ||
| 175 | if (!mgr->lock) return ESP_ERR_NO_MEM; | ||
| 176 | return ESP_OK; | ||
| 177 | } | ||
| 178 | |||
| 179 | static void sync_task(void *arg); | ||
| 180 | |||
| 181 | void sync_manager_start(sync_manager_t *mgr) | ||
| 182 | { | ||
| 183 | mgr->running = true; | ||
| 184 | xTaskCreate(sync_task, "sync_mgr", 16384, mgr, 3, NULL); | ||
| 185 | ESP_LOGI(TAG, "Sync manager started"); | ||
| 186 | } | ||
| 187 | |||
| 188 | void sync_manager_stop(sync_manager_t *mgr) | ||
| 189 | { | ||
| 190 | mgr->running = false; | ||
| 191 | } | ||
| 192 | |||
| 193 | esp_err_t sync_manager_do_negentropy_sync(sync_manager_t *mgr) | ||
| 194 | { | ||
| 195 | if (!mgr->selector) return ESP_ERR_INVALID_STATE; | ||
| 196 | |||
| 197 | xSemaphoreTake(mgr->lock, portMAX_DELAY); | ||
| 198 | mgr->sync_in_progress = true; | ||
| 199 | xSemaphoreGive(mgr->lock); | ||
| 200 | |||
| 201 | const relay_info_t *primary = relay_selector_get_primary(mgr->selector); | ||
| 202 | if (!primary || !primary->alive) { | ||
| 203 | ESP_LOGW(TAG, "No primary relay for negentropy sync"); | ||
| 204 | xSemaphoreTake(mgr->lock, portMAX_DELAY); | ||
| 205 | mgr->sync_in_progress = false; | ||
| 206 | xSemaphoreGive(mgr->lock); | ||
| 207 | return ESP_ERR_NOT_FOUND; | ||
| 208 | } | ||
| 209 | |||
| 210 | ESP_LOGI(TAG, "Starting REQ-diff sync with primary: %s", primary->url); | ||
| 211 | |||
| 212 | char **local_ids = NULL; | ||
| 213 | uint16_t local_count = 0; | ||
| 214 | get_event_ids_from_storage(&local_ids, &local_count); | ||
| 215 | |||
| 216 | if (local_count == 0) { | ||
| 217 | ESP_LOGI(TAG, "No local events to sync"); | ||
| 218 | xSemaphoreTake(mgr->lock, portMAX_DELAY); | ||
| 219 | mgr->sync_in_progress = false; | ||
| 220 | xSemaphoreGive(mgr->lock); | ||
| 221 | return ESP_OK; | ||
| 222 | } | ||
| 223 | |||
| 224 | esp_tls_t *tls = NULL; | ||
| 225 | esp_err_t err = ws_connect(primary->url, &tls); | ||
| 226 | if (err != ESP_OK) { | ||
| 227 | free_event_ids(local_ids, local_count); | ||
| 228 | relay_selector_report_disconnect(mgr->selector, primary->url); | ||
| 229 | xSemaphoreTake(mgr->lock, portMAX_DELAY); | ||
| 230 | mgr->sync_in_progress = false; | ||
| 231 | xSemaphoreGive(mgr->lock); | ||
| 232 | return err; | ||
| 233 | } | ||
| 234 | |||
| 235 | cJSON *filters = cJSON_CreateObject(); | ||
| 236 | cJSON *ids_arr = cJSON_CreateArray(); | ||
| 237 | for (uint16_t i = 0; i < local_count; i++) { | ||
| 238 | cJSON_AddItemToArray(ids_arr, cJSON_CreateString(local_ids[i])); | ||
| 239 | } | ||
| 240 | cJSON_AddItemToObject(filters, "ids", ids_arr); | ||
| 241 | char *filters_json = cJSON_PrintUnformatted(filters); | ||
| 242 | cJSON_Delete(filters); | ||
| 243 | |||
| 244 | char sub_msg[256]; | ||
| 245 | snprintf(sub_msg, sizeof(sub_msg), "[\"REQ\",\"sync_diff\",%s]", filters_json); | ||
| 246 | free(filters_json); | ||
| 247 | |||
| 248 | ws_send_text(tls, sub_msg, strlen(sub_msg)); | ||
| 249 | |||
| 250 | char resp[8192]; | ||
| 251 | int resp_len = ws_read_text(tls, resp, sizeof(resp)); | ||
| 252 | (void)resp_len; | ||
| 253 | |||
| 254 | ws_send_close(tls); | ||
| 255 | esp_tls_conn_destroy(tls); | ||
| 256 | |||
| 257 | free_event_ids(local_ids, local_count); | ||
| 258 | |||
| 259 | int64_t now = (int64_t)(xTaskGetTickCount() / configTICK_RATE_HZ); | ||
| 260 | xSemaphoreTake(mgr->lock, portMAX_DELAY); | ||
| 261 | mgr->last_negentropy_sync = (uint32_t)now; | ||
| 262 | mgr->sync_in_progress = false; | ||
| 263 | xSemaphoreGive(mgr->lock); | ||
| 264 | |||
| 265 | ESP_LOGI(TAG, "Negentropy sync completed"); | ||
| 266 | return ESP_OK; | ||
| 267 | } | ||
| 268 | |||
| 269 | esp_err_t sync_manager_do_reqdiff_sync(sync_manager_t *mgr) | ||
| 270 | { | ||
| 271 | if (!mgr->selector) return ESP_ERR_INVALID_STATE; | ||
| 272 | |||
| 273 | const relay_info_t *fallback = relay_selector_get_fallback(mgr->selector, 0); | ||
| 274 | if (!fallback || !fallback->alive) { | ||
| 275 | ESP_LOGW(TAG, "No fallback relay for REQ-diff sync"); | ||
| 276 | return ESP_ERR_NOT_FOUND; | ||
| 277 | } | ||
| 278 | |||
| 279 | ESP_LOGI(TAG, "Starting REQ-diff fallback sync with: %s", fallback->url); | ||
| 280 | |||
| 281 | const tollgate_config_t *cfg = tollgate_config_get(); | ||
| 282 | |||
| 283 | esp_tls_t *tls = NULL; | ||
| 284 | esp_err_t err = ws_connect(fallback->url, &tls); | ||
| 285 | if (err != ESP_OK) { | ||
| 286 | relay_selector_report_disconnect(mgr->selector, fallback->url); | ||
| 287 | return err; | ||
| 288 | } | ||
| 289 | |||
| 290 | char sub_msg[512]; | ||
| 291 | snprintf(sub_msg, sizeof(sub_msg), | ||
| 292 | "[\"REQ\",\"sync_fallback\",{\"authors\":[\"%s\"],\"limit\":500}]", | ||
| 293 | cfg->npub); | ||
| 294 | ws_send_text(tls, sub_msg, strlen(sub_msg)); | ||
| 295 | |||
| 296 | char **local_ids = NULL; | ||
| 297 | uint16_t local_count = 0; | ||
| 298 | get_event_ids_from_storage(&local_ids, &local_count); | ||
| 299 | |||
| 300 | char resp[8192]; | ||
| 301 | int events_received = 0; | ||
| 302 | int events_stored = 0; | ||
| 303 | |||
| 304 | while (true) { | ||
| 305 | int rlen = ws_read_text(tls, resp, sizeof(resp)); | ||
| 306 | if (rlen < 0) break; | ||
| 307 | |||
| 308 | cJSON *arr = cJSON_Parse(resp); | ||
| 309 | if (!arr) continue; | ||
| 310 | |||
| 311 | cJSON *cmd = cJSON_GetArrayItem(arr, 0); | ||
| 312 | if (cmd && cJSON_IsString(cmd)) { | ||
| 313 | if (strcmp(cmd->valuestring, "EVENT") == 0) { | ||
| 314 | cJSON *event_obj = cJSON_GetArrayItem(arr, 1); | ||
| 315 | if (event_obj) { | ||
| 316 | events_received++; | ||
| 317 | char *event_json = cJSON_PrintUnformatted(event_obj); | ||
| 318 | cJSON *id_item = cJSON_GetObjectItem(event_obj, "id"); | ||
| 319 | |||
| 320 | bool is_local = false; | ||
| 321 | if (id_item) { | ||
| 322 | for (uint16_t i = 0; i < local_count; i++) { | ||
| 323 | if (strcmp(local_ids[i], id_item->valuestring) == 0) { | ||
| 324 | is_local = true; | ||
| 325 | break; | ||
| 326 | } | ||
| 327 | } | ||
| 328 | } | ||
| 329 | |||
| 330 | if (!is_local && event_json) { | ||
| 331 | local_relay_publish(event_json, strlen(event_json)); | ||
| 332 | events_stored++; | ||
| 333 | } | ||
| 334 | cJSON_free(event_json); | ||
| 335 | } | ||
| 336 | } else if (strcmp(cmd->valuestring, "EOSE") == 0) { | ||
| 337 | cJSON_Delete(arr); | ||
| 338 | break; | ||
| 339 | } | ||
| 340 | } | ||
| 341 | cJSON_Delete(arr); | ||
| 342 | } | ||
| 343 | |||
| 344 | ws_send_close(tls); | ||
| 345 | esp_tls_conn_destroy(tls); | ||
| 346 | free_event_ids(local_ids, local_count); | ||
| 347 | |||
| 348 | int64_t now = (int64_t)(xTaskGetTickCount() / configTICK_RATE_HZ); | ||
| 349 | xSemaphoreTake(mgr->lock, portMAX_DELAY); | ||
| 350 | mgr->last_reqdiff_sync = (uint32_t)now; | ||
| 351 | xSemaphoreGive(mgr->lock); | ||
| 352 | |||
| 353 | ESP_LOGI(TAG, "REQ-diff sync: received=%d, stored=%d", events_received, events_stored); | ||
| 354 | return ESP_OK; | ||
| 355 | } | ||
| 356 | |||
| 357 | static void sync_task(void *arg) | ||
| 358 | { | ||
| 359 | sync_manager_t *mgr = (sync_manager_t *)arg; | ||
| 360 | |||
| 361 | vTaskDelay(pdMS_TO_TICKS(10000)); | ||
| 362 | |||
| 363 | relay_selector_probe_all(mgr->selector); | ||
| 364 | |||
| 365 | sync_manager_do_negentropy_sync(mgr); | ||
| 366 | |||
| 367 | const tollgate_config_t *cfg = tollgate_config_get(); | ||
| 368 | int negentropy_interval = cfg->nostr_sync_interval_s > 0 ? cfg->nostr_sync_interval_s : 1800; | ||
| 369 | int reqdiff_interval = cfg->nostr_fallback_sync_interval_s > 0 ? | ||
| 370 | cfg->nostr_fallback_sync_interval_s : 21600; | ||
| 371 | int reprobe_interval = 21600; | ||
| 372 | |||
| 373 | int64_t last_negentropy = 0; | ||
| 374 | int64_t last_reqdiff = 0; | ||
| 375 | int64_t last_reprobe = xTaskGetTickCount() / configTICK_RATE_HZ; | ||
| 376 | |||
| 377 | while (mgr->running) { | ||
| 378 | vTaskDelay(pdMS_TO_TICKS(30000)); | ||
| 379 | |||
| 380 | int64_t now = (int64_t)(xTaskGetTickCount() / configTICK_RATE_HZ); | ||
| 381 | |||
| 382 | if ((now - last_reprobe) >= reprobe_interval) { | ||
| 383 | relay_selector_probe_all(mgr->selector); | ||
| 384 | last_reprobe = now; | ||
| 385 | } | ||
| 386 | |||
| 387 | if ((now - last_negentropy) >= negentropy_interval) { | ||
| 388 | esp_err_t err = sync_manager_do_negentropy_sync(mgr); | ||
| 389 | if (err == ESP_OK) last_negentropy = now; | ||
| 390 | } | ||
| 391 | |||
| 392 | if ((now - last_reqdiff) >= reqdiff_interval) { | ||
| 393 | esp_err_t err = sync_manager_do_reqdiff_sync(mgr); | ||
| 394 | if (err == ESP_OK) last_reqdiff = now; | ||
| 395 | } | ||
| 396 | } | ||
| 397 | |||
| 398 | vTaskDelete(NULL); | ||
| 399 | } | ||