1#if defined(__linux__) || defined(__bsd__) || defined(__sunos__)
2#include <unistd.h>
3#endif
4
5#include <stdbool.h>
6#include <stdio.h>
7#include <stdlib.h>
8#include <string.h>
9
10#include "server.h"
11#include "websocket.h"
12
13//------------------------------------------------------------------------------
14// Internal functions
15//------------------------------------------------------------------------------
16
17/**
18 * @defgroup AddressPool
19 */
20
21address_pool* address_pool_new(int initial_size, int growth_factor)
22{
23 address_pool* pool = (address_pool*)malloc(sizeof(address_pool));
24 pool->slots = (uintptr_t*)calloc(initial_size, sizeof(uintptr_t));
25 pool->capacity = initial_size;
26 pool->count = 0;
27 pool->last_used_index = 0;
28 pool->growth_factor = growth_factor;
29
30 return pool;
31}
32
33void address_pool_free(address_pool** pool)
34{
35 if (*pool != NULL)
36 {
37 free((*pool)->slots);
38 free(*pool);
39 (*pool) = NULL;
40 }
41}
42
43void address_pool_resize(address_pool *pool)
44{
45 int new_capacity = pool->capacity * pool->growth_factor;
46 uintptr_t* new_slots = (uintptr_t *)calloc(new_capacity, sizeof(uintptr_t));
47 memcpy(new_slots, pool->slots, pool->capacity * sizeof(uintptr_t));
48
49 free(pool->slots);
50
51 pool->slots = new_slots;
52 pool->capacity = new_capacity;
53}
54
55uint32_t address_pool_set(address_pool* pool, uintptr_t address)
56{
57 if (pool->count == pool->capacity)
58 {
59 address_pool_resize(pool);
60 }
61
62 while (pool->slots[pool->last_used_index] != 0)
63 {
64 pool->last_used_index = (pool->last_used_index + 1) % pool->capacity;
65 }
66
67 // Placeholder for actual address or ID
68 pool->slots[pool->last_used_index] = address;
69 pool->count++;
70
71 uint32_t allocated_index = pool->last_used_index;
72 pool->last_used_index = (pool->last_used_index + 1) % pool->capacity;
73
74 return allocated_index;
75}
76
77uintptr_t address_pool_get(address_pool* pool, vws_cid_t index)
78{
79 if (index >= pool->capacity || pool->slots[index] == 0)
80 {
81 return 0;
82 }
83
84 return pool->slots[index];
85}
86
87void address_pool_remove(address_pool* pool, uint32_t index)
88{
89 if (pool->slots[index] != 0)
90 {
91 pool->slots[index] = 0;
92 pool->count--;
93 }
94}
95
96/**
97 * @defgroup ThreadFunctions
98 *
99 * @brief Program thread organization
100 *
 * There are two thread functions: the network thread (main thread) and worker
 * pool threads.
103 */
104
105/**
106 * @brief UV callback executed in response to (server->wakeup) async signal.
107 *
108 * This function handles asynchronous events in the main thread, specifically
109 * managing outgoing network I/O from worker threads back to clients. It runs
110 * within the main UV loop, within the main thread, also referred to as the
111 * networking thread.
112 *
113 * Worker threads pass data to it through a queue. The data is in the form of
114 * vws_svr_data instances. When a worker sends a vws_svr_data instance, it
115 * adds it to the queue (server->responses) and notifies (wakes up) the main UV
 * loop (uv_run() in vws_tcp_svr_run()) by calling uv_async_send(server->wakeup)
117 * which in turn calls this function to check the server->responses queue. This
118 * function unloads all data in the queue and sends the data out to each
119 * respective client. It then returns control back to the main UV loop which
120 * resumes polling the network connections (blocking if there is no activity).
121 *
122 * @param handle A pointer to the uv_async_t handle that triggered the callback.
123 *
124 * @ingroup ThreadFunctions
125 */
126static void uv_thread(uv_async_t* handle);
127
128/**
129 * @brief The entry point for a worker thread.
130 *
131 * This function implements the worker thread pool. It is what each worker
132 * thread runs. It loops continuously, handling incoming data from clients,
133 * processing them and returning data back to them via the uv_thread(). It
134 * processes data by taking data (requests) from the server->requests queue,
135 * dispatching them to server->process(data) for processing, which in turn
136 * generates data (responses), sending them back to the client by putting them
137 * on the server->responses queue (processed by uv_thread()).
138 *
139 * @param arg A void pointer, typically to the server object,
140 * used to pass data into the thread.
141 *
142 * @ingroup ThreadFunctions
143 */
144static void worker_thread(void* arg);
145
146/**
147 * @defgroup ServerFunctions
148 *
149 * @brief Functions that support server operation
150 *
151 */
152
153/**
154 * @brief Server instance constructor
155 *
156 * Constructs a new server instance. This takes a new, empty vws_tcp_svr instance
157 * and initializes all of its members. It is used by derived structs as well
158 * (vrtql_msg_svr) to construct the base struct.
159 *
160 * @param server The server instance to be initialized
161 * @return The initialized server instance
162 *
163 * @ingroup ServerFunctions
164 */
165
166static vws_tcp_svr* tcp_svr_ctor(vws_tcp_svr* s, int nt, int bl, int qs);
167
168/**
169 * @brief Server instance destructor
170 *
171 * Destructs an initialized server instance. This takes a vws_tcp_svr instance
172 * and deallocates all of its members -- everything but the top-level
173 * struct. This is used by derived structs as well (vrtql_msg_svr) to destruct
174 * the base struct.
175 *
176 * @param server The server instance to be destructed
177 *
178 * @ingroup ServerFunctions
179 */
180
181static void tcp_svr_dtor(vws_tcp_svr* s);
182
183/**
184 * @brief Initiates the server shutdown process.
185 *
186 * This function is responsible for shutting down a vws_tcp_svr server instance.
187 * It stops the server if it's currently running, performs necessary cleanup,
188 * shuts down the libuv event loop, frees memory, and clears the connection map.
189 * It signals all worker threads to stop processing new data and to finish any
190 * requests they are currently processing.
191 *
192 * @param server The server that needs to be shutdown.
193 *
194 * @ingroup ServerFunctions
195 */
196static void svr_shutdown(vws_tcp_svr* server);
197
198/**
199 * @brief Handles the close event for a libuv handle.
200 *
201 * This function is called when a libuv handle is closed. It checks if the handle
202 * has associated heap data stored in `handle->data` and generates a warning if
203 * the resource was not properly freed.
204 *
205 * @param handle The libuv handle being closed.
206 */
207static void on_uv_close(uv_handle_t* handle);
208
209/**
210 * @brief Walks through libuv handles and attempts to close them.
211 *
212 * This function is used to walk through libuv handles and attempts to close each
213 * handle. It is typically called during libuv shutdown to ensure that all handles
214 * are properly closed. If a handle has not been closed, it will be closed and
215 * the `on_uv_close()` function will be called.
216 *
217 * @param handle The libuv handle being walked.
218 * @param arg Optional argument passed to the function (not used in this case).
219 */
220static void on_uv_walk(uv_handle_t* handle, void* arg);
221
222/**
223 * @brief Callback for new client connection.
224 *
225 * This function is invoked when a new client successfully connects to the
226 * server.
227 *
228 * @param server The server to which the client connected.
229 * @param status The status of the connection.
230 *
231 * @ingroup ServerFunctions
232 */
233static void svr_on_connect(uv_stream_t* server, int status);
234
235/**
236 * @brief Callback for buffer reallocation.
237 *
238 * This function is invoked when a handle requires a buffer reallocation.
239 *
240 * @param handle The handle requiring reallocation.
241 * @param size The size of the buffer to be allocated.
242 * @param buf The buffer.
243 *
244 * @ingroup ServerFunctions
245 */
246static void svr_on_realloc(uv_handle_t* handle, size_t size, uv_buf_t* buf);
247
248/**
249 * @brief Callback for handle closure.
250 *
251 * This function is invoked when a handle is closed.
252 *
253 * @param handle The handle that was closed.
254 *
255 * @ingroup ServerFunctions
256
257 */
258static void svr_on_close(uv_handle_t* handle);
259
260/**
261 * @brief Callback for reading data.
262 *
263 * This function is invoked when data is read from a client.
264 *
265 * @param client The client from which data was read.
266 * @param size The number of bytes read.
267 * @param buf The buffer containing the data.
268 *
269 * @ingroup ServerFunctions
270 */
271static void svr_on_read(uv_stream_t* client, ssize_t size, const uv_buf_t* buf);
272
273/**
274 * @brief Callback for write completion.
275 *
276 * This function is invoked when a write operation to a client completes.
277 *
278 * @param req The write request.
279 * @param status The status of the write operation.
280 *
281 * @ingroup ServerFunctions
282 */
283static void svr_on_write_complete(uv_write_t* req, int status);
284
285/**
286 * @defgroup Connection Functions
287 *
288 * @brief Functions that support connection operation
289 *
290 * Connections always refer to client-side connections, on the server. They are
291 * active connections established within the main UV loop.
292 *
293 * @ingroup ServerFunctions
294 */
295
296/**
297 * @brief Creates a new server connection.
298 *
299 * @param s The server for the connection.
300 * @param c The client for the connection.
301 * @return A new server connection.
302 *
303 * @ingroup ServerFunctions
304 */
305static vws_svr_cnx* svr_cnx_new(vws_tcp_svr* s, uv_stream_t* c);
306
307/**
308 * @brief Frees a server connection.
309 *
310 * @param c The connection to be freed.
311 *
312 * @ingroup ServerFunctions
313 */
314static void svr_cnx_free(vws_svr_cnx* c);
315
316/**
317 * @brief Actively close a client connection
318 *
319 * @param c The connection
320 */
321static void svr_cnx_close(vws_tcp_svr* server, vws_cid_t c);
322
323/**
324 * @brief Callback for client connection.
325 *
326 * This function is triggered when a new client connection is established. It
327 * is responsible for processing any steps necessary at the start of a
328 * connection.
329 *
330 * @param c The connection structure representing the client that has connected.
331 *
332 * @ingroup ServerFunctions
333 */
334static void svr_client_connect(vws_svr_cnx* c);
335
336/**
337 * @brief Callback for client disconnection.
338 *
339 * This function is triggered when a client connection is terminated. It is
340 * responsible for processing any cleanup or other steps necessary at the end of
341 * a connection.
342 *
343 * @param c The connection
344 *
345 * @ingroup ServerFunctions
346 */
347static void svr_client_disconnect(vws_svr_cnx* c);
348
349/**
350 * @brief Callback for client read operations.
351 *
352 * This function is triggered when data is read from a client connection. It is
353 * responsible for processing the received data.
354 *
355 * @param c The connection that sent the data.
356 * @param size The size of the data that was read.
357 * @param buf The buffer containing the data that was read.
358 *
359 * @ingroup ServerFunctions
360 */
361static void svr_client_read(vws_svr_cnx* c, ssize_t size, const uv_buf_t* buf);
362
363/**
364 * @brief Callback for processing client data in (ingress)
365 *
366 * This function processes data arriving from client to worker thread. It is
367 * responsible for handling the actual computation or other work associated with
368 * the data. This takes place in the context of worker_thread().
369 *
370 * @param data The incoming data from the client to process.
371 *
372 * @ingroup ServerFunctions
373 */
374static void svr_client_data_in(vws_svr_data* data, void* x);
375
376/**
 * @brief Callback for processing client data out (egress)
 *
 * This function is triggered to process data arriving from a worker thread to
 * be sent back to the client. It is responsible for transferring the data from
 * the response argument (vws_svr_data) onto the wire (client socket via
 * libuv). This takes place in the context of uv_thread().
384 *
385 * @param data The outgoing data from worker to client
386 *
387 * @ingroup ServerFunctions
388 */
389static void svr_client_data_out(vws_svr_data* data, void* x);
390
391
392
393
394/**
395 * @defgroup WebSocketServerFunctions
396 *
 * @brief Functions that support WebSocket server operation
398 *
399 */
400
401/**
402 * @brief WebSocket server constructor.
403 *
404 * @param server The WebSocket server instance.
405 * @param nt The number of threads to run in the worker pool.
406 * @param bl The connection backlog for listen(). If set to 0, it uses the
407 * default value (128).
408 * @param qs The maximum queue size for requests and responses. If set to 0, it
409 * uses the default value (1024).
410 */
411static void ws_svr_ctor(vws_svr* server, int nt, int bl, int qs);
412
413/**
414 * @brief WebSocket server destructor.
415 *
416 * @param server The WebSocket server instance to free.
417 */
418static void ws_svr_dtor(vws_svr* server);
419
420/**
421 * @brief Callback for client connection.
422 *
423 * This function is triggered when a new client connection is established. It
424 * is responsible for processing any steps necessary at the start of a
425 * connection.
426 *
427 * @param c The connection structure representing the client that has connected.
428 *
429 * @ingroup WebSocketServerFunctions
430 */
431static void ws_svr_client_connect(vws_svr_cnx* c);
432
433/**
434 * @brief Callback for client disconnection.
435 *
436 * This function is triggered when a client connection is terminated. It is
437 * responsible for processing any cleanup or other steps necessary at the end of
438 * a connection.
439 *
440 * @param c The connection
441 *
442 * @ingroup WebSocketServerFunctions
443 */
444static void ws_svr_client_disconnect(vws_svr_cnx* c);
445
446/**
447 * @brief Callback for client read operations.
448 *
449 * This function is triggered when data is read from a client connection. It is
450 * responsible for processing the received data.
451 *
452 * @param c The connection that sent the data.
453 * @param size The size of the data that was read.
454 * @param buf The buffer containing the data that was read.
455 *
456 * @ingroup WebSocketServerFunctions
457 */
458static void ws_svr_client_read(vws_svr_cnx* c, ssize_t size, const uv_buf_t* buf);
459
460/**
461 * @brief Callback for processing client data in (ingress) for msg server
462 *
463 * This function processes data arriving from client to worker thread. It
464 * collects data until there is a complete message. It passes message to
465 * ws_svr_client_msg_in() for processing. This takes place in the context of
466 * worker_thread().
467 *
468 * @param server The server instance
469 * @param data The incoming data from the client to process.
470 * @param x The user-defined context
471 *
472 * @ingroup WebSocketServerFunctions
473 */
474static void ws_svr_client_data_in(vws_svr_data* data, void* x);
475
476/**
477 * @brief Sends data from a server connection to a client WebSocket connection.
478 *
479 * @param server The server
480 * @param c The connection index
481 * @param buffer The data to send.
482 * @param opcode The opcode for the WebSocket frame.
483 */
484static void ws_svr_client_data_out( vws_svr* server,
485 vws_cid_t c,
486 vws_buffer* buffer,
487 unsigned char opcode);
488
489/**
490 * @brief Process a WebSocket frame received from a client.
491 *
492 * @param c The WebSocket connection.
493 * @param f The incoming frame to process.
494 */
495static void ws_svr_process_frame(vws_cnx* c, vws_frame* f);
496
497/**
498 * @brief Callback for client message processing
499 *
500 * This function is triggered when a message is read from a client
501 * connection. It is responsible for processing the message. This takes place in
502 * the context of worker_thread().
503 *
504 * @param s The server
505 * @param c The connection ID
506 * @param m The message to process
507 * @param x The user-defined context
508 *
509 * @ingroup WebSocketServerFunctions
510 */
511static void ws_svr_client_msg_in(vws_svr* s, vws_cid_t cid, vws_msg* m, void* x);
512
513/**
514 * @brief Callback for sending message to client. It takes a message as input,
515 * serializes it to a binary WebSocket message and then sends it to the
516 * uv_thread() to send back to client. This takes place in the context of
517 * worker_thread() (only to be called within that context).
518 *
519 * This function sends a message back to a client.
520 *
521 * @param s The server
522 * @param c The connection ID
523 * @param m The message to send
524 * @param x The user-defined context
525 *
526 * @ingroup WebSocketServerFunctions
527 */
528static void ws_svr_client_msg_out(vws_svr* s, vws_cid_t c, vws_msg* m, void* x);
529
530/**
 * @brief Default WebSocket message processing function. This is meant to be
 * overridden by the application to perform message processing, specifically by
 * assigning the vws_svr.process callback to the desired processing
 * handler. The default implementation simply drops (deletes) the incoming
 * message.
536 *
537 * @param s The server instance.
538 * @param c The server connection.
539 * @param m The incoming WebSocket message.
540 * @param x The user-defined context
541 *
542 * @ingroup MessageServerFunctions
543 */
544static void ws_svr_client_process(vws_svr* s, vws_cid_t c, vws_msg* m, void* x);
545
546
547
548
549/**
550 * @defgroup MessageServerFunctions
551 *
552 * @brief Functions that support message server operation
553 *
554 */
555
556/**
557 * @brief Convert incoming WebSocket messages to VRTQL messages for processing.
558 *
559 * @param s The server instance
560 * @param c The connection ID
561 * @param m The incoming WebSocket message.
562 * @param x The user-defined context
563 *
564 * @ingroup MessageServerFunctions
565 */
566static void msg_svr_client_ws_msg_in(vws_svr* s, vws_cid_t c, vws_msg* m, void* x);
567
568/**
569 * @brief Callback function for handling incoming VRTQL messages from a client.
570 *
571 * @param s The server instance
572 * @param c The connection ID
573 * @param m The incoming VRTQL message.
574 * @param x The user-defined context
575 *
576 * @ingroup MessageServerFunctions
577 */
578static void msg_svr_client_msg_in(vws_svr* s, vws_cid_t c, vrtql_msg* m, void* x);
579
580/**
581 * @brief Callback function for sending VRTQL messages to a client.
582 *
583 * @param s The server instance
584 * @param c The server connection.
585 * @param m The outgoing VRTQL message.
586 * @param x The user-defined context
587 *
588 * @ingroup MessageServerFunctions
589 */
590static void msg_svr_client_msg_out(vws_svr* s, vws_cid_t c, vrtql_msg* m, void* x);
591
592/**
593 * @brief Default VRTQL message processing function.
594 *
595 * @param s The server instance
596 * @param c The server connection.
597 * @param m The incoming VRTQL message.
598 * @param x The user-defined context
599 *
600 * @ingroup MessageServerFunctions
601 */
602static void msg_svr_client_process(vws_svr* s, vws_cid_t c, vrtql_msg* m, void* x);
603
604
605
606
607/**
608 * @defgroup ConnectionMap
609 *
610 * @brief Functions that provide access to the connection map
611 *
612 * The connection map tracks all active connections. These functions simplify
613 * the map API and include memory management and other server-specific
614 * functionality where appropriate.
615 *
616 * @ingroup ConnectionMap
617 */
618
619/**
620 * @brief Get the connection corresponding to a client from the connection map.
621 *
622 * @param map The connection map.
623 * @param key The client (used as key in the map).
624 * @return The corresponding server connection.
625 *
626 * @ingroup ConnectionMap
627 */
628static vws_svr_cnx* svr_cnx_map_get(vws_svr_cnx_map* map, uv_stream_t* key);
629
630/**
631 * @brief Set a connection for a client in the connection map.
632 *
633 * @param map The connection map.
634 * @param key The client (used as key in the map).
635 * @param value The server connection.
636 * @return Success or failure status.
637 *
638 * @ingroup ConnectionMap
639 */
640static int8_t svr_cnx_map_set( vws_svr_cnx_map* map,
641 uv_stream_t* key,
642 vws_svr_cnx* value );
643
644/**
645 * @brief Remove a client's connection from the connection map.
646 *
647 * @param map The connection map.
648 * @param key The client (used as key in the map).
649 *
650 * @ingroup ConnectionMap
651 */
652static void svr_cnx_map_remove(vws_svr_cnx_map* map, uv_stream_t* key);
653
654/**
655 * @brief Clear all connections from the connection map.
656 *
657 * @param map The connection map.
658 *
659 * @ingroup ConnectionMap
660 */
661static void svr_cnx_map_clear(vws_svr_cnx_map* map);
662
663/**
664 * @defgroup QueueGroup
665 *
666 * @brief Queue functions which bridge the network thread and workers
667 *
668 * The network thread and worker threads pass data via queues. These queues
669 * contain vws_svr_data instances. There is a requests queue which passes
670 * data from the network thread to workers for processing. There is a response
 * queue that passes data from worker threads to the network thread. When passed
 * from the network thread to a worker, data take the form of incoming data from
 * the client. When passed from the worker threads to the networking thread, data
 * take the form of outgoing data to be sent back to the client.
 *
 * Queues have built-in synchronization mechanisms that allow the data to be
 * safely passed between threads, as well as ways to gracefully put waiting
 * threads to sleep until data arrives for processing.
679 */
680
681/**
682 * @brief Initializes a server queue.
683 *
684 * This function sets up the provided queue with the given capacity. It also
685 * initializes the synchronization mechanisms associated with the queue.
686 *
687 * @param queue Pointer to the server queue to be initialized.
688 * @param capacity The maximum capacity of the queue.
689 * @param name The queue name
690 *
691 * @ingroup QueueGroup
692 */
693static void queue_init(vws_svr_queue* queue, int capacity, cstr name);
694
695/**
696 * @brief Destroys a server queue.
697 *
698 * This function cleans up the resources associated with the provided queue.
699 * It also handles the synchronization mechanisms associated with the queue.
700 *
701 * @param queue Pointer to the server queue to be destroyed.
702 *
703 * @ingroup QueueGroup
704 */
705static void queue_destroy(vws_svr_queue* queue);
706
707/**
708 * @brief Pushes data to the server queue.
709 *
710 * This function adds data to the end of the provided queue. It also handles
711 * the necessary synchronization to ensure thread safety.
712 *
713 * @param queue Pointer to the server queue.
714 * @param data Data to be added to the queue.
715 *
716 * @ingroup QueueGroup
717 */
718static void queue_push(vws_svr_queue* queue, vws_svr_data* data);
719
720/**
721 * @brief Pops data from the server queue.
722 *
723 * This function removes and returns data from the front of the provided queue.
724 * It also handles the necessary synchronization to ensure thread safety.
725 *
726 * @param queue Pointer to the server queue.
727 * @return A data element from the front of the queue.
728 *
729 * @ingroup QueueGroup
730 */
731static vws_svr_data* queue_pop(vws_svr_queue* queue);
732
733/**
734 * @brief Checks if the server queue is empty.
735 *
736 * This function checks whether the provided queue is empty.
737 * It also handles the necessary synchronization to ensure thread safety.
738 *
739 * @param queue Pointer to the server queue.
740 * @return True if the queue is empty, false otherwise.
741 *
742 * @ingroup QueueGroup
743 */
744static bool queue_empty(vws_svr_queue* queue);
745
746//------------------------------------------------------------------------------
747// Threads
748//------------------------------------------------------------------------------
749
750void worker_thread(void* arg)
751{
752 vws_tcp_svr* server = (vws_tcp_svr*)arg;
753
754 // Set thread tracing level to server.
755 vws.tracelevel = server->trace;
756
757 if (vws.tracelevel >= VT_THREAD)
758 {
759 vws.trace(VL_INFO, "worker_thread(): Starting");
760 }
761
762 vws_thread_ctx ctx;
763 ctx.ctor = server->worker_ctor;
764 ctx.ctor_data = server->worker_ctor_data;
765 ctx.dtor = server->worker_dtor;
766 ctx.data = NULL;
767
768 if (ctx.ctor != NULL)
769 {
770 ctx.data = ctx.ctor(ctx.ctor_data);
771 }
772
773 while (true)
774 {
775 //> Wait for arrival
776
777 // This will put the thread to sleep on a condition variable until
778 // something arrives in queue.
779 vws_svr_data* request = queue_pop(&server->requests);
780
781 // If there's no request (null request), check the server's state
782 if (request == NULL)
783 {
784 // If server is in halting state, return
785 if (server->state == VS_HALTING)
786 {
787 if (vws.tracelevel >= VT_THREAD)
788 {
789 vws.trace(VL_INFO, "worker_thread(): Exiting");
790 }
791
792 return;
793 }
794 else
795 {
796 // If not halting, skip to the next iteration of the loop
797 continue;
798 }
799 }
800
801 server->on_data_in(request, ctx.data);
802 }
803
804 if (ctx.dtor != NULL)
805 {
806 ctx.dtor(ctx.data);
807 }
808}
809
810void uv_thread(uv_async_t* handle)
811{
812 vws_cinfo* cinfo = (vws_cinfo*)handle->data;
813 vws_tcp_svr* server = cinfo->server;
814
815 if (server->state == VS_HALTING)
816 {
817 if (vws.tracelevel >= VT_THREAD)
818 {
819 vws.trace(VL_INFO, "uv_thread(): stop");
820 }
821
822 // Join worker threads. Worker threads must all exit before we can
823 // shutdown libuv as they must release their mutexes and condition
824 // variables first, otherwise uv_stop() will not actually stop the loop.
825 for (int i = 0; i < server->pool_size; i++)
826 {
827 uv_thread_join(&server->threads[i]);
828 }
829
830 svr_shutdown(server);
831
832 return;
833 }
834
835 while (queue_empty(&server->responses) == false)
836 {
837 vws_svr_data* data = queue_pop(&server->responses);
838
839 if ((data == NULL) || (server->responses.state != VS_RUNNING))
840 {
841 if (data != NULL)
842 {
843 vws_svr_data_free(data);
844 }
845
846 return;
847 }
848
849 if (vws_is_flag(&data->flags, VM_SVR_DATA_CLOSE))
850 {
851 // Close connection
852 svr_cnx_close(server, data->cid);
853 vws_svr_data_free(data);
854 }
855 else
856 {
857 server->on_data_out(data, NULL);
858 }
859 }
860}
861
862//------------------------------------------------------------------------------
863// Server API
864//------------------------------------------------------------------------------
865
866vws_svr_data* vws_svr_data_new(vws_tcp_svr* s, vws_cid_t cid, vws_buffer** b)
867{
868 // Create a new vws_svr_data taking ownership of the buffer's data
869 vws_svr_data* item = vws_svr_data_own(s, cid, (*b)->data, (*b)->size);
870
871 // Since we take ownership of buffer data, we clear the buffer.
872 (*b)->data = NULL;
873 (*b)->size = 0;
874
875 return item;
876}
877
878vws_svr_data* vws_svr_data_own(vws_tcp_svr* s, vws_cid_t cid, ucstr data, size_t size)
879{
880 vws_svr_data* item;
881 item = (vws_svr_data*)vws.malloc(sizeof(vws_svr_data));
882
883 item->server = s;
884 item->cid = cid;
885 item->size = size;
886 item->data = data;
887 item->flags = 0;
888
889 return item;
890}
891
892void vws_svr_data_free(vws_svr_data* t)
893{
894 if (t != NULL)
895 {
896 vws.free(t->data);
897 vws.free(t);
898 }
899}
900
901vws_tcp_svr* vws_tcp_svr_new(int num_threads, int backlog, int queue_size)
902{
903 vws_tcp_svr* server = vws.malloc(sizeof(vws_tcp_svr));
904 return tcp_svr_ctor(server, num_threads, backlog, queue_size);
905}
906
907uint8_t vws_tcp_svr_state(vws_tcp_svr* server)
908{
909 return server->state;
910}
911
912void vws_tcp_svr_free(vws_tcp_svr* server)
913{
914 if (server == NULL)
915 {
916 return;
917 }
918
919 tcp_svr_dtor(server);
920 vws.free(server);
921}
922
923int vws_tcp_svr_send(vws_svr_data* data)
924{
925 queue_push(&data->server->responses, data);
926
927 // Notify event loop about the new response
928 uv_async_send(data->server->wakeup);
929
930 return 0;
931}
932
933int vws_tcp_svr_run(vws_tcp_svr* server, cstr host, int port)
934{
935 if (vws.tracelevel >= VT_SERVICE)
936 {
937 vws.trace( VL_INFO,
938 "vws_tcp_svr_run(%p): Starting worker %i threads",
939 server,
940 server->pool_size );
941 }
942
943 for (int i = 0; i < server->pool_size; i++)
944 {
945 uv_thread_create(&server->threads[i], worker_thread, server);
946 }
947
948 //> Create listening socket
949
950 uv_tcp_t* socket = vws.malloc(sizeof(uv_tcp_t));
951
952 uv_tcp_init(server->loop, socket);
953
954 vws_cinfo* cinfo = vws.malloc(sizeof(vws_cinfo));
955 cinfo->cnx = NULL;
956 cinfo->server = server;
957 cinfo->cid = 0;
958
959 socket->data = cinfo;
960
961 //> Bind to address
962
963 int rc;
964 struct sockaddr_in addr;
965 uv_ip4_addr(host, port, &addr);
966 rc = uv_tcp_bind(socket, (const struct sockaddr*)&addr, 0);
967
968 if (vws.tracelevel >= VT_SERVICE)
969 {
970 vws.trace( VL_INFO,
971 "vws_tcp_svr_run(%p): Bind %s:%lu",
972 server, host, port );
973 }
974
975 if (rc)
976 {
977 vws.error(VE_RT, "Bind error %s", uv_strerror(rc));
978 return -1;
979 }
980
981 //> Listen
982
983 rc = uv_listen((uv_stream_t*)socket, server->backlog, svr_on_connect);
984
985 if (rc)
986 {
987 vws.error(VE_RT, "Listen error %s", uv_strerror(rc));
988 return -1;
989 }
990
991 if (vws.tracelevel >= VT_SERVICE)
992 {
993 vws.trace( VL_INFO,
994 "vws_tcp_svr_run(%s): Listen %s:%lu",
995 server, host, port );
996 }
997
998 //> Start server
999
1000 // Set state to running
1001 server->state = VS_RUNNING;
1002
1003 if (vws.tracelevel >= VT_SERVICE)
1004 {
1005 vws.trace(VL_INFO, "vws_tcp_svr_run(%s): Starting uv_run()", server);
1006 }
1007
1008 while (server->state == VS_RUNNING)
1009 {
1010 // Run UV loop. This runs indefinitely, passing network I/O and and out
1011 // of system until server is shutdown by vws_tcp_svr_stop() (by external
1012 // thread).
1013 uv_run(server->loop, UV_RUN_DEFAULT);
1014 }
1015
1016 //> Shutdown server
1017
1018 // Close the listening socket handle
1019 uv_close((uv_handle_t*)socket, svr_on_close);
1020
1021 if (vws.tracelevel >= VT_SERVICE)
1022 {
1023 vws.trace(VL_INFO, "vws_tcp_svr_run(%p): Shutdown complete", server);
1024 }
1025
1026 // Set state to halted
1027 server->state = VS_HALTED;
1028
1029 return 0;
1030}
1031
1032void vws_tcp_svr_stop(vws_tcp_svr* server)
1033{
1034 // Set shutdown flags
1035 server->state = VS_HALTING;
1036 server->requests.state = VS_HALTING;
1037 server->responses.state = VS_HALTING;
1038
1039 // Wakeup all worker threads
1040 if (vws.tracelevel >= VT_SERVICE)
1041 {
1042 vws.trace(VL_INFO, "vws_tcp_svr_stop(): stop worker threads");
1043 }
1044
1045 uv_mutex_lock(&server->requests.mutex);
1046 uv_cond_broadcast(&server->requests.cond);
1047 uv_mutex_unlock(&server->requests.mutex);
1048
1049 // Wakeup the main event loop to shutdown main thread
1050 if (vws.tracelevel >= VT_SERVICE)
1051 {
1052 vws.trace(VL_INFO, "vws_tcp_svr_stop(): stop main thread");
1053 }
1054
1055 uv_async_send(server->wakeup);
1056
1057 while (server->state != VS_HALTED)
1058 {
1059 sleep(1);
1060 }
1061}
1062
1063int vws_tcp_svr_inetd_run(vws_tcp_svr* server, int sockfd)
1064{
1065 if (sockfd < 0)
1066 {
1067 vws.error(VE_RT, "Invalid server or socket descriptor provided");
1068 return 1;
1069 }
1070
1071 if (vws.tracelevel >= VT_SERVICE)
1072 {
1073 vws.trace( VL_INFO,
1074 "vws_tcp_svr_run(%p): Starting worker %i threads",
1075 server,
1076 server->pool_size );
1077 }
1078
1079 for (int i = 0; i < server->pool_size; i++)
1080 {
1081 uv_thread_create(&server->threads[i], worker_thread, server);
1082 }
1083
1084 // Go into non-blocking mode as we are using poll() for socket_read() and
1085 // socket_write().
1086 if (vws_socket_set_nonblocking(sockfd) == false)
1087 {
1088 vws.error(VE_RT, "Failed to set socket to nonblocking");
1089 return 1;
1090 }
1091
1092 // Set to inetd mode
1093 server->inetd_mode = 1;
1094
1095 // Initialize and adopt the existing socket descriptor.
1096 uv_tcp_t* c = (uv_tcp_t*)vws.malloc(sizeof(uv_tcp_t));
1097
1098 if (uv_tcp_init(server->loop, c))
1099 {
1100 // Handle uv_tcp_init failure.
1101 vws.error(VE_RT, "Failed to initialize new TCP handle");
1102 vws.free(c);
1103 return 1;
1104 }
1105
1106 if (uv_tcp_open(c, sockfd))
1107 {
1108 // Handle uv_tcp_open failure.
1109 vws.error(VE_RT, "Failed to adopt the socket descriptor.");
1110 vws.free(c);
1111 return 1;
1112 }
1113
1114 vws_cinfo* addr = vws.malloc(sizeof(vws_cinfo));
1115 addr->cnx = NULL;
1116 addr->server = server;
1117
1118 c->data = addr;
1119
1120 if (uv_read_start((uv_stream_t*)c, svr_on_realloc, svr_on_read) != 0)
1121 {
1122 vws.error(VE_RT, "Failed to start reading from client");
1123 vws.free(c);
1124 vws.free(addr);
1125 return 1;
1126 }
1127
1128 //> Add connection to registry and initialize
1129
1130 // Associate server with this handle for callbacks.
1131 vws_svr_cnx* cnx = svr_cnx_new(server, (uv_stream_t*)c);
1132 addr->cnx = cnx;
1133 addr->cid = cnx->cid;
1134
1135 //> Call svr_on_connect() handler
1136
1137 server->on_connect(cnx);
1138
1139 // Now, the handle is associated with the socket and is ready to be used.
1140 // Start the libuv loop.
1141 uv_run(server->loop, UV_RUN_DEFAULT);
1142
1143 return 0;
1144}
1145
1146void vws_tcp_svr_inetd_stop(vws_tcp_svr* server)
1147{
1148 // Set shutdown flags
1149 server->state = VS_HALTING;
1150 server->requests.state = VS_HALTING;
1151 server->responses.state = VS_HALTING;
1152
1153 // Stop the loop. We have not more I/O to deal with. We don't want the loop
1154 // to run any more for any reason.
1155 uv_stop(server->loop);
1156
1157 // Wakeup all worker threads
1158 if (vws.tracelevel >= VT_SERVICE)
1159 {
1160 vws.trace(VL_INFO, "vws_tcp_svr_inetd_stop(): stop worker threads");
1161 }
1162
1163 uv_mutex_lock(&server->requests.mutex);
1164 uv_cond_broadcast(&server->requests.cond);
1165 uv_mutex_unlock(&server->requests.mutex);
1166
1167 // Wakeup the main event loop to shutdown main thread
1168 if (vws.tracelevel >= VT_SERVICE)
1169 {
1170 vws.trace(VL_INFO, "vws_tcp_svr_inetd_stop(): stop main thread");
1171 }
1172
1173 // Wait for all threads to complete
1174 uv_thread(server->wakeup);
1175
1176 if (vws.tracelevel >= VT_SERVICE)
1177 {
1178 vws.trace(VL_INFO, "vws_tcp_svr_inetd_stop(): done");
1179 }
1180
1181 // Set state to halted
1182 server->state = VS_HALTED;
1183}
1184
1185//------------------------------------------------------------------------------
1186// Server construction / destruction
1187//------------------------------------------------------------------------------
1188
1189vws_tcp_svr* tcp_svr_ctor(vws_tcp_svr* svr, int nt, int backlog, int queue_size)
1190{
1191 if (backlog == 0)
1192 {
1193 backlog = 128;
1194 }
1195
1196 if (queue_size == 0)
1197 {
1198 queue_size = 1024;
1199 }
1200
1201 svr->threads = vws.malloc(sizeof(uv_thread_t) * nt);
1202 svr->pool_size = nt;
1203 svr->on_connect = svr_client_connect;
1204 svr->on_disconnect = svr_client_disconnect;
1205 svr->on_read = svr_client_read;
1206 svr->on_data_in = svr_client_data_in;
1207 svr->on_data_out = svr_client_data_out;
1208 svr->worker_ctor = NULL;
1209 svr->worker_ctor_data = NULL;
1210 svr->worker_dtor = NULL;
1211 svr->backlog = backlog;
1212 svr->loop = (uv_loop_t*)vws.malloc(sizeof(uv_loop_t));
1213 svr->state = VS_HALTED;
1214 svr->trace = vws.tracelevel;
1215 svr->inetd_mode = 0;
1216
1217 uv_loop_init(svr->loop);
1218 svr->cpool = address_pool_new(1000, 2);
1219 queue_init(&svr->requests, queue_size, "requests");
1220 queue_init(&svr->responses, queue_size, "responses");
1221
1222 vws_cinfo* cinfo = vws.malloc(sizeof(vws_cinfo));
1223 cinfo->cnx = NULL;
1224 cinfo->server = svr;
1225 cinfo->cid = 0;
1226
1227 svr->wakeup = vws.malloc(sizeof(uv_async_t));
1228 svr->wakeup->data = cinfo;
1229 uv_async_init(svr->loop, svr->wakeup, uv_thread);
1230
1231 return svr;
1232}
1233
1234void on_uv_close(uv_handle_t* handle)
1235{
1236 if (handle != NULL)
1237 {
1238 // As a policy we don't put heap data on handle->data. If that is ever
1239 // done then it must be freed in the appropriate place. It is ignored
1240 // here because it's impossible to tell what it is by the very nature of
1241 // uv_handle_t. We will generate a warning however.
1242 if (handle->data != NULL)
1243 {
1244 vws.trace( VL_WARN,
1245 "on_uv_close(): libuv resource not properly freed: %p",
1246 (void*)handle->data );
1247 }
1248 }
1249}
1250
1251void on_uv_walk(uv_handle_t* handle, void* arg)
1252{
1253 // If this handle has not been closed, it should have been. Nevertheless we
1254 // will make an attempt to close it.
1255 if (uv_is_closing(handle) == 0)
1256 {
1257 uv_close(handle, (uv_close_cb)on_uv_close);
1258 }
1259}
1260
1261void tcp_svr_dtor(vws_tcp_svr* svr)
1262{
1263 //> Stop/shutdown server
1264
1265 if (svr->state == VS_RUNNING)
1266 {
1267 vws_tcp_svr_stop(svr);
1268 }
1269
1270 svr_shutdown(svr);
1271 vws.free(svr->threads);
1272
1273 // Close the server async handle
1274 uv_close((uv_handle_t*)svr->wakeup, svr_on_close);
1275
1276 //> Shutdown libuv
1277
1278 // Walk the loop to close everything
1279 uv_walk(svr->loop, on_uv_walk, NULL);
1280
1281 while (uv_loop_close(svr->loop))
1282 {
1283 // Run the loop until there are no more active handles
1284 while (uv_loop_alive(svr->loop))
1285 {
1286 uv_run(svr->loop, UV_RUN_DEFAULT);
1287 }
1288 }
1289
1290 // Free loop
1291 vws.free(svr->loop);
1292
1293 // Free address pool
1294 address_pool_free(&svr->cpool);
1295}
1296
1297//------------------------------------------------------------------------------
1298// Client connection callbacks
1299//------------------------------------------------------------------------------
1300
1301void svr_client_connect(vws_svr_cnx* c)
1302{
1303 if (vws.tracelevel >= VT_SERVICE)
1304 {
1305 vws.trace(VL_INFO, "svr_client_connect(%p)", c->handle);
1306 }
1307}
1308
1309void svr_client_disconnect(vws_svr_cnx* c)
1310{
1311 if (vws.tracelevel >= VT_SERVICE)
1312 {
1313 vws.trace(VL_INFO, "svr_client_disconnect(%p)", c->handle);
1314 }
1315}
1316
1317void svr_client_read(vws_svr_cnx* c, ssize_t size, const uv_buf_t* buf)
1318{
1319 vws_tcp_svr* server = c->server;
1320
1321 // Queue data to worker pool for processing
1322 vws_svr_data* data = vws_svr_data_own(server, c->cid, (ucstr)buf->base, size);
1323
1324 // Store reference to server
1325 data->server = c->server;
1326
1327 // Put on queue
1328 queue_push(&server->requests, data);
1329}
1330
1331void svr_client_data_in(vws_svr_data* m, void* x)
1332{
1333 // Default: Do nothing. Drop data.
1334 vws_svr_data_free(m);
1335}
1336
1337void svr_client_data_out(vws_svr_data* data, void* x)
1338{
1339 if (data->size == 0)
1340 {
1341 vws.trace(VL_INFO, "svr_client_data_out(): no data");
1342 vws.error(VL_WARN, "svr_client_data_out(): no data");
1343 return;
1344 }
1345
1346 uv_buf_t buf = uv_buf_init(data->data, data->size);
1347 uv_write_t* req = (uv_write_t*)vws.malloc(sizeof(uv_write_t));
1348 req->data = data;
1349
1350 // Check address pool and ensure connection is still active
1351 uintptr_t ptr = address_pool_get(data->server->cpool, data->cid);
1352
1353 if (ptr == 0)
1354 {
1355 // Connection no longer exists.
1356 vws_svr_data_free(data);
1357 vws.free(req);
1358 return;
1359 }
1360
1361 // Write out to libuv
1362 uv_stream_t* handle = ((vws_svr_cnx*)ptr)->handle;
1363 if (uv_write(req, handle, &buf, 1, svr_on_write_complete) != 0)
1364 {
1365 // Connection is closing/closed.
1366 vws_svr_data_free(data);
1367 vws.free(req);
1368 }
1369}
1370
1371//------------------------------------------------------------------------------
1372// Server Connection
1373//------------------------------------------------------------------------------
1374
1375vws_svr_cnx* svr_cnx_new(vws_tcp_svr* s, uv_stream_t* handle)
1376{
1377 vws_svr_cnx* cnx = vws.malloc(sizeof(vws_svr_cnx));
1378 cnx->server = s;
1379 cnx->handle = handle;
1380 cnx->data = NULL;
1381 cnx->format = VM_MPACK_FORMAT;
1382
1383 // Initialize HTTP state
1384 cnx->upgraded = false;
1385 cnx->http = vws_http_msg_new(HTTP_REQUEST);
1386
1387 // Add to address pool
1388 cnx->cid = address_pool_set(s->cpool, (uintptr_t)cnx);
1389
1390 return cnx;
1391}
1392
1393void svr_cnx_free(vws_svr_cnx* c)
1394{
1395 if (c != NULL)
1396 {
1397 if (c->http != NULL)
1398 {
1399 vws_http_msg_free(c->http);
1400 }
1401
1402 // Remove from pool
1403 address_pool_remove(c->server->cpool, c->cid);
1404
1405 vws.free(c);
1406 }
1407}
1408
1409void svr_cnx_close(vws_tcp_svr* server, vws_cid_t cid)
1410{
1411 vws_tcp_svr_close(server, cid);
1412}
1413
1414//------------------------------------------------------------------------------
1415// Server Utilities
1416//------------------------------------------------------------------------------
1417
1418vws_svr_cnx* svr_cnx_map_get(vws_svr_cnx_map* map, uv_stream_t* key)
1419{
1420 vws_svr_cnx* cnx = sc_map_get_64v(map, (uint64_t)key);
1421
1422 // If entry exists
1423 if (sc_map_found(map) == true)
1424 {
1425 return cnx;
1426 }
1427
1428 return NULL;
1429}
1430
1431void svr_cnx_map_remove(vws_svr_cnx_map* map, uv_stream_t* key)
1432{
1433 vws_svr_cnx* cnx = sc_map_get_64v(map, (uint64_t)key);
1434
1435 // If entry exists
1436 if (sc_map_found(map) == true)
1437 {
1438 sc_map_del_64v(map, (uint64_t)key);
1439
1440 // Call on_disconnect() handler
1441 cnx->server->on_disconnect(cnx);
1442
1443 // Cleanup
1444 vws.free(cnx);
1445 }
1446}
1447
1448int8_t svr_cnx_map_set(vws_svr_cnx_map* map, uv_stream_t* key, vws_svr_cnx* value)
1449{
1450 // See if we have an existing entry
1451 sc_map_get_64v(map, (uint64_t)key);
1452
1453 if (sc_map_found(map) == true)
1454 {
1455 // Exsiting entry. Return false.
1456 return 0;
1457 }
1458
1459 sc_map_put_64v(map, (uint64_t)key, value);
1460
1461 // True
1462 return 1;
1463}
1464
1465void svr_cnx_map_clear(vws_svr_cnx_map* map)
1466{
1467 uint64_t key; vws_svr_cnx* cnx;
1468 sc_map_foreach(map, key, cnx)
1469 {
1470 cnx->server->on_disconnect(cnx);
1471 vws.free(cnx);
1472 }
1473
1474 sc_map_clear_64v(map);
1475}
1476
1477void svr_shutdown(vws_tcp_svr* server)
1478{
1479 if (server->state == VS_HALTED)
1480 {
1481 return;
1482 }
1483
1484 if (vws.tracelevel >= VT_SERVICE)
1485 {
1486 vws.trace(VL_INFO, "svr_shutdown(%p): Shutdown starting", server);
1487 }
1488
1489 // Cleanup queues
1490
1491 vws_svr_queue* queue = &server->responses;
1492 uv_mutex_lock(&queue->mutex);
1493 while (queue->size > 0)
1494 {
1495 vws_svr_data* data = queue->buffer[queue->head];
1496 queue->head = (queue->head + 1) % queue->capacity;
1497 queue->size--;
1498
1499 vws_svr_data_free(data);
1500 }
1501 uv_mutex_unlock(&queue->mutex);
1502
1503 queue_destroy(&server->requests);
1504 queue_destroy(&server->responses);
1505
1506 // Stop the loop. This will cause uv_run() to return in vws_tcp_svr_run()
1507 // which will also return.
1508 uv_stop(server->loop);
1509}
1510
1511void svr_on_connect(uv_stream_t* socket, int status)
1512{
1513 vws_cinfo* svr_addr = (vws_cinfo*)socket->data;
1514 vws_tcp_svr* server = svr_addr->server;
1515 //vws_tcp_svr* server = (vws_tcp_svr*) socket->data;
1516
1517 if (status < 0)
1518 {
1519 cstr e = uv_strerror(status);
1520 vws.error(VE_RT, "Error in connection callback: %s", e);
1521 return;
1522 }
1523
1524 uv_tcp_t* c = (uv_tcp_t*)vws.malloc(sizeof(uv_tcp_t));
1525
1526 if (c == NULL)
1527 {
1528 vws.error(VE_RT, "Failed to allocate memory for client");
1529 return;
1530 }
1531
1532 vws_cinfo* cinfo = vws.malloc(sizeof(vws_cinfo));
1533 cinfo->cnx = NULL;
1534 cinfo->server = server;
1535 c->data = cinfo;
1536
1537 if (uv_tcp_init(server->loop, c) != 0)
1538 {
1539 vws.error(VE_RT, "Failed to initialize client");
1540 return;
1541 }
1542
1543 if (uv_accept(socket, (uv_stream_t*)c) == 0)
1544 {
1545 if (uv_read_start((uv_stream_t*)c, svr_on_realloc, svr_on_read) != 0)
1546 {
1547 vws.error(VE_RT, "Failed to start reading from client");
1548 return;
1549 }
1550 }
1551 else
1552 {
1553 uv_close((uv_handle_t*)c, svr_on_close);
1554 }
1555
1556 //> Add connection to registry and initialize
1557
1558 vws_svr_cnx* cnx = svr_cnx_new(server, (uv_stream_t*)c);
1559 cinfo->cnx = cnx;
1560 cinfo->cid = cnx->cid;
1561
1562 //> Call svr_on_connect() handler
1563
1564 server->on_connect(cnx);
1565}
1566
1567void svr_on_read(uv_stream_t* c, ssize_t nread, const uv_buf_t* buf)
1568{
1569 vws_cinfo* cinfo = (vws_cinfo*)c->data;
1570 vws_svr_cnx* cnx = cinfo->cnx;;
1571 vws_tcp_svr* server = cinfo->server;
1572 vws_cid_t cid = cinfo->cid;
1573
1574 if (nread < 0)
1575 {
1576 uv_close((uv_handle_t*)c, svr_on_close);
1577 vws.free(buf->base);
1578 return;
1579 }
1580
1581 // Lookup connection
1582 uintptr_t ptr = address_pool_get(server->cpool, cid);
1583
1584 if (ptr != 0)
1585 {
1586 vws_svr_cnx* cnx = (vws_svr_cnx*)ptr;
1587 server->on_read(cnx, nread, buf);
1588 }
1589}
1590
1591void svr_on_write_complete(uv_write_t* req, int status)
1592{
1593 vws_svr_data_free((vws_svr_data*)req->data);
1594 vws.free(req);
1595}
1596
1597void svr_on_close(uv_handle_t* handle)
1598{
1599 vws_cinfo* cinfo = (vws_cinfo*)handle->data;
1600 vws_svr_cnx* cnx = cinfo->cnx;
1601 vws_tcp_svr* server = cinfo->server;
1602 vws_cid_t cid = cinfo->cid;
1603
1604 // Lookup connection
1605 uintptr_t ptr = address_pool_get(server->cpool, cid);
1606
1607 if (ptr != 0)
1608 {
1609 vws_svr_cnx* cnx = (vws_svr_cnx*)ptr;
1610
1611 // Call on_disconnect() handler
1612 server->on_disconnect(cnx);
1613
1614 // Remove from map
1615 //sc_map_del_64v(map, (uint64_t)handle);
1616
1617 // Cleanup
1618 svr_cnx_free(cnx);
1619 }
1620
1621 vws.free(handle->data);
1622 vws.free(handle);
1623
1624 // If we are running in inetd mode, there is only one socket and its closing
1625 // means we are done and should exit process.
1626 if ((server->inetd_mode == 1) && (server->state == VS_RUNNING))
1627 {
1628 vws_tcp_svr_inetd_stop(server);
1629 }
1630}
1631
1632void svr_on_realloc(uv_handle_t* handle, size_t size, uv_buf_t* buf)
1633{
1634 buf->base = (char*)vws.realloc(buf->base, size);
1635 buf->len = size;
1636}
1637
1638//------------------------------------------------------------------------------
1639// Queue API
1640//------------------------------------------------------------------------------
1641
1642void queue_init(vws_svr_queue* queue, int size, cstr name)
1643{
1644 queue->buffer = (vws_svr_data**)vws.malloc(size * sizeof(vws_svr_data*));
1645 queue->size = 0;
1646 queue->capacity = size;
1647 queue->head = 0;
1648 queue->tail = 0;
1649 queue->state = VS_RUNNING;
1650 queue->name = strdup(name);
1651
1652 // Initialize mutex and condition variable
1653 uv_mutex_init(&queue->mutex);
1654 uv_cond_init(&queue->cond);
1655}
1656
1657void queue_destroy(vws_svr_queue* queue)
1658{
1659 if (queue->buffer != NULL)
1660 {
1661 vws.free(queue->name);
1662 uv_mutex_destroy(&queue->mutex);
1663 uv_cond_destroy(&queue->cond);
1664 vws.free(queue->buffer);
1665 queue->buffer = NULL;
1666 queue->state = VS_HALTED;
1667 }
1668}
1669
1670void queue_push(vws_svr_queue* queue, vws_svr_data* data)
1671{
1672 if (queue->state != VS_RUNNING)
1673 {
1674 vws.free(data);
1675 return;
1676 }
1677
1678 uv_mutex_lock(&queue->mutex);
1679
1680 while (queue->size == queue->capacity)
1681 {
1682 uv_cond_wait(&queue->cond, &queue->mutex);
1683 }
1684
1685 if (queue->state != VS_RUNNING)
1686 {
1687 uv_cond_signal(&queue->cond);
1688 uv_mutex_unlock(&queue->mutex);
1689 return;
1690 }
1691
1692 queue->buffer[queue->tail] = data;
1693 queue->tail = (queue->tail + 1) % queue->capacity;
1694 queue->size++;
1695
1696 // Signal condition variable
1697 uv_cond_signal(&queue->cond);
1698 uv_mutex_unlock(&queue->mutex);
1699}
1700
1701vws_svr_data* queue_pop(vws_svr_queue* queue)
1702{
1703 uv_mutex_lock(&queue->mutex);
1704
1705 while (queue->size == 0 && queue->state == VS_RUNNING)
1706 {
1707 uv_cond_wait(&queue->cond, &queue->mutex);
1708 }
1709
1710 if (queue->state == VS_HALTING)
1711 {
1712 uv_cond_broadcast(&queue->cond);
1713 uv_mutex_unlock(&queue->mutex);
1714
1715 return NULL;
1716 }
1717
1718 vws_svr_data* data = queue->buffer[queue->head];
1719 queue->head = (queue->head + 1) % queue->capacity;
1720 queue->size--;
1721
1722 uv_mutex_unlock(&queue->mutex);
1723
1724 return data;
1725}
1726
1727bool queue_empty(vws_svr_queue* queue)
1728{
1729 uv_mutex_lock(&queue->mutex);
1730 bool empty = (queue->size == 0);
1731 uv_mutex_unlock(&queue->mutex);
1732
1733 return empty;
1734}
1735
1736//------------------------------------------------------------------------------
1737// Pure WebSocket Server
1738//------------------------------------------------------------------------------
1739
1740void ws_svr_process_frame(vws_cnx* c, vws_frame* f)
1741{
1742 vws_svr_cnx* cnx = (vws_svr_cnx*)c->data;
1743
1744 switch (f->opcode)
1745 {
1746 case CLOSE_FRAME:
1747 {
1748 // Build the response frame
1749 vws_buffer* buffer = vws_generate_close_frame();
1750
1751 // Send back to cliane Send the PONG response
1752 vws_svr_data* response;
1753
1754 response = vws_svr_data_new(cnx->server, cnx->cid, &buffer);
1755 vws_tcp_svr_send(response);
1756
1757 // Free buffer
1758 vws_buffer_free(buffer);
1759
1760 // Free frame
1761 vws_frame_free(f);
1762
1763 break;
1764 }
1765
1766 case TEXT_FRAME:
1767 case BINARY_FRAME:
1768 case CONTINUATION_FRAME:
1769 {
1770 // Add to queue
1771 sc_queue_add_first(&c->queue, f);
1772
1773 break;
1774 }
1775
1776 case PING_FRAME:
1777 {
1778 // Generate the PONG response
1779 vws_buffer* buffer = vws_generate_pong_frame(f->data, f->size);
1780
1781 // Send back to cliane Send the PONG response
1782 vws_svr_data* response;
1783 response = vws_svr_data_new(cnx->server, cnx->cid, &buffer);
1784 vws_tcp_svr_send(response);
1785
1786 // Free buffer
1787 vws_buffer_free(buffer);
1788
1789 // Free frame
1790 vws_frame_free(f);
1791
1792 break;
1793 }
1794
1795 case PONG_FRAME:
1796 {
1797 // No need to send a response
1798
1799 vws_frame_free(f);
1800
1801 break;
1802 }
1803
1804 default:
1805 {
1806 // Invalid frame type
1807 vws_frame_free(f);
1808 }
1809 }
1810
1811 vws.success();
1812}
1813
1814void ws_svr_client_connect(vws_svr_cnx* c)
1815{
1816 if (vws.tracelevel >= VT_SERVICE)
1817 {
1818 vws.trace(VL_INFO, "ws_svr_client_connect(%p)", c->handle);
1819 }
1820
1821 // Create a new vws_cnx
1822 vws_cnx* cnx = (void*)vws_cnx_new();
1823 cnx->process = ws_svr_process_frame;
1824 cnx->data = (void*)c; // Link cnx -> c
1825 c->data = (void*)cnx; // Link c -> cnx
1826}
1827
1828void ws_svr_client_disconnect(vws_svr_cnx* c)
1829{
1830 if (vws.tracelevel >= VT_SERVICE)
1831 {
1832 vws.trace(VL_INFO, "ws_svr_client_disconnect(%p)", c->handle);
1833 }
1834
1835 if (c->data != NULL)
1836 {
1837 vws_cnx_free((vws_cnx*)c->data);
1838 c->data = NULL;
1839 }
1840}
1841
1842void vws_tcp_svr_close(vws_tcp_svr* server, vws_cid_t cid)
1843{
1844 // Create reply
1845 vws_svr_data* reply;
1846 reply = vws_svr_data_own(server, cid, NULL, 0);
1847
1848 // Set connection close flag
1849 vws_set_flag(&reply->flags, VM_SVR_DATA_CLOSE);
1850
1851 // Queue the data to uv_thread() to send out on wire
1852 vws_tcp_svr_send(reply);
1853}
1854
1855// Runs in uv_thread()
1856void ws_svr_client_read(vws_svr_cnx* cnx, ssize_t size, const uv_buf_t* buf)
1857{
1858 vws_tcp_svr* server = cnx->server;
1859 vws_cnx* c = (vws_cnx*)cnx->data;
1860
1861 // Add to client socket buffer
1862 vws_buffer_append(c->base.buffer, (ucstr)buf->base, size);
1863
1864 // Free libuv memory
1865 vws.free(buf->base);
1866
1867 // If we are in HTTP mode
1868 if (cnx->upgraded == false)
1869 {
1870 // Parse incoming data as HTTP request.
1871
1872 ucstr data = c->base.buffer->data;
1873 size_t size = c->base.buffer->size;
1874 ssize_t n = vws_http_msg_parse(cnx->http, (cstr)data, size);
1875
1876 // Did we get a complete request?
1877 if (cnx->http->headers_complete == true)
1878 {
1879 // Check for parsing errors
1880 enum llhttp_errno err = llhttp_get_errno(cnx->http->parser);
1881
1882 // If there was a parsing error, close connection
1883 if(err != HPE_PAUSED)
1884 {
1885 vws.error( VE_RT, "Error: %s (%s)",
1886 llhttp_errno_name(err),
1887 llhttp_get_error_reason(cnx->http->parser) );
1888
1889 // Close connection
1890 svr_cnx_close(server, cnx->cid);
1891
1892 return;
1893 }
1894
1895 // Drain HTTP request data from cnx->data buffer
1896 vws_buffer_drain(c->base.buffer, n);
1897
1898 //> Generate HTTP response and send
1899
1900 vws_buffer* http = vws_buffer_new();
1901
1902 struct sc_map_str* headers = &cnx->http->headers;
1903 cstr key = vws_map_get(headers, "sec-websocket-key");
1904 cstr proto = vws_map_get(headers, "sec-websocket-protocol");
1905
1906 vws_buffer_printf(http, "HTTP/1.1 101 Switching Protocols\r\n");
1907 vws_buffer_printf(http, "Upgrade: websocket\r\n");
1908 vws_buffer_printf(http, "Connection: Upgrade\r\n");
1909
1910 cstr ac = vws_accept_key(key);
1911 vws_buffer_printf(http, "Sec-WebSocket-Accept: %s\r\n", ac);
1912 vws.free(ac);
1913
1914 vws_buffer_printf(http, "Sec-WebSocket-Version: 13\r\n");
1915 vws_buffer_printf(http, "Sec-WebSocket-Protocol: ");
1916
1917 if (proto != NULL)
1918 {
1919 vws_buffer_printf(http, "%s\r\n", proto);
1920 }
1921 else
1922 {
1923 vws_buffer_printf(http, "vrtql\r\n");
1924 }
1925
1926 vws_buffer_printf(http, "\r\n");
1927
1928 // Package up response
1929 vws_svr_data* reply;
1930 reply = vws_svr_data_new(cnx->server, cnx->cid, &http);
1931
1932 // Send directly out as we are in uv_thread()
1933 server->on_data_out(reply, NULL);
1934
1935 // Cleanup. Buffer data was passed to reply.
1936 vws_buffer_free(http);
1937
1938 //> Change state to WebSocket mode
1939
1940 // Set the flag that we are in WebSocket mode
1941 cnx->upgraded = true;
1942
1943 // Free HTTP request as we don't need it anymore
1944 vws_http_msg_free(cnx->http);
1945 cnx->http = NULL;
1946
1947 // Do we have any data in the socket after consuming the HTTP
1948 // request? We shouldn't but if so this is WebSocket data.
1949 if (c->base.buffer->size == 0)
1950 {
1951 // No more data in the socket buffer. Done for now.
1952 return;
1953 }
1954 }
1955 else
1956 {
1957 // No complete HTTP request yet
1958 return;
1959 }
1960
1961 // If we get here, we have a complete request and we have incoming
1962 // WebSocket data to process (c->base.buffer->size > 0)
1963 }
1964
1965 if (vws_cnx_ingress(c) > 0)
1966 {
1967 // Process as many messages as possible
1968 while (true)
1969 {
1970 // Check for a complete message
1971 vws_msg* wsm = vws_msg_pop(c);
1972
1973 if (wsm == NULL)
1974 {
1975 return;
1976 }
1977
1978 // Pass message pointer in block
1979 vws_svr_data* block;
1980 block = vws_svr_data_own( cnx->server,
1981 cnx->cid,
1982 (ucstr)wsm,
1983 sizeof(vws_msg*) );
1984 queue_push(&server->requests, block);
1985 }
1986 }
1987}
1988
1989// Runs in worker_thread()
1990void ws_svr_client_data_in(vws_svr_data* block, void* x)
1991{
1992 //> Append data to connection buffer
1993
1994 vws_cid_t cid = block->cid;
1995 vws_svr* server = (vws_svr*)block->server;
1996
1997 // Data simply contains a pointer to a websocket message
1998 vws_msg* wsm = (vws_msg*)block->data;
1999
2000 // Free incoming data
2001 block->data = NULL;
2002 block->size = 0;
2003 vws_svr_data_free(block);
2004
2005 //> Process connection buffer data for complete messages
2006 server->on_msg_in(server, cid, wsm, x);
2007}
2008
2009void ws_svr_client_data_out( vws_svr* server,
2010 vws_cid_t cid,
2011 vws_buffer* buffer,
2012 unsigned char opcode )
2013{
2014 // Create a binary websocket frame containing message
2015 vws_frame* frame;
2016 ucstr data = buffer->data;
2017 size_t size = buffer->size;
2018 frame = vws_frame_new(data, size, opcode);
2019
2020 // This frame is from server to we don't mask it
2021 frame->mask = 0;
2022
2023 // Serialize frame: frame is freed by function
2024 vws_buffer* fdata = vws_serialize(frame);
2025
2026 // Pack frame binary into queue data
2027 vws_svr_data* response;
2028
2029 response = vws_svr_data_new((vws_tcp_svr*)server, cid, &fdata);
2030
2031 // Queue the data to uv_thread() to send out on wire
2032 vws_tcp_svr_send(response);
2033
2034 // Free buffers
2035 vws_buffer_free(fdata);
2036}
2037
2038void ws_svr_client_msg_in(vws_svr* s, vws_cid_t c, vws_msg* m, void* x)
2039{
2040 vws_svr* server = (vws_svr*)s;
2041
2042 // Route to application-specific processing callback
2043 server->process(server, c, m, x);
2044}
2045
2046void ws_svr_client_process(vws_svr* s, vws_cid_t c, vws_msg* m, void* x)
2047{
2048 // Default: Do nothing. Drop message.
2049 vws_msg_free(m);
2050}
2051
2052void ws_svr_client_msg_out(vws_svr* s, vws_cid_t c, vws_msg* m, void* x)
2053{
2054 ws_svr_client_data_out(s, c, m->data, m->opcode);
2055 vws_msg_free(m);
2056}
2057
2058void ws_svr_ctor(vws_svr* server, int nt, int bl, int qs)
2059{
2060 tcp_svr_ctor((vws_tcp_svr*)server, nt, bl, qs);
2061
2062 // Server base function overrides
2063 server->base.on_connect = ws_svr_client_connect;
2064 server->base.on_disconnect = ws_svr_client_disconnect;
2065 server->base.on_read = ws_svr_client_read;
2066 server->base.on_data_in = ws_svr_client_data_in;
2067
2068 // Message handling
2069 server->on_msg_in = ws_svr_client_msg_in;
2070 server->on_msg_out = ws_svr_client_msg_out;
2071
2072 // Application functions
2073 server->process = ws_svr_client_process;
2074 server->send = ws_svr_client_msg_out;
2075}
2076
2077vws_svr* vws_svr_new(int num_threads, int backlog, int queue_size)
2078{
2079 vws_svr* server = vws.malloc(sizeof(vws_svr));
2080 ws_svr_ctor(server, num_threads, backlog, queue_size);
2081 return server;
2082}
2083
2084void ws_svr_dtor(vws_svr* server)
2085{
2086 if (server == NULL)
2087 {
2088 return;
2089 }
2090
2091 tcp_svr_dtor((vws_tcp_svr*)server);
2092}
2093
2094void vws_svr_free(vws_svr* server)
2095{
2096 if (server == NULL)
2097 {
2098 return;
2099 }
2100
2101 ws_svr_dtor(server);
2102 vws.free(server);
2103}
2104
2105//------------------------------------------------------------------------------
2106// Messaging Server: Derived from WebSocket server
2107//------------------------------------------------------------------------------
2108
2109// Convert incoming WebSocket messages to VRTQL messages for processing
2110void msg_svr_client_ws_msg_in(vws_svr* s, vws_cid_t cid, vws_msg* wsm, void* x)
2111{
2112 // Deserialize message
2113
2114 vrtql_msg* msg = vrtql_msg_new(HTTP_REQUEST);
2115 ucstr data = wsm->data->data;
2116 size_t size = wsm->data->size;
2117
2118 if (vrtql_msg_deserialize(msg, data, size) == false)
2119 {
2120 // Deserialized failed
2121
2122 // Error already set
2123 vws_msg_free(wsm);
2124 vrtql_msg_free(msg);
2125
2126 vws_tcp_svr_close((vws_tcp_svr*)s, cid);
2127
2128 return;
2129 }
2130
2131 // Deserialized succeeded
2132
2133 // TODO
2134 // We send back the format we get. More than that, a request with different
2135 // format effectively changes the entire connection default format setting.
2136 // cnx->format = msg->format;
2137
2138 // Free websocket message
2139 vws_msg_free(wsm);
2140
2141 // Process message
2142 vrtql_msg_svr* server = (vrtql_msg_svr*)s;
2143 server->on_msg_in(s, cid, msg, x);
2144}
2145
2146// Receive/handle incoming VRTQL messages
2147void msg_svr_client_msg_in(vws_svr* s, vws_cid_t c, vrtql_msg* m, void* x)
2148{
2149 vrtql_msg_svr* server = (vrtql_msg_svr*)s;
2150
2151 // Route to application-specific processing callback
2152 server->process(s, c, m, x);
2153}
2154
2155// Send VRTQL messages
2156void msg_svr_client_msg_out(vws_svr* s, vws_cid_t c, vrtql_msg* m, void* x)
2157{
2158 // Serialize message
2159 vws_buffer* mdata = vrtql_msg_serialize(m);
2160
2161 // Send to base class
2162 ws_svr_client_data_out(s, c, mdata, BINARY_FRAME);
2163
2164 // Cleanup
2165 vws_buffer_free(mdata);
2166 vrtql_msg_free(m);
2167}
2168
2169// Process incoming VRTQL messages
2170void msg_svr_client_process(vws_svr* s, vws_cid_t c, vrtql_msg* m, void* x)
2171{
2172 // Default: Do nothing. Drop message.
2173 vrtql_msg_free(m);
2174}
2175
2176vrtql_msg_svr* vrtql_msg_svr_new(int num_threads, int backlog, int queue_size)
2177{
2178 vrtql_msg_svr* server = vws.malloc(sizeof(vrtql_msg_svr));
2179 return vrtql_msg_svr_ctor(server, num_threads, backlog, queue_size);
2180}
2181
2182void vrtql_msg_svr_free(vrtql_msg_svr* server)
2183{
2184 vrtql_msg_svr_dtor(server);
2185}
2186
2187vrtql_msg_svr*
2188vrtql_msg_svr_ctor(vrtql_msg_svr* server, int threads, int backlog, int qsize)
2189{
2190 ws_svr_ctor((vws_svr*)server, threads, backlog, qsize);
2191
2192 // Server base function overrides
2193 server->base.process = msg_svr_client_ws_msg_in;
2194
2195 // Message handling
2196 server->on_msg_in = msg_svr_client_msg_in;
2197 server->on_msg_out = msg_svr_client_msg_out;
2198
2199 // Application functions
2200 server->process = msg_svr_client_process;
2201 server->send = msg_svr_client_msg_out;
2202
2203 // User-defined data
2204 server->data = NULL;
2205
2206 return server;
2207}
2208
2209void vrtql_msg_svr_dtor(vrtql_msg_svr* server)
2210{
2211 if (server == NULL)
2212 {
2213 return;
2214 }
2215
2216 ws_svr_dtor((vws_svr*)server);
2217 vws.free(server);
2218}
2219