1 | #if defined(__linux__) || defined(__bsd__) || defined(__sunos__) |
2 | #include <unistd.h> |
3 | #endif |
4 | |
5 | #include <stdbool.h> |
6 | #include <stdio.h> |
7 | #include <stdlib.h> |
8 | #include <string.h> |
9 | |
10 | #include "server.h" |
11 | #include "websocket.h" |
12 | |
13 | //------------------------------------------------------------------------------ |
14 | // Internal functions |
15 | //------------------------------------------------------------------------------ |
16 | |
17 | /** |
18 | * @defgroup AddressPool |
19 | */ |
20 | |
21 | address_pool* address_pool_new(int initial_size, int growth_factor) |
22 | { |
23 | address_pool* pool = (address_pool*)malloc(sizeof(address_pool)); |
24 | pool->slots = (uintptr_t*)calloc(initial_size, sizeof(uintptr_t)); |
25 | pool->capacity = initial_size; |
26 | pool->count = 0; |
27 | pool->last_used_index = 0; |
28 | pool->growth_factor = growth_factor; |
29 | |
30 | return pool; |
31 | } |
32 | |
33 | void address_pool_free(address_pool** pool) |
34 | { |
35 | if (*pool != NULL) |
36 | { |
37 | free((*pool)->slots); |
38 | free(*pool); |
39 | (*pool) = NULL; |
40 | } |
41 | } |
42 | |
43 | void address_pool_resize(address_pool *pool) |
44 | { |
45 | int new_capacity = pool->capacity * pool->growth_factor; |
46 | uintptr_t* new_slots = (uintptr_t *)calloc(new_capacity, sizeof(uintptr_t)); |
47 | memcpy(new_slots, pool->slots, pool->capacity * sizeof(uintptr_t)); |
48 | |
49 | free(pool->slots); |
50 | |
51 | pool->slots = new_slots; |
52 | pool->capacity = new_capacity; |
53 | } |
54 | |
55 | uint32_t address_pool_set(address_pool* pool, uintptr_t address) |
56 | { |
57 | if (pool->count == pool->capacity) |
58 | { |
59 | address_pool_resize(pool); |
60 | } |
61 | |
62 | while (pool->slots[pool->last_used_index] != 0) |
63 | { |
64 | pool->last_used_index = (pool->last_used_index + 1) % pool->capacity; |
65 | } |
66 | |
67 | // Placeholder for actual address or ID |
68 | pool->slots[pool->last_used_index] = address; |
69 | pool->count++; |
70 | |
71 | uint32_t allocated_index = pool->last_used_index; |
72 | pool->last_used_index = (pool->last_used_index + 1) % pool->capacity; |
73 | |
74 | return allocated_index; |
75 | } |
76 | |
77 | uintptr_t address_pool_get(address_pool* pool, vws_cid_t index) |
78 | { |
79 | if (index >= pool->capacity || pool->slots[index] == 0) |
80 | { |
81 | return 0; |
82 | } |
83 | |
84 | return pool->slots[index]; |
85 | } |
86 | |
87 | void address_pool_remove(address_pool* pool, uint32_t index) |
88 | { |
89 | if (pool->slots[index] != 0) |
90 | { |
91 | pool->slots[index] = 0; |
92 | pool->count--; |
93 | } |
94 | } |
95 | |
96 | /** |
97 | * @defgroup ThreadFunctions |
98 | * |
99 | * @brief Program thread organization |
100 | * |
101 | * There are two thread functions: the network thread (main thread) and work |
102 | * pool threads. |
103 | */ |
104 | |
105 | /** |
106 | * @brief UV callback executed in response to (server->wakeup) async signal. |
107 | * |
108 | * This function handles asynchronous events in the main thread, specifically |
109 | * managing outgoing network I/O from worker threads back to clients. It runs |
110 | * within the main UV loop, within the main thread, also referred to as the |
111 | * networking thread. |
112 | * |
113 | * Worker threads pass data to it through a queue. The data is in the form of |
114 | * vws_svr_data instances. When a worker sends a vws_svr_data instance, it |
115 | * adds it to the queue (server->responses) and notifies (wakes up) the main UV |
116 | * loop (uv_run() in vrtql_tcp_svr_run()) by calling uv_async_send(server->wakeup) |
117 | * which in turn calls this function to check the server->responses queue. This |
118 | * function unloads all data in the queue and sends the data out to each |
119 | * respective client. It then returns control back to the main UV loop which |
120 | * resumes polling the network connections (blocking if there is no activity). |
121 | * |
122 | * @param handle A pointer to the uv_async_t handle that triggered the callback. |
123 | * |
124 | * @ingroup ThreadFunctions |
125 | */ |
126 | static void uv_thread(uv_async_t* handle); |
127 | |
128 | /** |
129 | * @brief The entry point for a worker thread. |
130 | * |
131 | * This function implements the worker thread pool. It is what each worker |
132 | * thread runs. It loops continuously, handling incoming data from clients, |
133 | * processing them and returning data back to them via the uv_thread(). It |
134 | * processes data by taking data (requests) from the server->requests queue, |
135 | * dispatching them to server->process(data) for processing, which in turn |
136 | * generates data (responses), sending them back to the client by putting them |
137 | * on the server->responses queue (processed by uv_thread()). |
138 | * |
139 | * @param arg A void pointer, typically to the server object, |
140 | * used to pass data into the thread. |
141 | * |
142 | * @ingroup ThreadFunctions |
143 | */ |
144 | static void worker_thread(void* arg); |
145 | |
146 | /** |
147 | * @defgroup ServerFunctions |
148 | * |
149 | * @brief Functions that support server operation |
150 | * |
151 | */ |
152 | |
153 | /** |
154 | * @brief Server instance constructor |
155 | * |
156 | * Constructs a new server instance. This takes a new, empty vws_tcp_svr instance |
157 | * and initializes all of its members. It is used by derived structs as well |
158 | * (vrtql_msg_svr) to construct the base struct. |
159 | * |
160 | * @param server The server instance to be initialized |
161 | * @return The initialized server instance |
162 | * |
163 | * @ingroup ServerFunctions |
164 | */ |
165 | |
166 | static vws_tcp_svr* tcp_svr_ctor(vws_tcp_svr* s, int nt, int bl, int qs); |
167 | |
168 | /** |
169 | * @brief Server instance destructor |
170 | * |
171 | * Destructs an initialized server instance. This takes a vws_tcp_svr instance |
172 | * and deallocates all of its members -- everything but the top-level |
173 | * struct. This is used by derived structs as well (vrtql_msg_svr) to destruct |
174 | * the base struct. |
175 | * |
176 | * @param server The server instance to be destructed |
177 | * |
178 | * @ingroup ServerFunctions |
179 | */ |
180 | |
181 | static void tcp_svr_dtor(vws_tcp_svr* s); |
182 | |
183 | /** |
184 | * @brief Initiates the server shutdown process. |
185 | * |
186 | * This function is responsible for shutting down a vws_tcp_svr server instance. |
187 | * It stops the server if it's currently running, performs necessary cleanup, |
188 | * shuts down the libuv event loop, frees memory, and clears the connection map. |
189 | * It signals all worker threads to stop processing new data and to finish any |
190 | * requests they are currently processing. |
191 | * |
192 | * @param server The server that needs to be shutdown. |
193 | * |
194 | * @ingroup ServerFunctions |
195 | */ |
196 | static void svr_shutdown(vws_tcp_svr* server); |
197 | |
198 | /** |
199 | * @brief Handles the close event for a libuv handle. |
200 | * |
201 | * This function is called when a libuv handle is closed. It checks if the handle |
202 | * has associated heap data stored in `handle->data` and generates a warning if |
203 | * the resource was not properly freed. |
204 | * |
205 | * @param handle The libuv handle being closed. |
206 | */ |
207 | static void on_uv_close(uv_handle_t* handle); |
208 | |
209 | /** |
210 | * @brief Walks through libuv handles and attempts to close them. |
211 | * |
212 | * This function is used to walk through libuv handles and attempts to close each |
213 | * handle. It is typically called during libuv shutdown to ensure that all handles |
214 | * are properly closed. If a handle has not been closed, it will be closed and |
215 | * the `on_uv_close()` function will be called. |
216 | * |
217 | * @param handle The libuv handle being walked. |
218 | * @param arg Optional argument passed to the function (not used in this case). |
219 | */ |
220 | static void on_uv_walk(uv_handle_t* handle, void* arg); |
221 | |
222 | /** |
223 | * @brief Callback for new client connection. |
224 | * |
225 | * This function is invoked when a new client successfully connects to the |
226 | * server. |
227 | * |
228 | * @param server The server to which the client connected. |
229 | * @param status The status of the connection. |
230 | * |
231 | * @ingroup ServerFunctions |
232 | */ |
233 | static void svr_on_connect(uv_stream_t* server, int status); |
234 | |
235 | /** |
236 | * @brief Callback for buffer reallocation. |
237 | * |
238 | * This function is invoked when a handle requires a buffer reallocation. |
239 | * |
240 | * @param handle The handle requiring reallocation. |
241 | * @param size The size of the buffer to be allocated. |
242 | * @param buf The buffer. |
243 | * |
244 | * @ingroup ServerFunctions |
245 | */ |
246 | static void svr_on_realloc(uv_handle_t* handle, size_t size, uv_buf_t* buf); |
247 | |
248 | /** |
249 | * @brief Callback for handle closure. |
250 | * |
251 | * This function is invoked when a handle is closed. |
252 | * |
253 | * @param handle The handle that was closed. |
254 | * |
255 | * @ingroup ServerFunctions |
256 | |
257 | */ |
258 | static void svr_on_close(uv_handle_t* handle); |
259 | |
260 | /** |
261 | * @brief Callback for reading data. |
262 | * |
263 | * This function is invoked when data is read from a client. |
264 | * |
265 | * @param client The client from which data was read. |
266 | * @param size The number of bytes read. |
267 | * @param buf The buffer containing the data. |
268 | * |
269 | * @ingroup ServerFunctions |
270 | */ |
271 | static void svr_on_read(uv_stream_t* client, ssize_t size, const uv_buf_t* buf); |
272 | |
273 | /** |
274 | * @brief Callback for write completion. |
275 | * |
276 | * This function is invoked when a write operation to a client completes. |
277 | * |
278 | * @param req The write request. |
279 | * @param status The status of the write operation. |
280 | * |
281 | * @ingroup ServerFunctions |
282 | */ |
283 | static void svr_on_write_complete(uv_write_t* req, int status); |
284 | |
285 | /** |
286 | * @defgroup Connection Functions |
287 | * |
288 | * @brief Functions that support connection operation |
289 | * |
290 | * Connections always refer to client-side connections, on the server. They are |
291 | * active connections established within the main UV loop. |
292 | * |
293 | * @ingroup ServerFunctions |
294 | */ |
295 | |
296 | /** |
297 | * @brief Creates a new server connection. |
298 | * |
299 | * @param s The server for the connection. |
300 | * @param c The client for the connection. |
301 | * @return A new server connection. |
302 | * |
303 | * @ingroup ServerFunctions |
304 | */ |
305 | static vws_svr_cnx* svr_cnx_new(vws_tcp_svr* s, uv_stream_t* c); |
306 | |
307 | /** |
308 | * @brief Frees a server connection. |
309 | * |
310 | * @param c The connection to be freed. |
311 | * |
312 | * @ingroup ServerFunctions |
313 | */ |
314 | static void svr_cnx_free(vws_svr_cnx* c); |
315 | |
316 | /** |
317 | * @brief Actively close a client connection |
318 | * |
319 | * @param c The connection |
320 | */ |
321 | static void svr_cnx_close(vws_tcp_svr* server, vws_cid_t c); |
322 | |
323 | /** |
324 | * @brief Callback for client connection. |
325 | * |
326 | * This function is triggered when a new client connection is established. It |
327 | * is responsible for processing any steps necessary at the start of a |
328 | * connection. |
329 | * |
330 | * @param c The connection structure representing the client that has connected. |
331 | * |
332 | * @ingroup ServerFunctions |
333 | */ |
334 | static void svr_client_connect(vws_svr_cnx* c); |
335 | |
336 | /** |
337 | * @brief Callback for client disconnection. |
338 | * |
339 | * This function is triggered when a client connection is terminated. It is |
340 | * responsible for processing any cleanup or other steps necessary at the end of |
341 | * a connection. |
342 | * |
343 | * @param c The connection |
344 | * |
345 | * @ingroup ServerFunctions |
346 | */ |
347 | static void svr_client_disconnect(vws_svr_cnx* c); |
348 | |
349 | /** |
350 | * @brief Callback for client read operations. |
351 | * |
352 | * This function is triggered when data is read from a client connection. It is |
353 | * responsible for processing the received data. |
354 | * |
355 | * @param c The connection that sent the data. |
356 | * @param size The size of the data that was read. |
357 | * @param buf The buffer containing the data that was read. |
358 | * |
359 | * @ingroup ServerFunctions |
360 | */ |
361 | static void svr_client_read(vws_svr_cnx* c, ssize_t size, const uv_buf_t* buf); |
362 | |
363 | /** |
364 | * @brief Callback for processing client data in (ingress) |
365 | * |
366 | * This function processes data arriving from client to worker thread. It is |
367 | * responsible for handling the actual computation or other work associated with |
368 | * the data. This takes place in the context of worker_thread(). |
369 | * |
370 | * @param data The incoming data from the client to process. |
371 | * |
372 | * @ingroup ServerFunctions |
373 | */ |
374 | static void svr_client_data_in(vws_svr_data* data, void* x); |
375 | |
376 | /** |
 * @brief Callback for processing client data out (egress)
 *
 * This function is triggered to process data arriving from a worker thread to
 * be sent back to the client. It is responsible for transferring the data from
 * the response argument (vws_svr_data) onto the wire (client socket via
 * libuv). This takes place in the context of uv_thread().
384 | * |
385 | * @param data The outgoing data from worker to client |
386 | * |
387 | * @ingroup ServerFunctions |
388 | */ |
389 | static void svr_client_data_out(vws_svr_data* data, void* x); |
390 | |
391 | |
392 | |
393 | |
394 | /** |
395 | * @defgroup WebSocketServerFunctions |
396 | * |
397 | * @brief Functions that support message server operation |
398 | * |
399 | */ |
400 | |
401 | /** |
402 | * @brief WebSocket server constructor. |
403 | * |
404 | * @param server The WebSocket server instance. |
405 | * @param nt The number of threads to run in the worker pool. |
406 | * @param bl The connection backlog for listen(). If set to 0, it uses the |
407 | * default value (128). |
408 | * @param qs The maximum queue size for requests and responses. If set to 0, it |
409 | * uses the default value (1024). |
410 | */ |
411 | static void ws_svr_ctor(vws_svr* server, int nt, int bl, int qs); |
412 | |
413 | /** |
414 | * @brief WebSocket server destructor. |
415 | * |
416 | * @param server The WebSocket server instance to free. |
417 | */ |
418 | static void ws_svr_dtor(vws_svr* server); |
419 | |
420 | /** |
421 | * @brief Callback for client connection. |
422 | * |
423 | * This function is triggered when a new client connection is established. It |
424 | * is responsible for processing any steps necessary at the start of a |
425 | * connection. |
426 | * |
427 | * @param c The connection structure representing the client that has connected. |
428 | * |
429 | * @ingroup WebSocketServerFunctions |
430 | */ |
431 | static void ws_svr_client_connect(vws_svr_cnx* c); |
432 | |
433 | /** |
434 | * @brief Callback for client disconnection. |
435 | * |
436 | * This function is triggered when a client connection is terminated. It is |
437 | * responsible for processing any cleanup or other steps necessary at the end of |
438 | * a connection. |
439 | * |
440 | * @param c The connection |
441 | * |
442 | * @ingroup WebSocketServerFunctions |
443 | */ |
444 | static void ws_svr_client_disconnect(vws_svr_cnx* c); |
445 | |
446 | /** |
447 | * @brief Callback for client read operations. |
448 | * |
449 | * This function is triggered when data is read from a client connection. It is |
450 | * responsible for processing the received data. |
451 | * |
452 | * @param c The connection that sent the data. |
453 | * @param size The size of the data that was read. |
454 | * @param buf The buffer containing the data that was read. |
455 | * |
456 | * @ingroup WebSocketServerFunctions |
457 | */ |
458 | static void ws_svr_client_read(vws_svr_cnx* c, ssize_t size, const uv_buf_t* buf); |
459 | |
460 | /** |
461 | * @brief Callback for processing client data in (ingress) for msg server |
462 | * |
463 | * This function processes data arriving from client to worker thread. It |
464 | * collects data until there is a complete message. It passes message to |
465 | * ws_svr_client_msg_in() for processing. This takes place in the context of |
466 | * worker_thread(). |
467 | * |
468 | * @param server The server instance |
469 | * @param data The incoming data from the client to process. |
470 | * @param x The user-defined context |
471 | * |
472 | * @ingroup WebSocketServerFunctions |
473 | */ |
474 | static void ws_svr_client_data_in(vws_svr_data* data, void* x); |
475 | |
476 | /** |
477 | * @brief Sends data from a server connection to a client WebSocket connection. |
478 | * |
479 | * @param server The server |
480 | * @param c The connection index |
481 | * @param buffer The data to send. |
482 | * @param opcode The opcode for the WebSocket frame. |
483 | */ |
484 | static void ws_svr_client_data_out( vws_svr* server, |
485 | vws_cid_t c, |
486 | vws_buffer* buffer, |
487 | unsigned char opcode); |
488 | |
489 | /** |
490 | * @brief Process a WebSocket frame received from a client. |
491 | * |
492 | * @param c The WebSocket connection. |
493 | * @param f The incoming frame to process. |
494 | */ |
495 | static void ws_svr_process_frame(vws_cnx* c, vws_frame* f); |
496 | |
497 | /** |
498 | * @brief Callback for client message processing |
499 | * |
500 | * This function is triggered when a message is read from a client |
501 | * connection. It is responsible for processing the message. This takes place in |
502 | * the context of worker_thread(). |
503 | * |
504 | * @param s The server |
505 | * @param c The connection ID |
506 | * @param m The message to process |
507 | * @param x The user-defined context |
508 | * |
509 | * @ingroup WebSocketServerFunctions |
510 | */ |
511 | static void ws_svr_client_msg_in(vws_svr* s, vws_cid_t cid, vws_msg* m, void* x); |
512 | |
513 | /** |
514 | * @brief Callback for sending message to client. It takes a message as input, |
515 | * serializes it to a binary WebSocket message and then sends it to the |
516 | * uv_thread() to send back to client. This takes place in the context of |
517 | * worker_thread() (only to be called within that context). |
518 | * |
519 | * This function sends a message back to a client. |
520 | * |
521 | * @param s The server |
522 | * @param c The connection ID |
523 | * @param m The message to send |
524 | * @param x The user-defined context |
525 | * |
526 | * @ingroup WebSocketServerFunctions |
527 | */ |
528 | static void ws_svr_client_msg_out(vws_svr* s, vws_cid_t c, vws_msg* m, void* x); |
529 | |
530 | /** |
 * @brief Default WebSocket message processing function. This is meant to be
 * overridden by the application to perform message processing, specifically by
 * assigning the vws_svr.process callback to the desired processing
 * handler. The default implementation simply drops (deletes) the incoming
 * message.
536 | * |
537 | * @param s The server instance. |
538 | * @param c The server connection. |
539 | * @param m The incoming WebSocket message. |
540 | * @param x The user-defined context |
541 | * |
542 | * @ingroup MessageServerFunctions |
543 | */ |
544 | static void ws_svr_client_process(vws_svr* s, vws_cid_t c, vws_msg* m, void* x); |
545 | |
546 | |
547 | |
548 | |
549 | /** |
550 | * @defgroup MessageServerFunctions |
551 | * |
552 | * @brief Functions that support message server operation |
553 | * |
554 | */ |
555 | |
556 | /** |
557 | * @brief Convert incoming WebSocket messages to VRTQL messages for processing. |
558 | * |
559 | * @param s The server instance |
560 | * @param c The connection ID |
561 | * @param m The incoming WebSocket message. |
562 | * @param x The user-defined context |
563 | * |
564 | * @ingroup MessageServerFunctions |
565 | */ |
566 | static void msg_svr_client_ws_msg_in(vws_svr* s, vws_cid_t c, vws_msg* m, void* x); |
567 | |
568 | /** |
569 | * @brief Callback function for handling incoming VRTQL messages from a client. |
570 | * |
571 | * @param s The server instance |
572 | * @param c The connection ID |
573 | * @param m The incoming VRTQL message. |
574 | * @param x The user-defined context |
575 | * |
576 | * @ingroup MessageServerFunctions |
577 | */ |
578 | static void msg_svr_client_msg_in(vws_svr* s, vws_cid_t c, vrtql_msg* m, void* x); |
579 | |
580 | /** |
581 | * @brief Callback function for sending VRTQL messages to a client. |
582 | * |
583 | * @param s The server instance |
584 | * @param c The server connection. |
585 | * @param m The outgoing VRTQL message. |
586 | * @param x The user-defined context |
587 | * |
588 | * @ingroup MessageServerFunctions |
589 | */ |
590 | static void msg_svr_client_msg_out(vws_svr* s, vws_cid_t c, vrtql_msg* m, void* x); |
591 | |
592 | /** |
593 | * @brief Default VRTQL message processing function. |
594 | * |
595 | * @param s The server instance |
596 | * @param c The server connection. |
597 | * @param m The incoming VRTQL message. |
598 | * @param x The user-defined context |
599 | * |
600 | * @ingroup MessageServerFunctions |
601 | */ |
602 | static void msg_svr_client_process(vws_svr* s, vws_cid_t c, vrtql_msg* m, void* x); |
603 | |
604 | |
605 | |
606 | |
607 | /** |
608 | * @defgroup ConnectionMap |
609 | * |
610 | * @brief Functions that provide access to the connection map |
611 | * |
612 | * The connection map tracks all active connections. These functions simplify |
613 | * the map API and include memory management and other server-specific |
614 | * functionality where appropriate. |
615 | * |
616 | * @ingroup ConnectionMap |
617 | */ |
618 | |
619 | /** |
620 | * @brief Get the connection corresponding to a client from the connection map. |
621 | * |
622 | * @param map The connection map. |
623 | * @param key The client (used as key in the map). |
624 | * @return The corresponding server connection. |
625 | * |
626 | * @ingroup ConnectionMap |
627 | */ |
628 | static vws_svr_cnx* svr_cnx_map_get(vws_svr_cnx_map* map, uv_stream_t* key); |
629 | |
630 | /** |
631 | * @brief Set a connection for a client in the connection map. |
632 | * |
633 | * @param map The connection map. |
634 | * @param key The client (used as key in the map). |
635 | * @param value The server connection. |
636 | * @return Success or failure status. |
637 | * |
638 | * @ingroup ConnectionMap |
639 | */ |
640 | static int8_t svr_cnx_map_set( vws_svr_cnx_map* map, |
641 | uv_stream_t* key, |
642 | vws_svr_cnx* value ); |
643 | |
644 | /** |
645 | * @brief Remove a client's connection from the connection map. |
646 | * |
647 | * @param map The connection map. |
648 | * @param key The client (used as key in the map). |
649 | * |
650 | * @ingroup ConnectionMap |
651 | */ |
652 | static void svr_cnx_map_remove(vws_svr_cnx_map* map, uv_stream_t* key); |
653 | |
654 | /** |
655 | * @brief Clear all connections from the connection map. |
656 | * |
657 | * @param map The connection map. |
658 | * |
659 | * @ingroup ConnectionMap |
660 | */ |
661 | static void svr_cnx_map_clear(vws_svr_cnx_map* map); |
662 | |
663 | /** |
664 | * @defgroup QueueGroup |
665 | * |
666 | * @brief Queue functions which bridge the network thread and workers |
667 | * |
668 | * The network thread and worker threads pass data via queues. These queues |
669 | * contain vws_svr_data instances. There is a requests queue which passes |
670 | * data from the network thread to workers for processing. There is a response |
 * queue that passes data from the worker threads to the network thread. When
 * passed from the network thread to a worker, data take the form of incoming
 * data from the client. When passed from the worker threads to the networking
 * thread, data take the form of outgoing data to be sent back to the client.
675 | * |
 * Queues have built-in synchronization mechanisms that allow the data to be
 * safely passed between threads, as well as ways to gracefully put waiting
 * threads to sleep until data arrives for processing.
679 | */ |
680 | |
681 | /** |
682 | * @brief Initializes a server queue. |
683 | * |
684 | * This function sets up the provided queue with the given capacity. It also |
685 | * initializes the synchronization mechanisms associated with the queue. |
686 | * |
687 | * @param queue Pointer to the server queue to be initialized. |
688 | * @param capacity The maximum capacity of the queue. |
689 | * @param name The queue name |
690 | * |
691 | * @ingroup QueueGroup |
692 | */ |
693 | static void queue_init(vws_svr_queue* queue, int capacity, cstr name); |
694 | |
695 | /** |
696 | * @brief Destroys a server queue. |
697 | * |
698 | * This function cleans up the resources associated with the provided queue. |
699 | * It also handles the synchronization mechanisms associated with the queue. |
700 | * |
701 | * @param queue Pointer to the server queue to be destroyed. |
702 | * |
703 | * @ingroup QueueGroup |
704 | */ |
705 | static void queue_destroy(vws_svr_queue* queue); |
706 | |
707 | /** |
708 | * @brief Pushes data to the server queue. |
709 | * |
710 | * This function adds data to the end of the provided queue. It also handles |
711 | * the necessary synchronization to ensure thread safety. |
712 | * |
713 | * @param queue Pointer to the server queue. |
714 | * @param data Data to be added to the queue. |
715 | * |
716 | * @ingroup QueueGroup |
717 | */ |
718 | static void queue_push(vws_svr_queue* queue, vws_svr_data* data); |
719 | |
720 | /** |
721 | * @brief Pops data from the server queue. |
722 | * |
723 | * This function removes and returns data from the front of the provided queue. |
724 | * It also handles the necessary synchronization to ensure thread safety. |
725 | * |
726 | * @param queue Pointer to the server queue. |
727 | * @return A data element from the front of the queue. |
728 | * |
729 | * @ingroup QueueGroup |
730 | */ |
731 | static vws_svr_data* queue_pop(vws_svr_queue* queue); |
732 | |
733 | /** |
734 | * @brief Checks if the server queue is empty. |
735 | * |
736 | * This function checks whether the provided queue is empty. |
737 | * It also handles the necessary synchronization to ensure thread safety. |
738 | * |
739 | * @param queue Pointer to the server queue. |
740 | * @return True if the queue is empty, false otherwise. |
741 | * |
742 | * @ingroup QueueGroup |
743 | */ |
744 | static bool queue_empty(vws_svr_queue* queue); |
745 | |
746 | //------------------------------------------------------------------------------ |
747 | // Threads |
748 | //------------------------------------------------------------------------------ |
749 | |
750 | void worker_thread(void* arg) |
751 | { |
752 | vws_tcp_svr* server = (vws_tcp_svr*)arg; |
753 | |
754 | // Set thread tracing level to server. |
755 | vws.tracelevel = server->trace; |
756 | |
757 | if (vws.tracelevel >= VT_THREAD) |
758 | { |
759 | vws.trace(VL_INFO, "worker_thread(): Starting" ); |
760 | } |
761 | |
762 | vws_thread_ctx ctx; |
763 | ctx.ctor = server->worker_ctor; |
764 | ctx.ctor_data = server->worker_ctor_data; |
765 | ctx.dtor = server->worker_dtor; |
766 | ctx.data = NULL; |
767 | |
768 | if (ctx.ctor != NULL) |
769 | { |
770 | ctx.data = ctx.ctor(ctx.ctor_data); |
771 | } |
772 | |
773 | while (true) |
774 | { |
775 | //> Wait for arrival |
776 | |
777 | // This will put the thread to sleep on a condition variable until |
778 | // something arrives in queue. |
779 | vws_svr_data* request = queue_pop(&server->requests); |
780 | |
781 | // If there's no request (null request), check the server's state |
782 | if (request == NULL) |
783 | { |
784 | // If server is in halting state, return |
785 | if (server->state == VS_HALTING) |
786 | { |
787 | if (vws.tracelevel >= VT_THREAD) |
788 | { |
789 | vws.trace(VL_INFO, "worker_thread(): Exiting" ); |
790 | } |
791 | |
792 | return; |
793 | } |
794 | else |
795 | { |
796 | // If not halting, skip to the next iteration of the loop |
797 | continue; |
798 | } |
799 | } |
800 | |
801 | server->on_data_in(request, ctx.data); |
802 | } |
803 | |
804 | if (ctx.dtor != NULL) |
805 | { |
806 | ctx.dtor(ctx.data); |
807 | } |
808 | } |
809 | |
/**
 * Async callback run in the networking (main UV) thread when a worker
 * calls uv_async_send(server->wakeup). Either performs shutdown (when the
 * server is halting) or drains the responses queue out to clients.
 */
void uv_thread(uv_async_t* handle)
{
    vws_cinfo* cinfo = (vws_cinfo*)handle->data;
    vws_tcp_svr* server = cinfo->server;

    // Shutdown path: join workers, then tear down the server/loop.
    if (server->state == VS_HALTING)
    {
        if (vws.tracelevel >= VT_THREAD)
        {
            vws.trace(VL_INFO, "uv_thread(): stop" );
        }

        // Join worker threads. Worker threads must all exit before we can
        // shutdown libuv as they must release their mutexes and condition
        // variables first, otherwise uv_stop() will not actually stop the loop.
        for (int i = 0; i < server->pool_size; i++)
        {
            uv_thread_join(&server->threads[i]);
        }

        svr_shutdown(server);

        return;
    }

    // Normal path: flush every pending response produced by the workers.
    while (queue_empty(&server->responses) == false)
    {
        vws_svr_data* data = queue_pop(&server->responses);

        // A NULL pop or a queue that left the running state means shutdown
        // started mid-drain: discard the item (if any) and bail out.
        if ((data == NULL) || (server->responses.state != VS_RUNNING))
        {
            if (data != NULL)
            {
                vws_svr_data_free(data);
            }

            return;
        }

        if (vws_is_flag(&data->flags, VM_SVR_DATA_CLOSE))
        {
            // Close connection
            svr_cnx_close(server, data->cid);
            vws_svr_data_free(data);
        }
        else
        {
            // Hand off to the egress handler; presumably it takes ownership
            // of `data` and frees it after writing — TODO confirm.
            server->on_data_out(data, NULL);
        }
    }
}
861 | |
862 | //------------------------------------------------------------------------------ |
863 | // Server API |
864 | //------------------------------------------------------------------------------ |
865 | |
866 | vws_svr_data* vws_svr_data_new(vws_tcp_svr* s, vws_cid_t cid, vws_buffer** b) |
867 | { |
868 | // Create a new vws_svr_data taking ownership of the buffer's data |
869 | vws_svr_data* item = vws_svr_data_own(s, cid, (*b)->data, (*b)->size); |
870 | |
871 | // Since we take ownership of buffer data, we clear the buffer. |
872 | (*b)->data = NULL; |
873 | (*b)->size = 0; |
874 | |
875 | return item; |
876 | } |
877 | |
878 | vws_svr_data* vws_svr_data_own(vws_tcp_svr* s, vws_cid_t cid, ucstr data, size_t size) |
879 | { |
880 | vws_svr_data* item; |
881 | item = (vws_svr_data*)vws.malloc(sizeof(vws_svr_data)); |
882 | |
883 | item->server = s; |
884 | item->cid = cid; |
885 | item->size = size; |
886 | item->data = data; |
887 | item->flags = 0; |
888 | |
889 | return item; |
890 | } |
891 | |
892 | void vws_svr_data_free(vws_svr_data* t) |
893 | { |
894 | if (t != NULL) |
895 | { |
896 | vws.free(t->data); |
897 | vws.free(t); |
898 | } |
899 | } |
900 | |
901 | vws_tcp_svr* vws_tcp_svr_new(int num_threads, int backlog, int queue_size) |
902 | { |
903 | vws_tcp_svr* server = vws.malloc(sizeof(vws_tcp_svr)); |
904 | return tcp_svr_ctor(server, num_threads, backlog, queue_size); |
905 | } |
906 | |
907 | uint8_t vws_tcp_svr_state(vws_tcp_svr* server) |
908 | { |
909 | return server->state; |
910 | } |
911 | |
912 | void vws_tcp_svr_free(vws_tcp_svr* server) |
913 | { |
914 | if (server == NULL) |
915 | { |
916 | return; |
917 | } |
918 | |
919 | tcp_svr_dtor(server); |
920 | vws.free(server); |
921 | } |
922 | |
923 | int vws_tcp_svr_send(vws_svr_data* data) |
924 | { |
925 | queue_push(&data->server->responses, data); |
926 | |
927 | // Notify event loop about the new response |
928 | uv_async_send(data->server->wakeup); |
929 | |
930 | return 0; |
931 | } |
932 | |
933 | int vws_tcp_svr_run(vws_tcp_svr* server, cstr host, int port) |
934 | { |
935 | if (vws.tracelevel >= VT_SERVICE) |
936 | { |
937 | vws.trace( VL_INFO, |
938 | "vws_tcp_svr_run(%p): Starting worker %i threads" , |
939 | server, |
940 | server->pool_size ); |
941 | } |
942 | |
943 | for (int i = 0; i < server->pool_size; i++) |
944 | { |
945 | uv_thread_create(&server->threads[i], worker_thread, server); |
946 | } |
947 | |
948 | //> Create listening socket |
949 | |
950 | uv_tcp_t* socket = vws.malloc(sizeof(uv_tcp_t)); |
951 | |
952 | uv_tcp_init(server->loop, socket); |
953 | |
954 | vws_cinfo* cinfo = vws.malloc(sizeof(vws_cinfo)); |
955 | cinfo->cnx = NULL; |
956 | cinfo->server = server; |
957 | cinfo->cid = 0; |
958 | |
959 | socket->data = cinfo; |
960 | |
961 | //> Bind to address |
962 | |
963 | int rc; |
964 | struct sockaddr_in addr; |
965 | uv_ip4_addr(host, port, &addr); |
966 | rc = uv_tcp_bind(socket, (const struct sockaddr*)&addr, 0); |
967 | |
968 | if (vws.tracelevel >= VT_SERVICE) |
969 | { |
970 | vws.trace( VL_INFO, |
971 | "vws_tcp_svr_run(%p): Bind %s:%lu" , |
972 | server, host, port ); |
973 | } |
974 | |
975 | if (rc) |
976 | { |
977 | vws.error(VE_RT, "Bind error %s" , uv_strerror(rc)); |
978 | return -1; |
979 | } |
980 | |
981 | //> Listen |
982 | |
983 | rc = uv_listen((uv_stream_t*)socket, server->backlog, svr_on_connect); |
984 | |
985 | if (rc) |
986 | { |
987 | vws.error(VE_RT, "Listen error %s" , uv_strerror(rc)); |
988 | return -1; |
989 | } |
990 | |
991 | if (vws.tracelevel >= VT_SERVICE) |
992 | { |
993 | vws.trace( VL_INFO, |
994 | "vws_tcp_svr_run(%s): Listen %s:%lu" , |
995 | server, host, port ); |
996 | } |
997 | |
998 | //> Start server |
999 | |
1000 | // Set state to running |
1001 | server->state = VS_RUNNING; |
1002 | |
1003 | if (vws.tracelevel >= VT_SERVICE) |
1004 | { |
1005 | vws.trace(VL_INFO, "vws_tcp_svr_run(%s): Starting uv_run()" , server); |
1006 | } |
1007 | |
1008 | while (server->state == VS_RUNNING) |
1009 | { |
1010 | // Run UV loop. This runs indefinitely, passing network I/O and and out |
1011 | // of system until server is shutdown by vws_tcp_svr_stop() (by external |
1012 | // thread). |
1013 | uv_run(server->loop, UV_RUN_DEFAULT); |
1014 | } |
1015 | |
1016 | //> Shutdown server |
1017 | |
1018 | // Close the listening socket handle |
1019 | uv_close((uv_handle_t*)socket, svr_on_close); |
1020 | |
1021 | if (vws.tracelevel >= VT_SERVICE) |
1022 | { |
1023 | vws.trace(VL_INFO, "vws_tcp_svr_run(%p): Shutdown complete" , server); |
1024 | } |
1025 | |
1026 | // Set state to halted |
1027 | server->state = VS_HALTED; |
1028 | |
1029 | return 0; |
1030 | } |
1031 | |
/**
 * Stops a server started with vws_tcp_svr_run().
 *
 * Must be called from a thread other than the one running the loop.
 * Sets halting flags, wakes the worker threads and the uv loop, then
 * polls (1-second sleep) until the run loop reports VS_HALTED.
 */
void vws_tcp_svr_stop(vws_tcp_svr* server)
{
    // Set shutdown flags for the server and both work queues
    server->state = VS_HALTING;
    server->requests.state = VS_HALTING;
    server->responses.state = VS_HALTING;

    // Wakeup all worker threads
    if (vws.tracelevel >= VT_SERVICE)
    {
        vws.trace(VL_INFO, "vws_tcp_svr_stop(): stop worker threads" );
    }

    // Broadcast under the mutex so workers blocked in queue_pop() wake
    // and observe VS_HALTING.
    uv_mutex_lock(&server->requests.mutex);
    uv_cond_broadcast(&server->requests.cond);
    uv_mutex_unlock(&server->requests.mutex);

    // Wakeup the main event loop to shutdown main thread
    if (vws.tracelevel >= VT_SERVICE)
    {
        vws.trace(VL_INFO, "vws_tcp_svr_stop(): stop main thread" );
    }

    uv_async_send(server->wakeup);

    // Coarse poll until vws_tcp_svr_run() marks shutdown complete.
    // NOTE(review): `state` is polled without synchronization — confirm
    // visibility is acceptable on the target platforms.
    while (server->state != VS_HALTED)
    {
        sleep(1);
    }
}
1062 | |
1063 | int vws_tcp_svr_inetd_run(vws_tcp_svr* server, int sockfd) |
1064 | { |
1065 | if (sockfd < 0) |
1066 | { |
1067 | vws.error(VE_RT, "Invalid server or socket descriptor provided" ); |
1068 | return 1; |
1069 | } |
1070 | |
1071 | if (vws.tracelevel >= VT_SERVICE) |
1072 | { |
1073 | vws.trace( VL_INFO, |
1074 | "vws_tcp_svr_run(%p): Starting worker %i threads" , |
1075 | server, |
1076 | server->pool_size ); |
1077 | } |
1078 | |
1079 | for (int i = 0; i < server->pool_size; i++) |
1080 | { |
1081 | uv_thread_create(&server->threads[i], worker_thread, server); |
1082 | } |
1083 | |
1084 | // Go into non-blocking mode as we are using poll() for socket_read() and |
1085 | // socket_write(). |
1086 | if (vws_socket_set_nonblocking(sockfd) == false) |
1087 | { |
1088 | vws.error(VE_RT, "Failed to set socket to nonblocking" ); |
1089 | return 1; |
1090 | } |
1091 | |
1092 | // Set to inetd mode |
1093 | server->inetd_mode = 1; |
1094 | |
1095 | // Initialize and adopt the existing socket descriptor. |
1096 | uv_tcp_t* c = (uv_tcp_t*)vws.malloc(sizeof(uv_tcp_t)); |
1097 | |
1098 | if (uv_tcp_init(server->loop, c)) |
1099 | { |
1100 | // Handle uv_tcp_init failure. |
1101 | vws.error(VE_RT, "Failed to initialize new TCP handle" ); |
1102 | vws.free(c); |
1103 | return 1; |
1104 | } |
1105 | |
1106 | if (uv_tcp_open(c, sockfd)) |
1107 | { |
1108 | // Handle uv_tcp_open failure. |
1109 | vws.error(VE_RT, "Failed to adopt the socket descriptor." ); |
1110 | vws.free(c); |
1111 | return 1; |
1112 | } |
1113 | |
1114 | vws_cinfo* addr = vws.malloc(sizeof(vws_cinfo)); |
1115 | addr->cnx = NULL; |
1116 | addr->server = server; |
1117 | |
1118 | c->data = addr; |
1119 | |
1120 | if (uv_read_start((uv_stream_t*)c, svr_on_realloc, svr_on_read) != 0) |
1121 | { |
1122 | vws.error(VE_RT, "Failed to start reading from client" ); |
1123 | vws.free(c); |
1124 | vws.free(addr); |
1125 | return 1; |
1126 | } |
1127 | |
1128 | //> Add connection to registry and initialize |
1129 | |
1130 | // Associate server with this handle for callbacks. |
1131 | vws_svr_cnx* cnx = svr_cnx_new(server, (uv_stream_t*)c); |
1132 | addr->cnx = cnx; |
1133 | addr->cid = cnx->cid; |
1134 | |
1135 | //> Call svr_on_connect() handler |
1136 | |
1137 | server->on_connect(cnx); |
1138 | |
1139 | // Now, the handle is associated with the socket and is ready to be used. |
1140 | // Start the libuv loop. |
1141 | uv_run(server->loop, UV_RUN_DEFAULT); |
1142 | |
1143 | return 0; |
1144 | } |
1145 | |
/**
 * Stops a server running in inetd mode (vws_tcp_svr_inetd_run()).
 * Invoked from the uv thread itself when the single connection closes.
 */
void vws_tcp_svr_inetd_stop(vws_tcp_svr* server)
{
    // Set shutdown flags
    server->state = VS_HALTING;
    server->requests.state = VS_HALTING;
    server->responses.state = VS_HALTING;

    // Stop the loop. We have no more I/O to deal with. We don't want the
    // loop to run any more for any reason.
    uv_stop(server->loop);

    // Wakeup all worker threads
    if (vws.tracelevel >= VT_SERVICE)
    {
        vws.trace(VL_INFO, "vws_tcp_svr_inetd_stop(): stop worker threads" );
    }

    // Broadcast under the mutex so workers blocked in queue_pop() wake
    // and observe VS_HALTING.
    uv_mutex_lock(&server->requests.mutex);
    uv_cond_broadcast(&server->requests.cond);
    uv_mutex_unlock(&server->requests.mutex);

    // Wakeup the main event loop to shutdown main thread
    if (vws.tracelevel >= VT_SERVICE)
    {
        vws.trace(VL_INFO, "vws_tcp_svr_inetd_stop(): stop main thread" );
    }

    // Invoke the wakeup handler directly: uv_thread is the uv_async_t
    // callback registered in tcp_svr_ctor(), and the loop has been
    // stopped so it will not service the async handle itself.
    // NOTE(review): the original comment said "wait for all threads to
    // complete", which this call does not do — confirm intent.
    uv_thread(server->wakeup);

    if (vws.tracelevel >= VT_SERVICE)
    {
        vws.trace(VL_INFO, "vws_tcp_svr_inetd_stop(): done" );
    }

    // Set state to halted
    server->state = VS_HALTED;
}
1184 | |
1185 | //------------------------------------------------------------------------------ |
1186 | // Server construction / destruction |
1187 | //------------------------------------------------------------------------------ |
1188 | |
/**
 * Constructs a TCP server in place.
 *
 * @param svr        Server memory to initialize
 * @param nt         Worker thread pool size
 * @param backlog    Listen backlog (0 selects the default of 128)
 * @param queue_size Request/response queue capacity (0 selects 1024)
 * @return svr, initialized with default handlers and state VS_HALTED
 *
 * NOTE(review): vws.malloc() results are not checked here — presumably
 * the allocator aborts on OOM; confirm.
 */
vws_tcp_svr* tcp_svr_ctor(vws_tcp_svr* svr, int nt, int backlog, int queue_size)
{
    // Apply defaults for unspecified sizes
    if (backlog == 0)
    {
        backlog = 128;
    }

    if (queue_size == 0)
    {
        queue_size = 1024;
    }

    // Default handlers; applications override these after construction.
    svr->threads = vws.malloc(sizeof(uv_thread_t) * nt);
    svr->pool_size = nt;
    svr->on_connect = svr_client_connect;
    svr->on_disconnect = svr_client_disconnect;
    svr->on_read = svr_client_read;
    svr->on_data_in = svr_client_data_in;
    svr->on_data_out = svr_client_data_out;
    svr->worker_ctor = NULL;
    svr->worker_ctor_data = NULL;
    svr->worker_dtor = NULL;
    svr->backlog = backlog;
    svr->loop = (uv_loop_t*)vws.malloc(sizeof(uv_loop_t));
    svr->state = VS_HALTED;
    svr->trace = vws.tracelevel;
    svr->inetd_mode = 0;

    // Event loop, connection registry, and work queues
    uv_loop_init(svr->loop);
    svr->cpool = address_pool_new(1000, 2);
    queue_init(&svr->requests, queue_size, "requests" );
    queue_init(&svr->responses, queue_size, "responses" );

    // Context for the wakeup async handle (not tied to any connection)
    vws_cinfo* cinfo = vws.malloc(sizeof(vws_cinfo));
    cinfo->cnx = NULL;
    cinfo->server = svr;
    cinfo->cid = 0;

    // Async handle used to wake the loop; uv_thread() services it.
    svr->wakeup = vws.malloc(sizeof(uv_async_t));
    svr->wakeup->data = cinfo;
    uv_async_init(svr->loop, svr->wakeup, uv_thread);

    return svr;
}
1233 | |
1234 | void on_uv_close(uv_handle_t* handle) |
1235 | { |
1236 | if (handle != NULL) |
1237 | { |
1238 | // As a policy we don't put heap data on handle->data. If that is ever |
1239 | // done then it must be freed in the appropriate place. It is ignored |
1240 | // here because it's impossible to tell what it is by the very nature of |
1241 | // uv_handle_t. We will generate a warning however. |
1242 | if (handle->data != NULL) |
1243 | { |
1244 | vws.trace( VL_WARN, |
1245 | "on_uv_close(): libuv resource not properly freed: %p" , |
1246 | (void*)handle->data ); |
1247 | } |
1248 | } |
1249 | } |
1250 | |
1251 | void on_uv_walk(uv_handle_t* handle, void* arg) |
1252 | { |
1253 | // If this handle has not been closed, it should have been. Nevertheless we |
1254 | // will make an attempt to close it. |
1255 | if (uv_is_closing(handle) == 0) |
1256 | { |
1257 | uv_close(handle, (uv_close_cb)on_uv_close); |
1258 | } |
1259 | } |
1260 | |
/**
 * Destructs a server built with tcp_svr_ctor(): stops it if running,
 * drains and destroys the queues, closes every libuv handle, and frees
 * the loop and connection pool. Does not free svr itself.
 */
void tcp_svr_dtor(vws_tcp_svr* svr)
{
    //> Stop/shutdown server

    if (svr->state == VS_RUNNING)
    {
        vws_tcp_svr_stop(svr);
    }

    svr_shutdown(svr);
    vws.free(svr->threads);

    // Close the server async handle
    uv_close((uv_handle_t*)svr->wakeup, svr_on_close);

    //> Shutdown libuv

    // Walk the loop to close everything
    uv_walk(svr->loop, on_uv_walk, NULL);

    // uv_loop_close() fails (UV_EBUSY) until all handles have finished
    // closing; run the loop until it is quiescent.
    while (uv_loop_close(svr->loop))
    {
        // Run the loop until there are no more active handles
        while (uv_loop_alive(svr->loop))
        {
            uv_run(svr->loop, UV_RUN_DEFAULT);
        }
    }

    // Free loop
    vws.free(svr->loop);

    // Free address pool
    address_pool_free(&svr->cpool);
}
1296 | |
1297 | //------------------------------------------------------------------------------ |
1298 | // Client connection callbacks |
1299 | //------------------------------------------------------------------------------ |
1300 | |
1301 | void svr_client_connect(vws_svr_cnx* c) |
1302 | { |
1303 | if (vws.tracelevel >= VT_SERVICE) |
1304 | { |
1305 | vws.trace(VL_INFO, "svr_client_connect(%p)" , c->handle); |
1306 | } |
1307 | } |
1308 | |
1309 | void svr_client_disconnect(vws_svr_cnx* c) |
1310 | { |
1311 | if (vws.tracelevel >= VT_SERVICE) |
1312 | { |
1313 | vws.trace(VL_INFO, "svr_client_disconnect(%p)" , c->handle); |
1314 | } |
1315 | } |
1316 | |
1317 | void svr_client_read(vws_svr_cnx* c, ssize_t size, const uv_buf_t* buf) |
1318 | { |
1319 | vws_tcp_svr* server = c->server; |
1320 | |
1321 | // Queue data to worker pool for processing |
1322 | vws_svr_data* data = vws_svr_data_own(server, c->cid, (ucstr)buf->base, size); |
1323 | |
1324 | // Store reference to server |
1325 | data->server = c->server; |
1326 | |
1327 | // Put on queue |
1328 | queue_push(&server->requests, data); |
1329 | } |
1330 | |
1331 | void svr_client_data_in(vws_svr_data* m, void* x) |
1332 | { |
1333 | // Default: Do nothing. Drop data. |
1334 | vws_svr_data_free(m); |
1335 | } |
1336 | |
1337 | void svr_client_data_out(vws_svr_data* data, void* x) |
1338 | { |
1339 | if (data->size == 0) |
1340 | { |
1341 | vws.trace(VL_INFO, "svr_client_data_out(): no data" ); |
1342 | vws.error(VL_WARN, "svr_client_data_out(): no data" ); |
1343 | return; |
1344 | } |
1345 | |
1346 | uv_buf_t buf = uv_buf_init(data->data, data->size); |
1347 | uv_write_t* req = (uv_write_t*)vws.malloc(sizeof(uv_write_t)); |
1348 | req->data = data; |
1349 | |
1350 | // Check address pool and ensure connection is still active |
1351 | uintptr_t ptr = address_pool_get(data->server->cpool, data->cid); |
1352 | |
1353 | if (ptr == 0) |
1354 | { |
1355 | // Connection no longer exists. |
1356 | vws_svr_data_free(data); |
1357 | vws.free(req); |
1358 | return; |
1359 | } |
1360 | |
1361 | // Write out to libuv |
1362 | uv_stream_t* handle = ((vws_svr_cnx*)ptr)->handle; |
1363 | if (uv_write(req, handle, &buf, 1, svr_on_write_complete) != 0) |
1364 | { |
1365 | // Connection is closing/closed. |
1366 | vws_svr_data_free(data); |
1367 | vws.free(req); |
1368 | } |
1369 | } |
1370 | |
1371 | //------------------------------------------------------------------------------ |
1372 | // Server Connection |
1373 | //------------------------------------------------------------------------------ |
1374 | |
1375 | vws_svr_cnx* svr_cnx_new(vws_tcp_svr* s, uv_stream_t* handle) |
1376 | { |
1377 | vws_svr_cnx* cnx = vws.malloc(sizeof(vws_svr_cnx)); |
1378 | cnx->server = s; |
1379 | cnx->handle = handle; |
1380 | cnx->data = NULL; |
1381 | cnx->format = VM_MPACK_FORMAT; |
1382 | |
1383 | // Initialize HTTP state |
1384 | cnx->upgraded = false; |
1385 | cnx->http = vws_http_msg_new(HTTP_REQUEST); |
1386 | |
1387 | // Add to address pool |
1388 | cnx->cid = address_pool_set(s->cpool, (uintptr_t)cnx); |
1389 | |
1390 | return cnx; |
1391 | } |
1392 | |
1393 | void svr_cnx_free(vws_svr_cnx* c) |
1394 | { |
1395 | if (c != NULL) |
1396 | { |
1397 | if (c->http != NULL) |
1398 | { |
1399 | vws_http_msg_free(c->http); |
1400 | } |
1401 | |
1402 | // Remove from pool |
1403 | address_pool_remove(c->server->cpool, c->cid); |
1404 | |
1405 | vws.free(c); |
1406 | } |
1407 | } |
1408 | |
/**
 * Requests closure of a client connection by ID (thin wrapper over
 * vws_tcp_svr_close()).
 */
void svr_cnx_close(vws_tcp_svr* server, vws_cid_t cid)
{
    vws_tcp_svr_close(server, cid);
}
1413 | |
1414 | //------------------------------------------------------------------------------ |
1415 | // Server Utilities |
1416 | //------------------------------------------------------------------------------ |
1417 | |
1418 | vws_svr_cnx* svr_cnx_map_get(vws_svr_cnx_map* map, uv_stream_t* key) |
1419 | { |
1420 | vws_svr_cnx* cnx = sc_map_get_64v(map, (uint64_t)key); |
1421 | |
1422 | // If entry exists |
1423 | if (sc_map_found(map) == true) |
1424 | { |
1425 | return cnx; |
1426 | } |
1427 | |
1428 | return NULL; |
1429 | } |
1430 | |
1431 | void svr_cnx_map_remove(vws_svr_cnx_map* map, uv_stream_t* key) |
1432 | { |
1433 | vws_svr_cnx* cnx = sc_map_get_64v(map, (uint64_t)key); |
1434 | |
1435 | // If entry exists |
1436 | if (sc_map_found(map) == true) |
1437 | { |
1438 | sc_map_del_64v(map, (uint64_t)key); |
1439 | |
1440 | // Call on_disconnect() handler |
1441 | cnx->server->on_disconnect(cnx); |
1442 | |
1443 | // Cleanup |
1444 | vws.free(cnx); |
1445 | } |
1446 | } |
1447 | |
1448 | int8_t svr_cnx_map_set(vws_svr_cnx_map* map, uv_stream_t* key, vws_svr_cnx* value) |
1449 | { |
1450 | // See if we have an existing entry |
1451 | sc_map_get_64v(map, (uint64_t)key); |
1452 | |
1453 | if (sc_map_found(map) == true) |
1454 | { |
1455 | // Exsiting entry. Return false. |
1456 | return 0; |
1457 | } |
1458 | |
1459 | sc_map_put_64v(map, (uint64_t)key, value); |
1460 | |
1461 | // True |
1462 | return 1; |
1463 | } |
1464 | |
1465 | void svr_cnx_map_clear(vws_svr_cnx_map* map) |
1466 | { |
1467 | uint64_t key; vws_svr_cnx* cnx; |
1468 | sc_map_foreach(map, key, cnx) |
1469 | { |
1470 | cnx->server->on_disconnect(cnx); |
1471 | vws.free(cnx); |
1472 | } |
1473 | |
1474 | sc_map_clear_64v(map); |
1475 | } |
1476 | |
/**
 * Releases all pending response data, destroys both work queues, and
 * stops the loop so vws_tcp_svr_run() can return. No-op if the server
 * is already halted.
 */
void svr_shutdown(vws_tcp_svr* server)
{
    if (server->state == VS_HALTED)
    {
        return;
    }

    if (vws.tracelevel >= VT_SERVICE)
    {
        vws.trace(VL_INFO, "svr_shutdown(%p): Shutdown starting" , server);
    }

    // Cleanup queues: drain responses that never made it to the wire,
    // freeing each item and its payload.

    vws_svr_queue* queue = &server->responses;
    uv_mutex_lock(&queue->mutex);
    while (queue->size > 0)
    {
        // Dequeue from the ring buffer
        vws_svr_data* data = queue->buffer[queue->head];
        queue->head = (queue->head + 1) % queue->capacity;
        queue->size--;

        vws_svr_data_free(data);
    }
    uv_mutex_unlock(&queue->mutex);

    queue_destroy(&server->requests);
    queue_destroy(&server->responses);

    // Stop the loop. This will cause uv_run() to return in vws_tcp_svr_run()
    // which will also return.
    uv_stop(server->loop);
}
1510 | |
1511 | void svr_on_connect(uv_stream_t* socket, int status) |
1512 | { |
1513 | vws_cinfo* svr_addr = (vws_cinfo*)socket->data; |
1514 | vws_tcp_svr* server = svr_addr->server; |
1515 | //vws_tcp_svr* server = (vws_tcp_svr*) socket->data; |
1516 | |
1517 | if (status < 0) |
1518 | { |
1519 | cstr e = uv_strerror(status); |
1520 | vws.error(VE_RT, "Error in connection callback: %s" , e); |
1521 | return; |
1522 | } |
1523 | |
1524 | uv_tcp_t* c = (uv_tcp_t*)vws.malloc(sizeof(uv_tcp_t)); |
1525 | |
1526 | if (c == NULL) |
1527 | { |
1528 | vws.error(VE_RT, "Failed to allocate memory for client" ); |
1529 | return; |
1530 | } |
1531 | |
1532 | vws_cinfo* cinfo = vws.malloc(sizeof(vws_cinfo)); |
1533 | cinfo->cnx = NULL; |
1534 | cinfo->server = server; |
1535 | c->data = cinfo; |
1536 | |
1537 | if (uv_tcp_init(server->loop, c) != 0) |
1538 | { |
1539 | vws.error(VE_RT, "Failed to initialize client" ); |
1540 | return; |
1541 | } |
1542 | |
1543 | if (uv_accept(socket, (uv_stream_t*)c) == 0) |
1544 | { |
1545 | if (uv_read_start((uv_stream_t*)c, svr_on_realloc, svr_on_read) != 0) |
1546 | { |
1547 | vws.error(VE_RT, "Failed to start reading from client" ); |
1548 | return; |
1549 | } |
1550 | } |
1551 | else |
1552 | { |
1553 | uv_close((uv_handle_t*)c, svr_on_close); |
1554 | } |
1555 | |
1556 | //> Add connection to registry and initialize |
1557 | |
1558 | vws_svr_cnx* cnx = svr_cnx_new(server, (uv_stream_t*)c); |
1559 | cinfo->cnx = cnx; |
1560 | cinfo->cid = cnx->cid; |
1561 | |
1562 | //> Call svr_on_connect() handler |
1563 | |
1564 | server->on_connect(cnx); |
1565 | } |
1566 | |
1567 | void svr_on_read(uv_stream_t* c, ssize_t nread, const uv_buf_t* buf) |
1568 | { |
1569 | vws_cinfo* cinfo = (vws_cinfo*)c->data; |
1570 | vws_svr_cnx* cnx = cinfo->cnx;; |
1571 | vws_tcp_svr* server = cinfo->server; |
1572 | vws_cid_t cid = cinfo->cid; |
1573 | |
1574 | if (nread < 0) |
1575 | { |
1576 | uv_close((uv_handle_t*)c, svr_on_close); |
1577 | vws.free(buf->base); |
1578 | return; |
1579 | } |
1580 | |
1581 | // Lookup connection |
1582 | uintptr_t ptr = address_pool_get(server->cpool, cid); |
1583 | |
1584 | if (ptr != 0) |
1585 | { |
1586 | vws_svr_cnx* cnx = (vws_svr_cnx*)ptr; |
1587 | server->on_read(cnx, nread, buf); |
1588 | } |
1589 | } |
1590 | |
1591 | void svr_on_write_complete(uv_write_t* req, int status) |
1592 | { |
1593 | vws_svr_data_free((vws_svr_data*)req->data); |
1594 | vws.free(req); |
1595 | } |
1596 | |
1597 | void svr_on_close(uv_handle_t* handle) |
1598 | { |
1599 | vws_cinfo* cinfo = (vws_cinfo*)handle->data; |
1600 | vws_svr_cnx* cnx = cinfo->cnx; |
1601 | vws_tcp_svr* server = cinfo->server; |
1602 | vws_cid_t cid = cinfo->cid; |
1603 | |
1604 | // Lookup connection |
1605 | uintptr_t ptr = address_pool_get(server->cpool, cid); |
1606 | |
1607 | if (ptr != 0) |
1608 | { |
1609 | vws_svr_cnx* cnx = (vws_svr_cnx*)ptr; |
1610 | |
1611 | // Call on_disconnect() handler |
1612 | server->on_disconnect(cnx); |
1613 | |
1614 | // Remove from map |
1615 | //sc_map_del_64v(map, (uint64_t)handle); |
1616 | |
1617 | // Cleanup |
1618 | svr_cnx_free(cnx); |
1619 | } |
1620 | |
1621 | vws.free(handle->data); |
1622 | vws.free(handle); |
1623 | |
1624 | // If we are running in inetd mode, there is only one socket and its closing |
1625 | // means we are done and should exit process. |
1626 | if ((server->inetd_mode == 1) && (server->state == VS_RUNNING)) |
1627 | { |
1628 | vws_tcp_svr_inetd_stop(server); |
1629 | } |
1630 | } |
1631 | |
/**
 * libuv allocation callback: (re)sizes the read buffer to `size`.
 *
 * NOTE(review): the vws.realloc() result is unchecked and overwrites
 * the original pointer — presumably vws.realloc aborts on OOM; confirm.
 */
void svr_on_realloc(uv_handle_t* handle, size_t size, uv_buf_t* buf)
{
    buf->base = (char*)vws.realloc(buf->base, size);
    buf->len = size;
}
1637 | |
1638 | //------------------------------------------------------------------------------ |
1639 | // Queue API |
1640 | //------------------------------------------------------------------------------ |
1641 | |
/**
 * Initializes a ring-buffer work queue.
 *
 * @param queue Queue to initialize
 * @param size  Capacity (maximum number of pending items)
 * @param name  Diagnostic name; duplicated
 *
 * NOTE(review): the name is duplicated with strdup() (plain malloc) but
 * released via vws.free() in queue_destroy(); if vws.free is ever
 * rebound away from free(), the allocators mismatch — confirm.
 */
void queue_init(vws_svr_queue* queue, int size, cstr name)
{
    queue->buffer = (vws_svr_data**)vws.malloc(size * sizeof(vws_svr_data*));
    queue->size = 0;
    queue->capacity = size;
    queue->head = 0;
    queue->tail = 0;
    queue->state = VS_RUNNING;
    queue->name = strdup(name);

    // Initialize mutex and condition variable
    uv_mutex_init(&queue->mutex);
    uv_cond_init(&queue->cond);
}
1656 | |
1657 | void queue_destroy(vws_svr_queue* queue) |
1658 | { |
1659 | if (queue->buffer != NULL) |
1660 | { |
1661 | vws.free(queue->name); |
1662 | uv_mutex_destroy(&queue->mutex); |
1663 | uv_cond_destroy(&queue->cond); |
1664 | vws.free(queue->buffer); |
1665 | queue->buffer = NULL; |
1666 | queue->state = VS_HALTED; |
1667 | } |
1668 | } |
1669 | |
1670 | void queue_push(vws_svr_queue* queue, vws_svr_data* data) |
1671 | { |
1672 | if (queue->state != VS_RUNNING) |
1673 | { |
1674 | vws.free(data); |
1675 | return; |
1676 | } |
1677 | |
1678 | uv_mutex_lock(&queue->mutex); |
1679 | |
1680 | while (queue->size == queue->capacity) |
1681 | { |
1682 | uv_cond_wait(&queue->cond, &queue->mutex); |
1683 | } |
1684 | |
1685 | if (queue->state != VS_RUNNING) |
1686 | { |
1687 | uv_cond_signal(&queue->cond); |
1688 | uv_mutex_unlock(&queue->mutex); |
1689 | return; |
1690 | } |
1691 | |
1692 | queue->buffer[queue->tail] = data; |
1693 | queue->tail = (queue->tail + 1) % queue->capacity; |
1694 | queue->size++; |
1695 | |
1696 | // Signal condition variable |
1697 | uv_cond_signal(&queue->cond); |
1698 | uv_mutex_unlock(&queue->mutex); |
1699 | } |
1700 | |
/**
 * Pops the next item, blocking while the queue is empty and running.
 *
 * @return The next item, or NULL when the queue is halting (workers
 *         treat NULL as the signal to exit).
 */
vws_svr_data* queue_pop(vws_svr_queue* queue)
{
    uv_mutex_lock(&queue->mutex);

    // Sleep until there is work or the queue stops running
    while (queue->size == 0 && queue->state == VS_RUNNING)
    {
        uv_cond_wait(&queue->cond, &queue->mutex);
    }

    if (queue->state == VS_HALTING)
    {
        // Wake every other waiter so all workers observe the shutdown
        uv_cond_broadcast(&queue->cond);
        uv_mutex_unlock(&queue->mutex);

        return NULL;
    }

    // Dequeue from the ring buffer
    vws_svr_data* data = queue->buffer[queue->head];
    queue->head = (queue->head + 1) % queue->capacity;
    queue->size--;

    uv_mutex_unlock(&queue->mutex);

    return data;
}
1726 | |
1727 | bool queue_empty(vws_svr_queue* queue) |
1728 | { |
1729 | uv_mutex_lock(&queue->mutex); |
1730 | bool empty = (queue->size == 0); |
1731 | uv_mutex_unlock(&queue->mutex); |
1732 | |
1733 | return empty; |
1734 | } |
1735 | |
1736 | //------------------------------------------------------------------------------ |
1737 | // Pure WebSocket Server |
1738 | //------------------------------------------------------------------------------ |
1739 | |
/**
 * Dispatches a parsed WebSocket frame from a client.
 *
 * CLOSE: echoes a close frame back. PING: replies with a PONG carrying
 * the ping payload. TEXT/BINARY/CONTINUATION: queued on the connection
 * for message reassembly. PONG and unknown opcodes are dropped.
 *
 * Takes ownership of the frame on every path (data frames pass to the
 * connection's queue; all other frames are freed here).
 */
void ws_svr_process_frame(vws_cnx* c, vws_frame* f)
{
    vws_svr_cnx* cnx = (vws_svr_cnx*)c->data;

    switch (f->opcode)
    {
        case CLOSE_FRAME:
        {
            // Build the response frame
            vws_buffer* buffer = vws_generate_close_frame();

            // Send the CLOSE response back to the client
            vws_svr_data* response;

            // vws_svr_data_new() steals buffer->data
            response = vws_svr_data_new(cnx->server, cnx->cid, &buffer);
            vws_tcp_svr_send(response);

            // Free buffer (payload already transferred to response)
            vws_buffer_free(buffer);

            // Free frame
            vws_frame_free(f);

            break;
        }

        case TEXT_FRAME:
        case BINARY_FRAME:
        case CONTINUATION_FRAME:
        {
            // Add to queue; ownership passes to the connection's frame
            // queue for message reassembly.
            sc_queue_add_first(&c->queue, f);

            break;
        }

        case PING_FRAME:
        {
            // Generate the PONG response
            vws_buffer* buffer = vws_generate_pong_frame(f->data, f->size);

            // Send the PONG response back to the client
            vws_svr_data* response;
            response = vws_svr_data_new(cnx->server, cnx->cid, &buffer);
            vws_tcp_svr_send(response);

            // Free buffer
            vws_buffer_free(buffer);

            // Free frame
            vws_frame_free(f);

            break;
        }

        case PONG_FRAME:
        {
            // No need to send a response

            vws_frame_free(f);

            break;
        }

        default:
        {
            // Invalid frame type
            vws_frame_free(f);
        }
    }

    vws.success();
}
1813 | |
1814 | void ws_svr_client_connect(vws_svr_cnx* c) |
1815 | { |
1816 | if (vws.tracelevel >= VT_SERVICE) |
1817 | { |
1818 | vws.trace(VL_INFO, "ws_svr_client_connect(%p)" , c->handle); |
1819 | } |
1820 | |
1821 | // Create a new vws_cnx |
1822 | vws_cnx* cnx = (void*)vws_cnx_new(); |
1823 | cnx->process = ws_svr_process_frame; |
1824 | cnx->data = (void*)c; // Link cnx -> c |
1825 | c->data = (void*)cnx; // Link c -> cnx |
1826 | } |
1827 | |
1828 | void ws_svr_client_disconnect(vws_svr_cnx* c) |
1829 | { |
1830 | if (vws.tracelevel >= VT_SERVICE) |
1831 | { |
1832 | vws.trace(VL_INFO, "ws_svr_client_disconnect(%p)" , c->handle); |
1833 | } |
1834 | |
1835 | if (c->data != NULL) |
1836 | { |
1837 | vws_cnx_free((vws_cnx*)c->data); |
1838 | c->data = NULL; |
1839 | } |
1840 | } |
1841 | |
1842 | void vws_tcp_svr_close(vws_tcp_svr* server, vws_cid_t cid) |
1843 | { |
1844 | // Create reply |
1845 | vws_svr_data* reply; |
1846 | reply = vws_svr_data_own(server, cid, NULL, 0); |
1847 | |
1848 | // Set connection close flag |
1849 | vws_set_flag(&reply->flags, VM_SVR_DATA_CLOSE); |
1850 | |
1851 | // Queue the data to uv_thread() to send out on wire |
1852 | vws_tcp_svr_send(reply); |
1853 | } |
1854 | |
1855 | // Runs in uv_thread() |
1856 | void ws_svr_client_read(vws_svr_cnx* cnx, ssize_t size, const uv_buf_t* buf) |
1857 | { |
1858 | vws_tcp_svr* server = cnx->server; |
1859 | vws_cnx* c = (vws_cnx*)cnx->data; |
1860 | |
1861 | // Add to client socket buffer |
1862 | vws_buffer_append(c->base.buffer, (ucstr)buf->base, size); |
1863 | |
1864 | // Free libuv memory |
1865 | vws.free(buf->base); |
1866 | |
1867 | // If we are in HTTP mode |
1868 | if (cnx->upgraded == false) |
1869 | { |
1870 | // Parse incoming data as HTTP request. |
1871 | |
1872 | ucstr data = c->base.buffer->data; |
1873 | size_t size = c->base.buffer->size; |
1874 | ssize_t n = vws_http_msg_parse(cnx->http, (cstr)data, size); |
1875 | |
1876 | // Did we get a complete request? |
1877 | if (cnx->http->headers_complete == true) |
1878 | { |
1879 | // Check for parsing errors |
1880 | enum llhttp_errno err = llhttp_get_errno(cnx->http->parser); |
1881 | |
1882 | // If there was a parsing error, close connection |
1883 | if(err != HPE_PAUSED) |
1884 | { |
1885 | vws.error( VE_RT, "Error: %s (%s)" , |
1886 | llhttp_errno_name(err), |
1887 | llhttp_get_error_reason(cnx->http->parser) ); |
1888 | |
1889 | // Close connection |
1890 | svr_cnx_close(server, cnx->cid); |
1891 | |
1892 | return; |
1893 | } |
1894 | |
1895 | // Drain HTTP request data from cnx->data buffer |
1896 | vws_buffer_drain(c->base.buffer, n); |
1897 | |
1898 | //> Generate HTTP response and send |
1899 | |
1900 | vws_buffer* http = vws_buffer_new(); |
1901 | |
1902 | struct sc_map_str* = &cnx->http->headers; |
1903 | cstr key = vws_map_get(headers, "sec-websocket-key" ); |
1904 | cstr proto = vws_map_get(headers, "sec-websocket-protocol" ); |
1905 | |
1906 | vws_buffer_printf(http, "HTTP/1.1 101 Switching Protocols\r\n" ); |
1907 | vws_buffer_printf(http, "Upgrade: websocket\r\n" ); |
1908 | vws_buffer_printf(http, "Connection: Upgrade\r\n" ); |
1909 | |
1910 | cstr ac = vws_accept_key(key); |
1911 | vws_buffer_printf(http, "Sec-WebSocket-Accept: %s\r\n" , ac); |
1912 | vws.free(ac); |
1913 | |
1914 | vws_buffer_printf(http, "Sec-WebSocket-Version: 13\r\n" ); |
1915 | vws_buffer_printf(http, "Sec-WebSocket-Protocol: " ); |
1916 | |
1917 | if (proto != NULL) |
1918 | { |
1919 | vws_buffer_printf(http, "%s\r\n" , proto); |
1920 | } |
1921 | else |
1922 | { |
1923 | vws_buffer_printf(http, "vrtql\r\n" ); |
1924 | } |
1925 | |
1926 | vws_buffer_printf(http, "\r\n" ); |
1927 | |
1928 | // Package up response |
1929 | vws_svr_data* reply; |
1930 | reply = vws_svr_data_new(cnx->server, cnx->cid, &http); |
1931 | |
1932 | // Send directly out as we are in uv_thread() |
1933 | server->on_data_out(reply, NULL); |
1934 | |
1935 | // Cleanup. Buffer data was passed to reply. |
1936 | vws_buffer_free(http); |
1937 | |
1938 | //> Change state to WebSocket mode |
1939 | |
1940 | // Set the flag that we are in WebSocket mode |
1941 | cnx->upgraded = true; |
1942 | |
1943 | // Free HTTP request as we don't need it anymore |
1944 | vws_http_msg_free(cnx->http); |
1945 | cnx->http = NULL; |
1946 | |
1947 | // Do we have any data in the socket after consuming the HTTP |
1948 | // request? We shouldn't but if so this is WebSocket data. |
1949 | if (c->base.buffer->size == 0) |
1950 | { |
1951 | // No more data in the socket buffer. Done for now. |
1952 | return; |
1953 | } |
1954 | } |
1955 | else |
1956 | { |
1957 | // No complete HTTP request yet |
1958 | return; |
1959 | } |
1960 | |
1961 | // If we get here, we have a complete request and we have incoming |
1962 | // WebSocket data to process (c->base.buffer->size > 0) |
1963 | } |
1964 | |
1965 | if (vws_cnx_ingress(c) > 0) |
1966 | { |
1967 | // Process as many messages as possible |
1968 | while (true) |
1969 | { |
1970 | // Check for a complete message |
1971 | vws_msg* wsm = vws_msg_pop(c); |
1972 | |
1973 | if (wsm == NULL) |
1974 | { |
1975 | return; |
1976 | } |
1977 | |
1978 | // Pass message pointer in block |
1979 | vws_svr_data* block; |
1980 | block = vws_svr_data_own( cnx->server, |
1981 | cnx->cid, |
1982 | (ucstr)wsm, |
1983 | sizeof(vws_msg*) ); |
1984 | queue_push(&server->requests, block); |
1985 | } |
1986 | } |
1987 | } |
1988 | |
1989 | // Runs in worker_thread() |
1990 | void ws_svr_client_data_in(vws_svr_data* block, void* x) |
1991 | { |
1992 | //> Append data to connection buffer |
1993 | |
1994 | vws_cid_t cid = block->cid; |
1995 | vws_svr* server = (vws_svr*)block->server; |
1996 | |
1997 | // Data simply contains a pointer to a websocket message |
1998 | vws_msg* wsm = (vws_msg*)block->data; |
1999 | |
2000 | // Free incoming data |
2001 | block->data = NULL; |
2002 | block->size = 0; |
2003 | vws_svr_data_free(block); |
2004 | |
2005 | //> Process connection buffer data for complete messages |
2006 | server->on_msg_in(server, cid, wsm, x); |
2007 | } |
2008 | |
2009 | void ws_svr_client_data_out( vws_svr* server, |
2010 | vws_cid_t cid, |
2011 | vws_buffer* buffer, |
2012 | unsigned char opcode ) |
2013 | { |
2014 | // Create a binary websocket frame containing message |
2015 | vws_frame* frame; |
2016 | ucstr data = buffer->data; |
2017 | size_t size = buffer->size; |
2018 | frame = vws_frame_new(data, size, opcode); |
2019 | |
2020 | // This frame is from server to we don't mask it |
2021 | frame->mask = 0; |
2022 | |
2023 | // Serialize frame: frame is freed by function |
2024 | vws_buffer* fdata = vws_serialize(frame); |
2025 | |
2026 | // Pack frame binary into queue data |
2027 | vws_svr_data* response; |
2028 | |
2029 | response = vws_svr_data_new((vws_tcp_svr*)server, cid, &fdata); |
2030 | |
2031 | // Queue the data to uv_thread() to send out on wire |
2032 | vws_tcp_svr_send(response); |
2033 | |
2034 | // Free buffers |
2035 | vws_buffer_free(fdata); |
2036 | } |
2037 | |
2038 | void ws_svr_client_msg_in(vws_svr* s, vws_cid_t c, vws_msg* m, void* x) |
2039 | { |
2040 | vws_svr* server = (vws_svr*)s; |
2041 | |
2042 | // Route to application-specific processing callback |
2043 | server->process(server, c, m, x); |
2044 | } |
2045 | |
2046 | void ws_svr_client_process(vws_svr* s, vws_cid_t c, vws_msg* m, void* x) |
2047 | { |
2048 | // Default: Do nothing. Drop message. |
2049 | vws_msg_free(m); |
2050 | } |
2051 | |
2052 | void ws_svr_client_msg_out(vws_svr* s, vws_cid_t c, vws_msg* m, void* x) |
2053 | { |
2054 | ws_svr_client_data_out(s, c, m->data, m->opcode); |
2055 | vws_msg_free(m); |
2056 | } |
2057 | |
2058 | void ws_svr_ctor(vws_svr* server, int nt, int bl, int qs) |
2059 | { |
2060 | tcp_svr_ctor((vws_tcp_svr*)server, nt, bl, qs); |
2061 | |
2062 | // Server base function overrides |
2063 | server->base.on_connect = ws_svr_client_connect; |
2064 | server->base.on_disconnect = ws_svr_client_disconnect; |
2065 | server->base.on_read = ws_svr_client_read; |
2066 | server->base.on_data_in = ws_svr_client_data_in; |
2067 | |
2068 | // Message handling |
2069 | server->on_msg_in = ws_svr_client_msg_in; |
2070 | server->on_msg_out = ws_svr_client_msg_out; |
2071 | |
2072 | // Application functions |
2073 | server->process = ws_svr_client_process; |
2074 | server->send = ws_svr_client_msg_out; |
2075 | } |
2076 | |
2077 | vws_svr* vws_svr_new(int num_threads, int backlog, int queue_size) |
2078 | { |
2079 | vws_svr* server = vws.malloc(sizeof(vws_svr)); |
2080 | ws_svr_ctor(server, num_threads, backlog, queue_size); |
2081 | return server; |
2082 | } |
2083 | |
2084 | void ws_svr_dtor(vws_svr* server) |
2085 | { |
2086 | if (server == NULL) |
2087 | { |
2088 | return; |
2089 | } |
2090 | |
2091 | tcp_svr_dtor((vws_tcp_svr*)server); |
2092 | } |
2093 | |
2094 | void vws_svr_free(vws_svr* server) |
2095 | { |
2096 | if (server == NULL) |
2097 | { |
2098 | return; |
2099 | } |
2100 | |
2101 | ws_svr_dtor(server); |
2102 | vws.free(server); |
2103 | } |
2104 | |
2105 | //------------------------------------------------------------------------------ |
2106 | // Messaging Server: Derived from WebSocket server |
2107 | //------------------------------------------------------------------------------ |
2108 | |
2109 | // Convert incoming WebSocket messages to VRTQL messages for processing |
2110 | void msg_svr_client_ws_msg_in(vws_svr* s, vws_cid_t cid, vws_msg* wsm, void* x) |
2111 | { |
2112 | // Deserialize message |
2113 | |
2114 | vrtql_msg* msg = vrtql_msg_new(HTTP_REQUEST); |
2115 | ucstr data = wsm->data->data; |
2116 | size_t size = wsm->data->size; |
2117 | |
2118 | if (vrtql_msg_deserialize(msg, data, size) == false) |
2119 | { |
2120 | // Deserialized failed |
2121 | |
2122 | // Error already set |
2123 | vws_msg_free(wsm); |
2124 | vrtql_msg_free(msg); |
2125 | |
2126 | vws_tcp_svr_close((vws_tcp_svr*)s, cid); |
2127 | |
2128 | return; |
2129 | } |
2130 | |
2131 | // Deserialized succeeded |
2132 | |
2133 | // TODO |
2134 | // We send back the format we get. More than that, a request with different |
2135 | // format effectively changes the entire connection default format setting. |
2136 | // cnx->format = msg->format; |
2137 | |
2138 | // Free websocket message |
2139 | vws_msg_free(wsm); |
2140 | |
2141 | // Process message |
2142 | vrtql_msg_svr* server = (vrtql_msg_svr*)s; |
2143 | server->on_msg_in(s, cid, msg, x); |
2144 | } |
2145 | |
2146 | // Receive/handle incoming VRTQL messages |
2147 | void msg_svr_client_msg_in(vws_svr* s, vws_cid_t c, vrtql_msg* m, void* x) |
2148 | { |
2149 | vrtql_msg_svr* server = (vrtql_msg_svr*)s; |
2150 | |
2151 | // Route to application-specific processing callback |
2152 | server->process(s, c, m, x); |
2153 | } |
2154 | |
2155 | // Send VRTQL messages |
2156 | void msg_svr_client_msg_out(vws_svr* s, vws_cid_t c, vrtql_msg* m, void* x) |
2157 | { |
2158 | // Serialize message |
2159 | vws_buffer* mdata = vrtql_msg_serialize(m); |
2160 | |
2161 | // Send to base class |
2162 | ws_svr_client_data_out(s, c, mdata, BINARY_FRAME); |
2163 | |
2164 | // Cleanup |
2165 | vws_buffer_free(mdata); |
2166 | vrtql_msg_free(m); |
2167 | } |
2168 | |
2169 | // Process incoming VRTQL messages |
2170 | void msg_svr_client_process(vws_svr* s, vws_cid_t c, vrtql_msg* m, void* x) |
2171 | { |
2172 | // Default: Do nothing. Drop message. |
2173 | vrtql_msg_free(m); |
2174 | } |
2175 | |
2176 | vrtql_msg_svr* vrtql_msg_svr_new(int num_threads, int backlog, int queue_size) |
2177 | { |
2178 | vrtql_msg_svr* server = vws.malloc(sizeof(vrtql_msg_svr)); |
2179 | return vrtql_msg_svr_ctor(server, num_threads, backlog, queue_size); |
2180 | } |
2181 | |
2182 | void vrtql_msg_svr_free(vrtql_msg_svr* server) |
2183 | { |
2184 | vrtql_msg_svr_dtor(server); |
2185 | } |
2186 | |
2187 | vrtql_msg_svr* |
2188 | vrtql_msg_svr_ctor(vrtql_msg_svr* server, int threads, int backlog, int qsize) |
2189 | { |
2190 | ws_svr_ctor((vws_svr*)server, threads, backlog, qsize); |
2191 | |
2192 | // Server base function overrides |
2193 | server->base.process = msg_svr_client_ws_msg_in; |
2194 | |
2195 | // Message handling |
2196 | server->on_msg_in = msg_svr_client_msg_in; |
2197 | server->on_msg_out = msg_svr_client_msg_out; |
2198 | |
2199 | // Application functions |
2200 | server->process = msg_svr_client_process; |
2201 | server->send = msg_svr_client_msg_out; |
2202 | |
2203 | // User-defined data |
2204 | server->data = NULL; |
2205 | |
2206 | return server; |
2207 | } |
2208 | |
2209 | void vrtql_msg_svr_dtor(vrtql_msg_svr* server) |
2210 | { |
2211 | if (server == NULL) |
2212 | { |
2213 | return; |
2214 | } |
2215 | |
2216 | ws_svr_dtor((vws_svr*)server); |
2217 | vws.free(server); |
2218 | } |
2219 | |