upleb.uk

Public git repos — served from a NIP-34 GRASP relay at git.upleb.uk

summaryrefslogtreecommitdiff
path: root/components/wisp_relay/sub_manager.c
diff options
context:
space:
mode:
authorYour Name <you@example.com>2026-05-19 02:31:19 +0530
committerYour Name <you@example.com>2026-05-19 02:32:41 +0530
commit81f2dc52dc42d01c89dff45a5407ec40b8863052 (patch)
tree15018c2438639ca89dc6d33a5144c10d0b1c2af0 /components/wisp_relay/sub_manager.c
parent75688d55b3c8d13c8c9a50da9668ec408f684cb3 (diff)
feat: local Nostr relay with relay selection, sync, and integration tests
Local Nostr relay (NIP-01) on port 4869 with LittleFS 4MB storage. All events published locally first, then synced to public relays via REQ-diff. Relay selection via NIP-11 HTTP probing with NIP-77 scoring and auto-failover. Components: - wisp_relay: 16-file local relay (ws_server, storage_engine, sub_manager, broadcaster, relay_validator, router, handlers, rate_limiter, nip11, deletion, flash_monitor, relay_types) - esp_littlefs: LittleFS VFS integration (git submodule) - negentropy: for future NIP-77 binary sync (git submodule) New source files: - local_relay.c/h: thin wrapper for relay init/start/publish - relay_selector.c/h: NIP-11 probe + scoring + auto-failover - sync_manager.c/h: REQ-diff sync (primary 30min, fallback 6h) Bug fixes: - config.c: use-after-free (cJSON_Delete before seed_relays/sync parsing) - local_relay: moved init to app_main for boot-time start (not gated on STA IP) Flash layout: 4MB LittleFS partition at 0x500000 for relay_store Test results (Board B, live hardware): - Smoke: ping + HTTP 4869 + NIP-11: PASS - NIP-11 info document: 10/11 PASS - WS pub/sub (connect, REQ/EOSE, EVENT/OK, CLOSE, concurrent): 6/6 PASS - Unit tests (relay_validator + relay_selector): 13/13 PASS Hardware test make targets in physical-router-test-automation/: - make relay-build, relay-flash-b, relay-test-smoke/nip11/pubsub/sync/full
Diffstat (limited to 'components/wisp_relay/sub_manager.c')
-rw-r--r--components/wisp_relay/sub_manager.c272
1 files changed, 272 insertions, 0 deletions
diff --git a/components/wisp_relay/sub_manager.c b/components/wisp_relay/sub_manager.c
new file mode 100644
index 0000000..a1da2e3
--- /dev/null
+++ b/components/wisp_relay/sub_manager.c
@@ -0,0 +1,272 @@
1#include "sub_manager.h"
2#include "relay_types.h"
3#include "esp_log.h"
4#include <string.h>
5#include <stdlib.h>
6
7static const char *TAG = "sub_mgr";
8
9static void filter_clear(sub_filter_t *f)
10{
11 for (size_t i = 0; i < f->ids_count; i++) free(f->ids[i]);
12 for (size_t i = 0; i < f->authors_count; i++) free(f->authors[i]);
13 for (size_t i = 0; i < f->e_tags_count; i++) free(f->e_tags[i]);
14 for (size_t i = 0; i < f->p_tags_count; i++) free(f->p_tags[i]);
15 memset(f, 0, sizeof(sub_filter_t));
16}
17
18static bool filter_copy(sub_filter_t *dst, const sub_filter_t *src)
19{
20 memset(dst, 0, sizeof(sub_filter_t));
21
22 size_t ids_count = src->ids_count > SUB_MAX_FILTER_IDS ? SUB_MAX_FILTER_IDS : src->ids_count;
23 for (size_t i = 0; i < ids_count; i++) {
24 dst->ids[i] = strdup(src->ids[i]);
25 if (!dst->ids[i]) goto fail;
26 }
27 dst->ids_count = ids_count;
28
29 size_t authors_count = src->authors_count > SUB_MAX_FILTER_AUTHORS ? SUB_MAX_FILTER_AUTHORS : src->authors_count;
30 for (size_t i = 0; i < authors_count; i++) {
31 dst->authors[i] = strdup(src->authors[i]);
32 if (!dst->authors[i]) goto fail;
33 }
34 dst->authors_count = authors_count;
35
36 size_t kinds_count = src->kinds_count > SUB_MAX_FILTER_KINDS ? SUB_MAX_FILTER_KINDS : src->kinds_count;
37 memcpy(dst->kinds, src->kinds, kinds_count * sizeof(int32_t));
38 dst->kinds_count = kinds_count;
39
40 size_t e_tags_count = src->e_tags_count > SUB_MAX_FILTER_ETAGS ? SUB_MAX_FILTER_ETAGS : src->e_tags_count;
41 for (size_t i = 0; i < e_tags_count; i++) {
42 dst->e_tags[i] = strdup(src->e_tags[i]);
43 if (!dst->e_tags[i]) goto fail;
44 }
45 dst->e_tags_count = e_tags_count;
46
47 size_t p_tags_count = src->p_tags_count > SUB_MAX_FILTER_PTAGS ? SUB_MAX_FILTER_PTAGS : src->p_tags_count;
48 for (size_t i = 0; i < p_tags_count; i++) {
49 dst->p_tags[i] = strdup(src->p_tags[i]);
50 if (!dst->p_tags[i]) goto fail;
51 }
52 dst->p_tags_count = p_tags_count;
53
54 dst->since = src->since;
55 dst->until = src->until;
56 dst->limit = src->limit;
57 return true;
58
59fail:
60 filter_clear(dst);
61 return false;
62}
63
64static void clear_subscription(subscription_t *sub)
65{
66 for (uint8_t i = 0; i < sub->filter_count; i++) {
67 filter_clear(&sub->filters[i]);
68 }
69 memset(sub, 0, sizeof(subscription_t));
70}
71
72esp_err_t sub_manager_init(sub_manager_t *mgr)
73{
74 memset(mgr, 0, sizeof(sub_manager_t));
75 mgr->lock = xSemaphoreCreateMutex();
76 if (!mgr->lock) return ESP_ERR_NO_MEM;
77 ESP_LOGI(TAG, "Initialized (max=%d, per_conn=%d)", SUB_MAX_TOTAL, SUB_MAX_PER_CONN);
78 return ESP_OK;
79}
80
81void sub_manager_destroy(sub_manager_t *mgr)
82{
83 if (!mgr) return;
84 for (int i = 0; i < SUB_MAX_TOTAL; i++) {
85 if (mgr->subs[i].active) clear_subscription(&mgr->subs[i]);
86 }
87 if (mgr->lock) { vSemaphoreDelete(mgr->lock); mgr->lock = NULL; }
88}
89
90static subscription_t *find_sub(sub_manager_t *mgr, int conn_fd, const char *sub_id)
91{
92 for (int i = 0; i < SUB_MAX_TOTAL; i++) {
93 if (mgr->subs[i].active && mgr->subs[i].conn_fd == conn_fd &&
94 strcmp(mgr->subs[i].sub_id, sub_id) == 0)
95 return &mgr->subs[i];
96 }
97 return NULL;
98}
99
100static subscription_t *find_free_slot(sub_manager_t *mgr)
101{
102 for (int i = 0; i < SUB_MAX_TOTAL; i++) {
103 if (!mgr->subs[i].active) return &mgr->subs[i];
104 }
105 return NULL;
106}
107
108static bool hex_prefix_match(const char *prefix, size_t prefix_len,
109 const char *full, size_t full_len)
110{
111 if (prefix_len == 0) return true;
112 if (prefix_len > full_len) return false;
113 return memcmp(prefix, full, prefix_len) == 0;
114}
115
116static bool filter_matches_event(const sub_filter_t *f, int event_kind,
117 const char *pubkey_hex, uint64_t created_at)
118{
119 if (f->kinds_count > 0) {
120 bool found = false;
121 for (size_t i = 0; i < f->kinds_count; i++) {
122 if (f->kinds[i] == event_kind) { found = true; break; }
123 }
124 if (!found) return false;
125 }
126
127 if (f->authors_count > 0) {
128 bool found = false;
129 for (size_t i = 0; i < f->authors_count; i++) {
130 if (hex_prefix_match(f->authors[i], strlen(f->authors[i]),
131 pubkey_hex, strlen(pubkey_hex))) {
132 found = true; break;
133 }
134 }
135 if (!found) return false;
136 }
137
138 if (f->since > 0 && (int64_t)created_at < f->since) return false;
139 if (f->until > 0 && (int64_t)created_at > f->until) return false;
140
141 return true;
142}
143
144void sub_manager_match_json(sub_manager_t *mgr, const char *event_json,
145 size_t event_len, int event_kind,
146 const char *event_pubkey_hex,
147 uint64_t event_created_at,
148 sub_match_result_t *result)
149{
150 result->count = 0;
151 (void)event_json;
152 (void)event_len;
153
154 xSemaphoreTake(mgr->lock, portMAX_DELAY);
155 for (int i = 0; i < SUB_MAX_TOTAL; i++) {
156 subscription_t *sub = &mgr->subs[i];
157 if (!sub->active) continue;
158
159 bool matched = false;
160 for (uint8_t f = 0; f < sub->filter_count; f++) {
161 if (filter_matches_event(&sub->filters[f], event_kind,
162 event_pubkey_hex, event_created_at)) {
163 matched = true;
164 break;
165 }
166 }
167 if (matched) {
168 sub_match_entry_t *entry = &result->matches[result->count++];
169 entry->conn_fd = sub->conn_fd;
170 memcpy(entry->sub_id, sub->sub_id, sizeof(entry->sub_id));
171 }
172 }
173 xSemaphoreGive(mgr->lock);
174}
175
176sub_error_t sub_manager_add(sub_manager_t *mgr, int conn_fd,
177 const char *sub_id,
178 const sub_filter_t *filters,
179 size_t filter_count)
180{
181 if (filter_count > SUB_MAX_FILTERS) filter_count = SUB_MAX_FILTERS;
182
183 xSemaphoreTake(mgr->lock, portMAX_DELAY);
184
185 subscription_t *existing = find_sub(mgr, conn_fd, sub_id);
186 if (existing) {
187 for (uint8_t i = 0; i < existing->filter_count; i++)
188 filter_clear(&existing->filters[i]);
189 existing->events_sent = 0;
190 for (size_t i = 0; i < filter_count; i++) {
191 if (!filter_copy(&existing->filters[i], &filters[i])) {
192 existing->filter_count = (uint8_t)i;
193 xSemaphoreGive(mgr->lock);
194 return SUB_ERR_MEMORY;
195 }
196 }
197 existing->filter_count = (uint8_t)filter_count;
198 xSemaphoreGive(mgr->lock);
199 return SUB_OK;
200 }
201
202 uint8_t conn_count = 0;
203 for (int i = 0; i < SUB_MAX_TOTAL; i++) {
204 if (mgr->subs[i].active && mgr->subs[i].conn_fd == conn_fd) conn_count++;
205 }
206 if (conn_count >= SUB_MAX_PER_CONN) {
207 xSemaphoreGive(mgr->lock);
208 return SUB_ERR_TOO_MANY_FILTERS;
209 }
210
211 subscription_t *slot = find_free_slot(mgr);
212 if (!slot) { xSemaphoreGive(mgr->lock); return SUB_ERR_MEMORY; }
213
214 memset(slot, 0, sizeof(subscription_t));
215 strncpy(slot->sub_id, sub_id, SUB_MAX_ID_LEN);
216 slot->sub_id[SUB_MAX_ID_LEN] = '\0';
217 slot->conn_fd = conn_fd;
218
219 for (size_t i = 0; i < filter_count; i++) {
220 if (!filter_copy(&slot->filters[i], &filters[i])) {
221 slot->filter_count = (uint8_t)i;
222 clear_subscription(slot);
223 xSemaphoreGive(mgr->lock);
224 return SUB_ERR_MEMORY;
225 }
226 }
227 slot->filter_count = (uint8_t)filter_count;
228 slot->active = true;
229 mgr->active_count++;
230
231 ESP_LOGI(TAG, "Added sub=%s fd=%d filters=%zu total=%d",
232 sub_id, conn_fd, filter_count, mgr->active_count);
233 xSemaphoreGive(mgr->lock);
234 return SUB_OK;
235}
236
237sub_error_t sub_manager_remove(sub_manager_t *mgr, int conn_fd, const char *sub_id)
238{
239 xSemaphoreTake(mgr->lock, portMAX_DELAY);
240 subscription_t *sub = find_sub(mgr, conn_fd, sub_id);
241 if (!sub) { xSemaphoreGive(mgr->lock); return SUB_ERR_NOT_FOUND; }
242 clear_subscription(sub);
243 mgr->active_count--;
244 xSemaphoreGive(mgr->lock);
245 return SUB_OK;
246}
247
248void sub_manager_remove_all(sub_manager_t *mgr, int conn_fd)
249{
250 xSemaphoreTake(mgr->lock, portMAX_DELAY);
251 int removed = 0;
252 for (int i = 0; i < SUB_MAX_TOTAL; i++) {
253 if (mgr->subs[i].active && mgr->subs[i].conn_fd == conn_fd) {
254 clear_subscription(&mgr->subs[i]);
255 mgr->active_count--;
256 removed++;
257 }
258 }
259 if (removed > 0) ESP_LOGI(TAG, "Removed %d subs for fd=%d", removed, conn_fd);
260 xSemaphoreGive(mgr->lock);
261}
262
263uint8_t sub_manager_count(sub_manager_t *mgr, int conn_fd)
264{
265 uint8_t count = 0;
266 xSemaphoreTake(mgr->lock, portMAX_DELAY);
267 for (int i = 0; i < SUB_MAX_TOTAL; i++) {
268 if (mgr->subs[i].active && mgr->subs[i].conn_fd == conn_fd) count++;
269 }
270 xSemaphoreGive(mgr->lock);
271 return count;
272}