diff options
| author | Your Name <you@example.com> | 2026-05-19 02:31:19 +0530 |
|---|---|---|
| committer | Your Name <you@example.com> | 2026-05-19 02:32:41 +0530 |
| commit | 81f2dc52dc42d01c89dff45a5407ec40b8863052 (patch) | |
| tree | 15018c2438639ca89dc6d33a5144c10d0b1c2af0 /components/wisp_relay/sub_manager.c | |
| parent | 75688d55b3c8d13c8c9a50da9668ec408f684cb3 (diff) | |
feat: local Nostr relay with relay selection, sync, and integration tests
Local Nostr relay (NIP-01) on port 4869 with LittleFS 4MB storage.
All events published locally first, then synced to public relays via REQ-diff.
Relay selection via NIP-11 HTTP probing with NIP-77 scoring and auto-failover.
Components:
- wisp_relay: 16-file local relay (ws_server, storage_engine, sub_manager,
broadcaster, relay_validator, router, handlers, rate_limiter, nip11,
deletion, flash_monitor, relay_types)
- esp_littlefs: LittleFS VFS integration (git submodule)
- negentropy: for future NIP-77 binary sync (git submodule)
New source files:
- local_relay.c/h: thin wrapper for relay init/start/publish
- relay_selector.c/h: NIP-11 probe + scoring + auto-failover
- sync_manager.c/h: REQ-diff sync (primary 30min, fallback 6h)
Bug fixes:
- config.c: use-after-free (cJSON_Delete before seed_relays/sync parsing)
- local_relay: moved init to app_main for boot-time start (not gated on STA IP)
Flash layout: 4MB LittleFS partition at 0x500000 for relay_store
Test results (Board B, live hardware):
- Smoke: ping + HTTP 4869 + NIP-11: PASS
- NIP-11 info document: 10/11 PASS
- WS pub/sub (connect, REQ/EOSE, EVENT/OK, CLOSE, concurrent): 6/6 PASS
- Unit tests (relay_validator + relay_selector): 13/13 PASS
Hardware test make targets in physical-router-test-automation/:
- make relay-build, relay-flash-b, relay-test-smoke/nip11/pubsub/sync/full
Diffstat (limited to 'components/wisp_relay/sub_manager.c')
| -rw-r--r-- | components/wisp_relay/sub_manager.c | 272 |
1 files changed, 272 insertions, 0 deletions
diff --git a/components/wisp_relay/sub_manager.c b/components/wisp_relay/sub_manager.c new file mode 100644 index 0000000..a1da2e3 --- /dev/null +++ b/components/wisp_relay/sub_manager.c | |||
| @@ -0,0 +1,272 @@ | |||
| 1 | #include "sub_manager.h" | ||
| 2 | #include "relay_types.h" | ||
| 3 | #include "esp_log.h" | ||
| 4 | #include <string.h> | ||
| 5 | #include <stdlib.h> | ||
| 6 | |||
| 7 | static const char *TAG = "sub_mgr"; | ||
| 8 | |||
| 9 | static void filter_clear(sub_filter_t *f) | ||
| 10 | { | ||
| 11 | for (size_t i = 0; i < f->ids_count; i++) free(f->ids[i]); | ||
| 12 | for (size_t i = 0; i < f->authors_count; i++) free(f->authors[i]); | ||
| 13 | for (size_t i = 0; i < f->e_tags_count; i++) free(f->e_tags[i]); | ||
| 14 | for (size_t i = 0; i < f->p_tags_count; i++) free(f->p_tags[i]); | ||
| 15 | memset(f, 0, sizeof(sub_filter_t)); | ||
| 16 | } | ||
| 17 | |||
| 18 | static bool filter_copy(sub_filter_t *dst, const sub_filter_t *src) | ||
| 19 | { | ||
| 20 | memset(dst, 0, sizeof(sub_filter_t)); | ||
| 21 | |||
| 22 | size_t ids_count = src->ids_count > SUB_MAX_FILTER_IDS ? SUB_MAX_FILTER_IDS : src->ids_count; | ||
| 23 | for (size_t i = 0; i < ids_count; i++) { | ||
| 24 | dst->ids[i] = strdup(src->ids[i]); | ||
| 25 | if (!dst->ids[i]) goto fail; | ||
| 26 | } | ||
| 27 | dst->ids_count = ids_count; | ||
| 28 | |||
| 29 | size_t authors_count = src->authors_count > SUB_MAX_FILTER_AUTHORS ? SUB_MAX_FILTER_AUTHORS : src->authors_count; | ||
| 30 | for (size_t i = 0; i < authors_count; i++) { | ||
| 31 | dst->authors[i] = strdup(src->authors[i]); | ||
| 32 | if (!dst->authors[i]) goto fail; | ||
| 33 | } | ||
| 34 | dst->authors_count = authors_count; | ||
| 35 | |||
| 36 | size_t kinds_count = src->kinds_count > SUB_MAX_FILTER_KINDS ? SUB_MAX_FILTER_KINDS : src->kinds_count; | ||
| 37 | memcpy(dst->kinds, src->kinds, kinds_count * sizeof(int32_t)); | ||
| 38 | dst->kinds_count = kinds_count; | ||
| 39 | |||
| 40 | size_t e_tags_count = src->e_tags_count > SUB_MAX_FILTER_ETAGS ? SUB_MAX_FILTER_ETAGS : src->e_tags_count; | ||
| 41 | for (size_t i = 0; i < e_tags_count; i++) { | ||
| 42 | dst->e_tags[i] = strdup(src->e_tags[i]); | ||
| 43 | if (!dst->e_tags[i]) goto fail; | ||
| 44 | } | ||
| 45 | dst->e_tags_count = e_tags_count; | ||
| 46 | |||
| 47 | size_t p_tags_count = src->p_tags_count > SUB_MAX_FILTER_PTAGS ? SUB_MAX_FILTER_PTAGS : src->p_tags_count; | ||
| 48 | for (size_t i = 0; i < p_tags_count; i++) { | ||
| 49 | dst->p_tags[i] = strdup(src->p_tags[i]); | ||
| 50 | if (!dst->p_tags[i]) goto fail; | ||
| 51 | } | ||
| 52 | dst->p_tags_count = p_tags_count; | ||
| 53 | |||
| 54 | dst->since = src->since; | ||
| 55 | dst->until = src->until; | ||
| 56 | dst->limit = src->limit; | ||
| 57 | return true; | ||
| 58 | |||
| 59 | fail: | ||
| 60 | filter_clear(dst); | ||
| 61 | return false; | ||
| 62 | } | ||
| 63 | |||
| 64 | static void clear_subscription(subscription_t *sub) | ||
| 65 | { | ||
| 66 | for (uint8_t i = 0; i < sub->filter_count; i++) { | ||
| 67 | filter_clear(&sub->filters[i]); | ||
| 68 | } | ||
| 69 | memset(sub, 0, sizeof(subscription_t)); | ||
| 70 | } | ||
| 71 | |||
| 72 | esp_err_t sub_manager_init(sub_manager_t *mgr) | ||
| 73 | { | ||
| 74 | memset(mgr, 0, sizeof(sub_manager_t)); | ||
| 75 | mgr->lock = xSemaphoreCreateMutex(); | ||
| 76 | if (!mgr->lock) return ESP_ERR_NO_MEM; | ||
| 77 | ESP_LOGI(TAG, "Initialized (max=%d, per_conn=%d)", SUB_MAX_TOTAL, SUB_MAX_PER_CONN); | ||
| 78 | return ESP_OK; | ||
| 79 | } | ||
| 80 | |||
| 81 | void sub_manager_destroy(sub_manager_t *mgr) | ||
| 82 | { | ||
| 83 | if (!mgr) return; | ||
| 84 | for (int i = 0; i < SUB_MAX_TOTAL; i++) { | ||
| 85 | if (mgr->subs[i].active) clear_subscription(&mgr->subs[i]); | ||
| 86 | } | ||
| 87 | if (mgr->lock) { vSemaphoreDelete(mgr->lock); mgr->lock = NULL; } | ||
| 88 | } | ||
| 89 | |||
| 90 | static subscription_t *find_sub(sub_manager_t *mgr, int conn_fd, const char *sub_id) | ||
| 91 | { | ||
| 92 | for (int i = 0; i < SUB_MAX_TOTAL; i++) { | ||
| 93 | if (mgr->subs[i].active && mgr->subs[i].conn_fd == conn_fd && | ||
| 94 | strcmp(mgr->subs[i].sub_id, sub_id) == 0) | ||
| 95 | return &mgr->subs[i]; | ||
| 96 | } | ||
| 97 | return NULL; | ||
| 98 | } | ||
| 99 | |||
| 100 | static subscription_t *find_free_slot(sub_manager_t *mgr) | ||
| 101 | { | ||
| 102 | for (int i = 0; i < SUB_MAX_TOTAL; i++) { | ||
| 103 | if (!mgr->subs[i].active) return &mgr->subs[i]; | ||
| 104 | } | ||
| 105 | return NULL; | ||
| 106 | } | ||
| 107 | |||
| 108 | static bool hex_prefix_match(const char *prefix, size_t prefix_len, | ||
| 109 | const char *full, size_t full_len) | ||
| 110 | { | ||
| 111 | if (prefix_len == 0) return true; | ||
| 112 | if (prefix_len > full_len) return false; | ||
| 113 | return memcmp(prefix, full, prefix_len) == 0; | ||
| 114 | } | ||
| 115 | |||
| 116 | static bool filter_matches_event(const sub_filter_t *f, int event_kind, | ||
| 117 | const char *pubkey_hex, uint64_t created_at) | ||
| 118 | { | ||
| 119 | if (f->kinds_count > 0) { | ||
| 120 | bool found = false; | ||
| 121 | for (size_t i = 0; i < f->kinds_count; i++) { | ||
| 122 | if (f->kinds[i] == event_kind) { found = true; break; } | ||
| 123 | } | ||
| 124 | if (!found) return false; | ||
| 125 | } | ||
| 126 | |||
| 127 | if (f->authors_count > 0) { | ||
| 128 | bool found = false; | ||
| 129 | for (size_t i = 0; i < f->authors_count; i++) { | ||
| 130 | if (hex_prefix_match(f->authors[i], strlen(f->authors[i]), | ||
| 131 | pubkey_hex, strlen(pubkey_hex))) { | ||
| 132 | found = true; break; | ||
| 133 | } | ||
| 134 | } | ||
| 135 | if (!found) return false; | ||
| 136 | } | ||
| 137 | |||
| 138 | if (f->since > 0 && (int64_t)created_at < f->since) return false; | ||
| 139 | if (f->until > 0 && (int64_t)created_at > f->until) return false; | ||
| 140 | |||
| 141 | return true; | ||
| 142 | } | ||
| 143 | |||
| 144 | void sub_manager_match_json(sub_manager_t *mgr, const char *event_json, | ||
| 145 | size_t event_len, int event_kind, | ||
| 146 | const char *event_pubkey_hex, | ||
| 147 | uint64_t event_created_at, | ||
| 148 | sub_match_result_t *result) | ||
| 149 | { | ||
| 150 | result->count = 0; | ||
| 151 | (void)event_json; | ||
| 152 | (void)event_len; | ||
| 153 | |||
| 154 | xSemaphoreTake(mgr->lock, portMAX_DELAY); | ||
| 155 | for (int i = 0; i < SUB_MAX_TOTAL; i++) { | ||
| 156 | subscription_t *sub = &mgr->subs[i]; | ||
| 157 | if (!sub->active) continue; | ||
| 158 | |||
| 159 | bool matched = false; | ||
| 160 | for (uint8_t f = 0; f < sub->filter_count; f++) { | ||
| 161 | if (filter_matches_event(&sub->filters[f], event_kind, | ||
| 162 | event_pubkey_hex, event_created_at)) { | ||
| 163 | matched = true; | ||
| 164 | break; | ||
| 165 | } | ||
| 166 | } | ||
| 167 | if (matched) { | ||
| 168 | sub_match_entry_t *entry = &result->matches[result->count++]; | ||
| 169 | entry->conn_fd = sub->conn_fd; | ||
| 170 | memcpy(entry->sub_id, sub->sub_id, sizeof(entry->sub_id)); | ||
| 171 | } | ||
| 172 | } | ||
| 173 | xSemaphoreGive(mgr->lock); | ||
| 174 | } | ||
| 175 | |||
| 176 | sub_error_t sub_manager_add(sub_manager_t *mgr, int conn_fd, | ||
| 177 | const char *sub_id, | ||
| 178 | const sub_filter_t *filters, | ||
| 179 | size_t filter_count) | ||
| 180 | { | ||
| 181 | if (filter_count > SUB_MAX_FILTERS) filter_count = SUB_MAX_FILTERS; | ||
| 182 | |||
| 183 | xSemaphoreTake(mgr->lock, portMAX_DELAY); | ||
| 184 | |||
| 185 | subscription_t *existing = find_sub(mgr, conn_fd, sub_id); | ||
| 186 | if (existing) { | ||
| 187 | for (uint8_t i = 0; i < existing->filter_count; i++) | ||
| 188 | filter_clear(&existing->filters[i]); | ||
| 189 | existing->events_sent = 0; | ||
| 190 | for (size_t i = 0; i < filter_count; i++) { | ||
| 191 | if (!filter_copy(&existing->filters[i], &filters[i])) { | ||
| 192 | existing->filter_count = (uint8_t)i; | ||
| 193 | xSemaphoreGive(mgr->lock); | ||
| 194 | return SUB_ERR_MEMORY; | ||
| 195 | } | ||
| 196 | } | ||
| 197 | existing->filter_count = (uint8_t)filter_count; | ||
| 198 | xSemaphoreGive(mgr->lock); | ||
| 199 | return SUB_OK; | ||
| 200 | } | ||
| 201 | |||
| 202 | uint8_t conn_count = 0; | ||
| 203 | for (int i = 0; i < SUB_MAX_TOTAL; i++) { | ||
| 204 | if (mgr->subs[i].active && mgr->subs[i].conn_fd == conn_fd) conn_count++; | ||
| 205 | } | ||
| 206 | if (conn_count >= SUB_MAX_PER_CONN) { | ||
| 207 | xSemaphoreGive(mgr->lock); | ||
| 208 | return SUB_ERR_TOO_MANY_FILTERS; | ||
| 209 | } | ||
| 210 | |||
| 211 | subscription_t *slot = find_free_slot(mgr); | ||
| 212 | if (!slot) { xSemaphoreGive(mgr->lock); return SUB_ERR_MEMORY; } | ||
| 213 | |||
| 214 | memset(slot, 0, sizeof(subscription_t)); | ||
| 215 | strncpy(slot->sub_id, sub_id, SUB_MAX_ID_LEN); | ||
| 216 | slot->sub_id[SUB_MAX_ID_LEN] = '\0'; | ||
| 217 | slot->conn_fd = conn_fd; | ||
| 218 | |||
| 219 | for (size_t i = 0; i < filter_count; i++) { | ||
| 220 | if (!filter_copy(&slot->filters[i], &filters[i])) { | ||
| 221 | slot->filter_count = (uint8_t)i; | ||
| 222 | clear_subscription(slot); | ||
| 223 | xSemaphoreGive(mgr->lock); | ||
| 224 | return SUB_ERR_MEMORY; | ||
| 225 | } | ||
| 226 | } | ||
| 227 | slot->filter_count = (uint8_t)filter_count; | ||
| 228 | slot->active = true; | ||
| 229 | mgr->active_count++; | ||
| 230 | |||
| 231 | ESP_LOGI(TAG, "Added sub=%s fd=%d filters=%zu total=%d", | ||
| 232 | sub_id, conn_fd, filter_count, mgr->active_count); | ||
| 233 | xSemaphoreGive(mgr->lock); | ||
| 234 | return SUB_OK; | ||
| 235 | } | ||
| 236 | |||
| 237 | sub_error_t sub_manager_remove(sub_manager_t *mgr, int conn_fd, const char *sub_id) | ||
| 238 | { | ||
| 239 | xSemaphoreTake(mgr->lock, portMAX_DELAY); | ||
| 240 | subscription_t *sub = find_sub(mgr, conn_fd, sub_id); | ||
| 241 | if (!sub) { xSemaphoreGive(mgr->lock); return SUB_ERR_NOT_FOUND; } | ||
| 242 | clear_subscription(sub); | ||
| 243 | mgr->active_count--; | ||
| 244 | xSemaphoreGive(mgr->lock); | ||
| 245 | return SUB_OK; | ||
| 246 | } | ||
| 247 | |||
| 248 | void sub_manager_remove_all(sub_manager_t *mgr, int conn_fd) | ||
| 249 | { | ||
| 250 | xSemaphoreTake(mgr->lock, portMAX_DELAY); | ||
| 251 | int removed = 0; | ||
| 252 | for (int i = 0; i < SUB_MAX_TOTAL; i++) { | ||
| 253 | if (mgr->subs[i].active && mgr->subs[i].conn_fd == conn_fd) { | ||
| 254 | clear_subscription(&mgr->subs[i]); | ||
| 255 | mgr->active_count--; | ||
| 256 | removed++; | ||
| 257 | } | ||
| 258 | } | ||
| 259 | if (removed > 0) ESP_LOGI(TAG, "Removed %d subs for fd=%d", removed, conn_fd); | ||
| 260 | xSemaphoreGive(mgr->lock); | ||
| 261 | } | ||
| 262 | |||
| 263 | uint8_t sub_manager_count(sub_manager_t *mgr, int conn_fd) | ||
| 264 | { | ||
| 265 | uint8_t count = 0; | ||
| 266 | xSemaphoreTake(mgr->lock, portMAX_DELAY); | ||
| 267 | for (int i = 0; i < SUB_MAX_TOTAL; i++) { | ||
| 268 | if (mgr->subs[i].active && mgr->subs[i].conn_fd == conn_fd) count++; | ||
| 269 | } | ||
| 270 | xSemaphoreGive(mgr->lock); | ||
| 271 | return count; | ||
| 272 | } | ||