upleb.uk

Public git repos — served from a NIP-34 GRASP relay at git.upleb.uk

summaryrefslogtreecommitdiff
path: root/components/wisp_relay/ws_server.c
diff options
context:
space:
mode:
Diffstat (limited to 'components/wisp_relay/ws_server.c')
-rw-r--r--components/wisp_relay/ws_server.c426
1 files changed, 426 insertions, 0 deletions
diff --git a/components/wisp_relay/ws_server.c b/components/wisp_relay/ws_server.c
new file mode 100644
index 0000000..a973ca6
--- /dev/null
+++ b/components/wisp_relay/ws_server.c
@@ -0,0 +1,426 @@
1#include "ws_server.h"
2#include "nip11_relay.h"
3#include "esp_log.h"
4#include "esp_timer.h"
5#include <string.h>
6#include <strings.h>
7#include <unistd.h>
8#include <sys/socket.h>
9#include <netinet/in.h>
10#include <netinet/tcp.h>
11#include <arpa/inet.h>
12
13static const char *TAG = "ws_server";
14static ws_message_cb_t g_message_callback = NULL;
15static ws_disconnect_cb_t g_disconnect_callback = NULL;
16static ws_server_t *g_server = NULL;
17static __thread httpd_req_t *g_current_req = NULL;
18
19static ws_connection_t* find_free_slot(ws_server_t *server)
20{
21 for (int i = 0; i < WS_MAX_CONNECTIONS; i++) {
22 if (!server->connections[i].active) {
23 return &server->connections[i];
24 }
25 }
26 return NULL;
27}
28
29static ws_connection_t* find_connection_by_fd(ws_server_t *server, int fd)
30{
31 for (int i = 0; i < WS_MAX_CONNECTIONS; i++) {
32 if (server->connections[i].active && server->connections[i].fd == fd) {
33 return &server->connections[i];
34 }
35 }
36 return NULL;
37}
38
39static void update_connection_activity(ws_server_t *server, int fd)
40{
41 xSemaphoreTake(server->lock, portMAX_DELAY);
42 ws_connection_t *conn = find_connection_by_fd(server, fd);
43 if (conn) {
44 conn->last_activity = esp_timer_get_time() / 1000000;
45 }
46 xSemaphoreGive(server->lock);
47}
48
49static void set_unknown_ip(char *ip_buf, size_t buf_len)
50{
51 if (buf_len == 0) {
52 return;
53 }
54 strncpy(ip_buf, "unknown", buf_len - 1);
55 ip_buf[buf_len - 1] = '\0';
56}
57
58static void get_client_ip(int fd, char *ip_buf, size_t buf_len)
59{
60 if (buf_len == 0) {
61 return;
62 }
63
64 struct sockaddr_storage addr;
65 socklen_t addr_len = sizeof(addr);
66
67 if (getpeername(fd, (struct sockaddr *)&addr, &addr_len) != 0) {
68 set_unknown_ip(ip_buf, buf_len);
69 return;
70 }
71
72 const char *result = NULL;
73 if (addr.ss_family == AF_INET) {
74 struct sockaddr_in *addr_in = (struct sockaddr_in *)&addr;
75 result = inet_ntop(AF_INET, &addr_in->sin_addr, ip_buf, buf_len);
76 }
77 if (!result) {
78 set_unknown_ip(ip_buf, buf_len);
79 }
80}
81
82static esp_err_t on_open(httpd_handle_t hd, int sockfd)
83{
84 if (!g_server) return ESP_FAIL;
85
86 xSemaphoreTake(g_server->lock, portMAX_DELAY);
87
88 if (g_server->connection_count >= WS_MAX_CONNECTIONS) {
89 xSemaphoreGive(g_server->lock);
90 ESP_LOGW(TAG, "Connection rejected - max connections reached");
91 return ESP_FAIL;
92 }
93
94 ws_connection_t *conn = find_free_slot(g_server);
95 if (!conn) {
96 xSemaphoreGive(g_server->lock);
97 ESP_LOGE(TAG, "No free slot despite connection_count < WS_MAX_CONNECTIONS (fd=%d)", sockfd);
98 return ESP_FAIL;
99 }
100
101 struct linger so_linger = { .l_onoff = 1, .l_linger = 0 };
102 setsockopt(sockfd, SOL_SOCKET, SO_LINGER, &so_linger, sizeof(so_linger));
103
104 int nodelay = 1;
105 setsockopt(sockfd, IPPROTO_TCP, TCP_NODELAY, &nodelay, sizeof(nodelay));
106
107 conn->fd = sockfd;
108 conn->active = true;
109 conn->connected_at = esp_timer_get_time() / 1000000;
110 conn->last_activity = conn->connected_at;
111 get_client_ip(sockfd, conn->remote_ip, sizeof(conn->remote_ip));
112 g_server->connection_count++;
113 ESP_LOGI(TAG, "New connection from %s (fd=%d, total=%d)",
114 conn->remote_ip, sockfd, g_server->connection_count);
115
116 xSemaphoreGive(g_server->lock);
117 return ESP_OK;
118}
119
120static void on_close(httpd_handle_t hd, int sockfd)
121{
122 if (!g_server) return;
123
124 if (g_disconnect_callback) {
125 g_disconnect_callback(sockfd);
126 }
127
128 xSemaphoreTake(g_server->lock, portMAX_DELAY);
129
130 ws_connection_t *conn = find_connection_by_fd(g_server, sockfd);
131 if (conn) {
132 ESP_LOGI(TAG, "Connection closed (fd=%d, ip=%s)", sockfd, conn->remote_ip);
133 memset(conn, 0, sizeof(ws_connection_t));
134 g_server->connection_count--;
135 }
136
137 xSemaphoreGive(g_server->lock);
138}
139
140void ws_server_set_disconnect_cb(ws_disconnect_cb_t cb)
141{
142 g_disconnect_callback = cb;
143}
144
145static esp_err_t ws_handler(httpd_req_t *req)
146{
147 if (req->method == HTTP_GET) {
148 char upgrade[16] = {0};
149 if (httpd_req_get_hdr_value_str(req, "Upgrade", upgrade, sizeof(upgrade)) != ESP_OK ||
150 strcasecmp(upgrade, "websocket") != 0) {
151 return relay_nip11_handler(req);
152 }
153 ESP_LOGD(TAG, "WebSocket handshake completed");
154 return ESP_OK;
155 }
156
157 httpd_ws_frame_t ws_pkt;
158 memset(&ws_pkt, 0, sizeof(httpd_ws_frame_t));
159 ws_pkt.type = HTTPD_WS_TYPE_TEXT;
160
161 esp_err_t ret = httpd_ws_recv_frame(req, &ws_pkt, 0);
162 if (ret != ESP_OK) {
163 ESP_LOGE(TAG, "Failed to get frame len: %d", ret);
164 return ret;
165 }
166
167 if (ws_pkt.len == 0) {
168 return ESP_OK;
169 }
170
171 if (ws_pkt.len > WS_MAX_FRAME_SIZE) {
172 ESP_LOGW(TAG, "Frame too large: %zu bytes", ws_pkt.len);
173 return ESP_FAIL;
174 }
175
176 ws_pkt.payload = malloc(ws_pkt.len + 1);
177 if (!ws_pkt.payload) {
178 ESP_LOGE(TAG, "Failed to allocate %zu bytes", ws_pkt.len);
179 return ESP_ERR_NO_MEM;
180 }
181
182 ret = httpd_ws_recv_frame(req, &ws_pkt, ws_pkt.len);
183 if (ret != ESP_OK) {
184 ESP_LOGE(TAG, "Failed to receive frame: %d", ret);
185 free(ws_pkt.payload);
186 return ret;
187 }
188
189 ((char *)ws_pkt.payload)[ws_pkt.len] = '\0';
190
191 int fd = httpd_req_to_sockfd(req);
192 if (g_server) {
193 update_connection_activity(g_server, fd);
194 }
195
196 switch (ws_pkt.type) {
197 case HTTPD_WS_TYPE_TEXT:
198 ESP_LOGD(TAG, "Received %zu bytes from fd=%d", ws_pkt.len, fd);
199 if (g_message_callback) {
200 g_current_req = req;
201 g_message_callback(fd, (char *)ws_pkt.payload, ws_pkt.len);
202 g_current_req = NULL;
203 }
204 break;
205
206 case HTTPD_WS_TYPE_PING:
207 ws_pkt.type = HTTPD_WS_TYPE_PONG;
208 ret = httpd_ws_send_frame(req, &ws_pkt);
209 if (ret != ESP_OK) {
210 ESP_LOGW(TAG, "Failed to send PONG to fd=%d: %d", fd, ret);
211 free(ws_pkt.payload);
212 return ret;
213 }
214 break;
215
216 case HTTPD_WS_TYPE_CLOSE: {
217 ESP_LOGD(TAG, "Received CLOSE frame from fd=%d", fd);
218 free(ws_pkt.payload);
219 httpd_ws_frame_t close_pkt = {
220 .type = HTTPD_WS_TYPE_CLOSE,
221 .payload = NULL,
222 .len = 0,
223 };
224 httpd_ws_send_frame(req, &close_pkt);
225 return ESP_FAIL;
226 }
227
228 default:
229 break;
230 }
231
232 free(ws_pkt.payload);
233 return ESP_OK;
234}
235
236typedef struct {
237 httpd_handle_t hd;
238 int fd;
239 char *data;
240 size_t len;
241} async_send_arg_t;
242
243static void ws_async_send(void *arg)
244{
245 async_send_arg_t *a = (async_send_arg_t *)arg;
246
247 httpd_ws_frame_t ws_pkt = {
248 .type = HTTPD_WS_TYPE_TEXT,
249 .payload = (uint8_t *)a->data,
250 .len = a->len,
251 };
252
253 esp_err_t ret = httpd_ws_send_frame_async(a->hd, a->fd, &ws_pkt);
254 if (ret != ESP_OK) {
255 ESP_LOGW(TAG, "Async send failed to fd=%d: %d", a->fd, ret);
256 }
257
258 free(a->data);
259 free(a);
260}
261
262static void cleanup_server_init(ws_server_t *server, bool stop_httpd)
263{
264 g_server = NULL;
265 g_message_callback = NULL;
266 if (stop_httpd && server->server) {
267 httpd_stop(server->server);
268 server->server = NULL;
269 }
270 if (server->lock) {
271 vSemaphoreDelete(server->lock);
272 server->lock = NULL;
273 }
274}
275
276esp_err_t ws_server_init(ws_server_t *server, uint16_t port, ws_message_cb_t on_message)
277{
278 if (server->server != NULL) {
279 ESP_LOGE(TAG, "Server already initialized, call ws_server_stop first");
280 return ESP_ERR_INVALID_STATE;
281 }
282
283 memset(server, 0, sizeof(ws_server_t));
284 server->lock = xSemaphoreCreateMutex();
285 if (!server->lock) {
286 return ESP_ERR_NO_MEM;
287 }
288
289 g_server = server;
290 g_message_callback = on_message;
291
292 httpd_config_t config = HTTPD_DEFAULT_CONFIG();
293 config.server_port = port;
294 config.ctrl_port = port + 1;
295 config.max_open_sockets = WS_MAX_CONNECTIONS;
296 config.backlog_conn = WS_MAX_CONNECTIONS;
297 config.lru_purge_enable = true;
298 config.recv_wait_timeout = 3;
299 config.send_wait_timeout = 3;
300 config.keep_alive_enable = true;
301 config.keep_alive_idle = 5;
302 config.keep_alive_interval = 1;
303 config.keep_alive_count = 3;
304 config.stack_size = 12288;
305 config.open_fn = on_open;
306 config.close_fn = on_close;
307
308 esp_err_t ret = httpd_start(&server->server, &config);
309 if (ret != ESP_OK) {
310 ESP_LOGE(TAG, "Failed to start server: %d", ret);
311 cleanup_server_init(server, false);
312 return ret;
313 }
314
315 httpd_uri_t ws_uri = {
316 .uri = "/",
317 .method = HTTP_GET,
318 .handler = ws_handler,
319 .user_ctx = NULL,
320 .is_websocket = true,
321 .handle_ws_control_frames = true,
322 };
323
324 ret = httpd_register_uri_handler(server->server, &ws_uri);
325 if (ret != ESP_OK) {
326 ESP_LOGE(TAG, "Failed to register WS handler: %d", ret);
327 cleanup_server_init(server, true);
328 return ret;
329 }
330
331 httpd_uri_t options_uri = {
332 .uri = "/",
333 .method = HTTP_OPTIONS,
334 .handler = relay_nip11_options_handler,
335 .user_ctx = NULL,
336 };
337
338 ret = httpd_register_uri_handler(server->server, &options_uri);
339 if (ret != ESP_OK) {
340 ESP_LOGE(TAG, "Failed to register OPTIONS handler: %d", ret);
341 }
342
343 ESP_LOGI(TAG, "WebSocket server started on port %d", port);
344 return ESP_OK;
345}
346
347void ws_server_stop(ws_server_t *server)
348{
349 g_server = NULL;
350 g_message_callback = NULL;
351 g_disconnect_callback = NULL;
352
353 if (server->server) {
354 httpd_stop(server->server);
355 server->server = NULL;
356 }
357 if (server->lock) {
358 vSemaphoreDelete(server->lock);
359 server->lock = NULL;
360 }
361 memset(server->connections, 0, sizeof(server->connections));
362 server->connection_count = 0;
363}
364
365bool ws_server_is_running(ws_server_t *server)
366{
367 return server && server->server != NULL;
368}
369
370esp_err_t ws_server_send(ws_server_t *server, int fd, const char *data, size_t len)
371{
372 if (!server->server) return ESP_ERR_INVALID_STATE;
373
374 if (g_current_req && httpd_req_to_sockfd(g_current_req) == fd) {
375 httpd_ws_frame_t ws_pkt = {
376 .type = HTTPD_WS_TYPE_TEXT,
377 .payload = (uint8_t *)data,
378 .len = len,
379 };
380 return httpd_ws_send_frame(g_current_req, &ws_pkt);
381 }
382
383 async_send_arg_t *arg = malloc(sizeof(async_send_arg_t));
384 if (!arg) return ESP_ERR_NO_MEM;
385
386 arg->data = malloc(len);
387 if (!arg->data) {
388 free(arg);
389 return ESP_ERR_NO_MEM;
390 }
391
392 memcpy(arg->data, data, len);
393 arg->hd = server->server;
394 arg->fd = fd;
395 arg->len = len;
396
397 esp_err_t ret = httpd_queue_work(server->server, ws_async_send, arg);
398 if (ret != ESP_OK) {
399 free(arg->data);
400 free(arg);
401 return ret;
402 }
403 return ESP_OK;
404}
405
406esp_err_t ws_server_broadcast(ws_server_t *server, const char *data, size_t len)
407{
408 xSemaphoreTake(server->lock, portMAX_DELAY);
409
410 for (int i = 0; i < WS_MAX_CONNECTIONS; i++) {
411 if (server->connections[i].active) {
412 ws_server_send(server, server->connections[i].fd, data, len);
413 }
414 }
415
416 xSemaphoreGive(server->lock);
417 return ESP_OK;
418}
419
420void ws_server_close_connection(ws_server_t *server, int fd)
421{
422 if (!server || !server->server) {
423 return;
424 }
425 httpd_sess_trigger_close(server->server, fd);
426}