#ifndef VRTQL_SVR_DECLARE
#define VRTQL_SVR_DECLARE

#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>

#include <uv.h>

#include "vws.h"
#include "message.h"
#include "http_message.h"

/**
 * @file server.h
 * @brief WebSocket server implementation
 *
 * This file implements a non-blocking, multiplexing, multithreaded WebSocket
 * server using the libuv library. The server consists of a main networking
 * thread that runs the libuv loop to handle socket I/O, and a pool of worker
 * threads that process the data.
 *
 * The networking thread evenly distributes incoming data from clients to the
 * worker threads using a synchronized queue. The worker threads process the
 * data and may send back replies. The server maintains two queues: a request
 * queue that transfers incoming client data to the worker threads for
 * processing, and a response queue that transfers data from the worker threads
 * back to the network thread for sending it back to the client.
 *
 * The data items are stored in a generic structure called vws_svr_data, which
 * holds the data and the associated connection. The worker threads retrieve
 * data from the request queue and perform the following steps:
 *
 *   1. Assemble data into WebSocket frames.
 *   2. Assemble frames into WebSocket messages.
 *   3. Assemble WebSocket messages into VRTQL messages.
 *   4. Pass the messages to a user-defined processing function.
 *
 * The processing function can optionally produce data to send back to the
 * client.
 *
 * From a programming standpoint, the server architecture simplifies to
 * implementing two functions that run in the context of a worker thread:
 *
 *   - process(request message): Process incoming messages.
 *   - send(reply message): Send back reply messages.
 *
 * The server takes care of all the serialization, handling the communication
 * between the network thread and worker threads. Developers only need to focus
 * on the processing logic.
 *
 * The number of worker threads is configurable, allowing you to adjust it based
 * on workload or system requirements.
 */

#ifdef __cplusplus
extern "C" {
#endif

/**
 * @struct address_pool
 * @brief Manages a dynamically resizable pool of addresses.
 *
 * The address_pool structure is designed to handle a collection of memory
 * addresses. It is a dynamically resizable array that works like a ring
 * buffer. It supports efficient addition, lookup and removal of items while
 * automatically handling memory resizing based on usage. Its main use is to
 * track and identify connections. It is significantly faster than a hash
 * table, which provides much better overall server performance.
 *
 * Connections change over time and for the most part keep moving forward in
 * the ring buffer in a sort of statistical distribution. By the time the index
 * reaches the end of the ring and wraps around, all previous connections at
 * the beginning have most likely disconnected.
 *
 * @details
 *
 * - The pool grows in capacity by a specified growth factor each time the array
 *   reaches its current capacity limit. This growth behavior helps in managing
 *   memory more efficiently by minimizing the number of memory reallocations
 *   required as the number of items grows.
 *
 * - The implementation uses a ring buffer approach with a last used index to
 *   optimize the search for free slots, allowing for O(1) time complexity for
 *   addition in most cases, barring the necessity for resizing.
 */

typedef struct
{
    /** Pointer to the array holding memory addresses or similar values */
    uintptr_t* slots;

    /** Total number of slots in the array */
    uint32_t capacity;

    /** Number of used slots */
    uint32_t count;

    /** Last used index to optimize search for empty slots */
    uint32_t last_used_index;

    /** Factor by which the array size is increased upon realloc */
    uint16_t growth_factor;
} address_pool;

102/**
103 * @brief Creates a new address pool.
104 *
105 * This function allocates and initializes an address pool with specified
106 * initial size and growth factor. Memory for the pool and its slots is
107 * allocated dynamically.
108 *
109 * @param initial_size The initial number of slots in the pool.
110 * @param growth_factor The factor by which the pool size is increased when
111 * resized.
112 * @return address_pool* A pointer to the newly created address pool or NULL if
113 * allocation fails.
114 */
115address_pool* address_pool_new(int initial_size, int growth_factor);
116
117/**
118 * @brief Frees the memory associated with an address pool.
119 *
120 * This function deallocates the memory for the address pool and sets the
121 * pointer to NULL to prevent dangling references. It safely handles NULL
122 * pointers.
123 *
124 * @param pool A pointer to the pointer of the address pool to be freed.
125 */
126void address_pool_free(address_pool** pool);
127
128/**
129 * @brief Resizes an address pool to accommodate more items.
130 *
131 * The function increases the capacity of the address pool based on its growth
132 * factor. Existing items are preserved, and additional space is initialized to
133 * zero. If memory allocation fails, the pool's slots pointer remains unchanged.
134 *
135 * @param pool A pointer to the address pool to be resized.
136 */
137void address_pool_resize(address_pool* pool);
138
139/**
140 * @brief Adds a new item to the address pool.
141 *
142 * This function adds a new item to the address pool. If the pool is full, it is
143 * resized. The function searches for the first free slot to use, starting from
144 * the last used index and wrapping around if necessary.
145 *
146 * @param pool A pointer to the address pool.
147 * @param address The uintptr_t item to be added to the pool.
148 * @return int The index at which the item was added, or -1 if resizing failed.
149 */
150uint32_t address_pool_set(address_pool* pool, uintptr_t address);
151
152/**
153 * @brief Retrieves the item stored at the specified index in the address pool.
154 *
155 * This function returns the value at a given index in the address pool's slots
156 * array. If the index is out of bounds or the slot at the index is empty
157 * (zero), the function returns 0. This method is intended for quick access to
158 * items in the pool without any modifications.
159 *
160 * @param pool A pointer to the address pool.
161 * @param index The index of the item to retrieve.
162 * @return uintptr_t The value at the specified index if it exists and is not
163 * empty; otherwise, 0.
164 */
165uintptr_t address_pool_get(address_pool* pool, uint32_t index);
166
167/**
168 * @brief Removes an item from the address pool.
169 *
170 * This function sets the slot at the specified index to zero, marking it as
171 * free. The count of used slots is decremented.
172 *
173 * @param pool A pointer to the address pool.
174 * @param index The index of the slot to be freed.
175 */
176void address_pool_remove(address_pool* pool, uint32_t index);
177
/* Forward declarations for types that are defined further down */
struct vws_svr_cnx;
struct vws_svr;

/** State flag bits for server data items */
typedef enum
{
    /* uv_thread() is to close the connection */
    VM_SVR_DATA_CLOSE = (1 << 1)

} vws_svr_data_state_t;

/** Connection ID. This is the index within the address pool at which the
 *  connection's pointer is stored. */
typedef uint32_t vws_cid_t;

192/** This is used to associate connection info with uv_stream_t handles */
193typedef struct vws_cinfo
194{
195 struct vws_tcp_svr* server;
196 struct vws_svr_cnx* cnx;
197 vws_cid_t cid;
198} vws_cinfo;
199
200struct vws_tcp_svr;
201
202/**
203 * @brief Struct representing server data for inter-thread communication
204 * between the main network thread and worker threads. This is the way data is
205 * passed between them. When passed from the network thread to the worker
206 * thread, these take the form of incoming data from the client. When passed
207 * from the worker thread to the network thread, they represent outgoing data to
208 * the client.
209 */
210typedef struct
211{
212 /**< The client connection index associated with the data */
213 vws_cid_t cid;
214
215 /**< The number of bytes of data */
216 size_t size;
217
218 /**< The data */
219 char* data;
220
221 /**< Message state flags */
222 uint64_t flags;
223
224 /**< Reference to server this data belongs to */
225 struct vws_tcp_svr* server;
226
227} vws_svr_data;
228
229/**
230 * @brief Struct representing a server queue, including information about
231 * buffer, size, capacity, and threading.
232 */
233typedef struct
234{
235 /**< The buffer holding data in the queue */
236 vws_svr_data** buffer;
237
238 /**< Current size of the queue */
239 int size;
240
241 /**< Maximum capacity of the queue */
242 int capacity;
243
244 /**< Head position of the queue */
245 int head;
246
247 /**< Tail position of the queue */
248 int tail;
249
250 /**< Mutex for thread safety */
251 uv_mutex_t mutex;
252
253 /**< Condition variable for thread synchronization */
254 uv_cond_t cond;
255
256 /**< Current state of the queue */
257 uint8_t state;
258
259 /**< Queue name */
260 cstr name;
261
262} vws_svr_queue;
263
264struct vws_tcp_svr;
265
266/**
267 * @brief Represents a client connection.
268 */
269typedef struct vws_svr_cnx
270{
271 /**< The server associated with the connection */
272 struct vws_tcp_svr* server;
273
274 /**< The client associated with the connection */
275 uv_stream_t* handle;
276
277 /* Flag holds the HTTP request that started connection. */
278 vws_http_msg* http;
279
280 /** Flag that holds whether we have upgraded connection from HTTP to
281 * WebSockets */
282 bool upgraded;
283
284 /** Index in connection address pool */
285 vws_cid_t cid;
286
287 /**< User-defined data associated with the connection */
288 char* data;
289
290 /**< The format to serialize. If VM_MPACK_FORMAT, serialize into MessagePack
291 * binary format. If VM_JSON_FORMAT, then serialize into JSON format.
292 */
293 vrtql_msg_format_t format;
294
295} vws_svr_cnx;
296
297
/** Thread context constructor (factory) function */
typedef void* (*vws_thread_ctx_ctor)(void* data);

/** Thread context destructor function */
typedef void (*vws_thread_ctx_dtor)(void* data);

/**
 * @brief Represents a worker thread context
 *
 * Thread Context. Worker threads can carry a user-defined context which they
 * pass to process_message() handlers. They call a factory method to create this
 * context. Upon exit, they likewise call a factory method to destruct it.
 *
 * This context acts as a lifelong state within the worker_thread which provides
 * tasks with additional resources to do their job. This context is ideal for
 * maintaining state across tasks like database connections.
 */
typedef struct vws_thread_ctx
{
    /** Context constructor: A pointer to a function which creates the thread
     *  context. The context is returned as a void* pointer and is stored in the
     *  data member. */
    vws_thread_ctx_ctor ctor;

    /** Context constructor data: A pointer to optional user-data passed into
     *  the ctor to help in context construction. */
    void* ctor_data;

    /** Context destructor: A pointer to a function which deallocates the
     *  thread context. It is used at worker thread shutdown to clean up
     *  resources. */
    vws_thread_ctx_dtor dtor;

    /** Thread context: This points to the context allocated by ctor. It is
     *  passed from the worker thread to process_message(void* ctx). */
    void* data;
} vws_thread_ctx;

336/**
337 * @brief Callback for a new connection connection
338 * @param c The connection structure
339 */
340typedef void (*vws_tcp_svr_connect)(vws_svr_cnx* c);
341
342/**
343 * @brief Callback for connection disconnection
344 * @param c The connection structure
345 */
346typedef void (*vws_tcp_svr_disconnect)(vws_svr_cnx* c);
347
348/**
349 * @brief Callback for connection read
350 * @param c The connection structure
351 * @param n The number of bytes in the buffer
352 * @param b The buffer
353 */
354typedef void (*vws_tcp_svr_read)(vws_svr_cnx* c, ssize_t n, const uv_buf_t* b);
355
356/**
357 * @brief Callback for data processing within a worker thread
358 * @param s The server instance
359 * @param t The incoming request to process
360 */
361typedef void (*vws_tcp_svr_process_data)(vws_svr_data* t, void* data);
362
/**
 * @brief Enumerates server states
 */
typedef enum
{
    /** Server is running */
    VS_RUNNING = 0,

    /** Server is shutting down */
    VS_HALTING = 1,

    /** Server is not running */
    VS_HALTED = 2,

} vws_tcp_svr_state_t;

/** Abbreviation for the connection map */
typedef struct sc_map_64v vws_svr_cnx_map;

382/**
383 * @brief Struct representing a basic server. It does not do anything but
384 * process raw data. It does not have any knowledge of WebSockets.
385 */
386typedef struct vws_tcp_svr
387{
388 /**< Current state of the server */
389 uint8_t state;
390
391 /**< Asynchronous handle for event-based programming */
392 uv_async_t* wakeup;
393
394 /**< Event loop handle */
395 uv_loop_t* loop;
396
397 /**< Request queue */
398 vws_svr_queue requests;
399
400 /**< Response queue */
401 vws_svr_queue responses;
402
403 /**< Maximum connections allowed */
404 int backlog;
405
406 /**< Number of threads in the worker pool */
407 int pool_size;
408
409 /**< Thread handles */
410 uv_thread_t* threads;
411
412 /**< Pool of active connections */
413 address_pool* cpool;
414
415 /**< Callback function for connect */
416 vws_tcp_svr_connect on_connect;
417
418 /**< Callback function for disconnect */
419 vws_tcp_svr_disconnect on_disconnect;
420
421 /**< Callback function for reading incoming data */
422 vws_tcp_svr_read on_read;
423
424 /**< Function for processing data from the client */
425 vws_tcp_svr_process_data on_data_in;
426
427 /**< Function for processing data from the worker back to the client */
428 vws_tcp_svr_process_data on_data_out;
429
430 /**< Worker thread constructor */
431 vws_thread_ctx_ctor worker_ctor;
432
433 /**< Worker thread constructor data */
434 void* worker_ctor_data;
435
436 /**< Worker thread destructor */
437 vws_thread_ctx_dtor worker_dtor;
438
439 /**< Tracing level (0 is off) */
440 uint8_t trace;
441
442 /**< inetd mode (default 0). vws_tcp_svr_inetd_run() sets it to 1. */
443 uint8_t inetd_mode;
444
445} vws_tcp_svr;
446
447/**
448 * @brief Creates a new thread data. This TAKES OWNERSHIP of the buffer data and
449 * sets the buffer to zero.
450 *
451 * @param s The server
452 * @param cid The connection ID
453 * @param b Pointer reference to The buffer
454 * @return A new vws_svr_data instance with memory
455 */
456vws_svr_data* vws_svr_data_new(vws_tcp_svr* s, vws_cid_t cid, vws_buffer** b);
457
458/**
459 * @brief Creates a new thread data. This TAKES OWNERSHIP of the data. The
460 * caller MUST NOT free() this data.
461 *
462 * @param s The server
463 * @param cid The connection ID
464 * @param size The number of bytes of data
465 * @param data The data
466 * @return A new vws_svr_data instance with memory
467 */
468vws_svr_data* vws_svr_data_own(vws_tcp_svr* s, vws_cid_t c, ucstr data, size_t size);
469
470/**
471 * @brief Frees the resources allocated to a thread data
472 *
473 * @param t The data
474 */
475void vws_svr_data_free(vws_svr_data* t);
476
477/**
478 * @brief Creates a new VRTQL server.
479 *
480 * @param pool_size The number of threads to run in the worker pool
481 * @param backlog The connection backlog for listen(). If this is set to 0, it
482 * will use the default (128).
483 * @param queue_size The maximum queue size for requests and responses. If this
484 * is set to 0, it will use the default (1024).
485 * @return A new VRTQL server.
486 */
487vws_tcp_svr* vws_tcp_svr_new(int pool_size, int backlog, int queue_size);
488
489/**
490 * @brief Frees the resources allocated to a VRTQL server.
491 *
492 * @param s The server to free.
493 */
494void vws_tcp_svr_free(vws_tcp_svr* s);
495
496/**
497 * @brief Starts a VRTQL server.
498 *
499 * @param server The server to run.
500 * @param host The host to bind the server.
501 * @param port The port to bind the server.
502 * @return 0 if successful, an error code otherwise.
503 */
504int vws_tcp_svr_run(vws_tcp_svr* server, cstr host, int port);
505
506/**
507 * @brief Starts a VRTQL server with a single open socket. This is designed to
508 * be used with tcpserver.
509 *
510 * @param server The server to run.
511 * @param sockfd The incoming socket
512 * @return 0 if successful, an error code otherwise.
513 */
514int vws_tcp_svr_inetd_run(vws_tcp_svr* server, int sockfd);
515
516/**
517 * @brief Stops a VRTQL server running in inetd_mode.
518 *
519 * @param server The server to stop.
520 */
521void vws_tcp_svr_inetd_stop(vws_tcp_svr* server);
522
523/**
524 * @brief Sends data from a VRTQL server.
525 *
526 * @param server The server to send the data.
527 * @param data The data to be sent.
528 * @return 0 if successful, an error code otherwise.
529 */
530int vws_tcp_svr_send(vws_svr_data* data);
531
532/**
533 * @brief Close a VRTQL server connection.
534 *
535 * @param cnx The server
536 * @param cnx The ID of connection to close
537 */
538void vws_tcp_svr_close(vws_tcp_svr* server, vws_cid_t cid);
539
540/**
541 * @brief Stops a VRTQL server.
542 *
543 * @param server The server to stop.
544 */
545void vws_tcp_svr_stop(vws_tcp_svr* server);
546
547/**
548 * @brief Returns the server operational state.
549 *
550 * @param server The server to check.
551 * @return The state of the server in the form of the vws_tcp_svr_state_t enum.
552 */
553uint8_t vws_tcp_svr_state(vws_tcp_svr* server);
554
//------------------------------------------------------------------------------
// WebSocket Server
//------------------------------------------------------------------------------

559/**
560 * @brief Callback for data processing a WebSocket frame
561 * @param s The server instance
562 * @param f The incoming frame to process
563 */
564typedef void (*vws_svr_process_frame)(vws_svr_cnx* s, vws_frame* f);
565
566/**
567 * @brief Callback for data processing a WebSocket message
568 * @param s The server instance
569 * @param c The client connection index
570 * @param x The user-defined context
571 */
572typedef void
573(*vws_svr_process_msg)(struct vws_svr* s, vws_cid_t c, vws_msg* m, void* x);
574
575/**
576 * @brief Struct representing a WebSocket server. It speaks the WebSocket
577 * protocol and processes both WebSocket frames and messages.
578 */
579typedef struct vws_svr
580{
581 /**< Base class */
582 struct vws_tcp_svr base;
583
584 /**< Function for processing an incoming frame */
585 vws_svr_process_frame on_frame_in;
586
587 /**< Function for sending a frame to the client */
588 vws_svr_process_frame on_frame_out;
589
590 /**< Function for processing an incoming message */
591 vws_svr_process_msg on_msg_in;
592
593 /**< Function for sending a message to the client */
594 vws_svr_process_msg on_msg_out;
595
596 /**< Derived: for processing incoming messages (called by on_msg_in()) */
597 vws_svr_process_msg process;
598
599 /**< Derived: for sending messages to the client (calls on_msg_out()) */
600 vws_svr_process_msg send;
601
602} vws_svr;
603
604/**
605 * @brief Creates a new WebSocket server.
606 *
607 * @param pool_size The number of threads to run in the worker pool
608 * @param backlog The connection backlog for listen(). If this is set to 0, it
609 * will use the default (128).
610 * @param queue_size The maximum queue size for requests and responses. If this
611 * is set to 0, it will use the default (1024).
612 * @return A new WebSocket server.
613 */
614vws_svr* vws_svr_new(int pool_size, int backlog, int queue_size);
615
616/**
617 * @brief Frees the resources allocated to a WebSocket server.
618 *
619 * @param s The server to free.
620 */
621void vws_svr_free(vws_svr* s);
622
623/**
624 * @brief Starts a WebSocket server.
625 *
626 * @param server The server to run.
627 * @param host The host to bind the server.
628 * @param port The port to bind the server.
629 * @return 0 if successful, an error code otherwise.
630 */
631int vws_svr_run(vws_svr* server, cstr host, int port);
632
//------------------------------------------------------------------------------
// Messaging Server
//------------------------------------------------------------------------------

637/**
638 * @brief Callback for data processing a message
639 * @param s The server instance
640 * @param c The client connection index
641 * @param m The incoming message to process
642 * @param x The user-defined context
643 */
644typedef void
645(*vrtql_svr_process_msg)(vws_svr* s, vws_cid_t c, vrtql_msg* m, void* x);
646
647/**
648 * @brief Struct representing a VTQL message server. It is derived from the
649 * WebSocket server and operates in terms of VRTQL messages (vrtql_msg) as
650 * defined in message.h.
651 */
652typedef struct vrtql_msg_svr
653{
654 /**< Base class */
655 struct vws_svr base;
656
657 /**< Function for processing an incoming message */
658 vrtql_svr_process_msg on_msg_in;
659
660 /**< Function for sending a message to the client */
661 vrtql_svr_process_msg on_msg_out;
662
663 /**< Derived: for processing incoming messages (called by on_msg_in()) */
664 vrtql_svr_process_msg process;
665
666 /**< Derived: for sending messages to the client (calls on_msg_out()) */
667 vrtql_svr_process_msg send;
668
669 /**< User-defined data */
670 void* data;
671
672} vrtql_msg_svr;
673
674/**
675 * @brief Creates a new VRTQL message server.
676 *
677 * @param pool_size The number of threads to run in the worker pool
678 * @param backlog The connection backlog for listen(). If this is set to 0, it
679 * will use the default (128).
680 * @param queue_size The maximum queue size for requests and responses. If this
681 * is set to 0, it will use the default (1024).
682 * @return A new VRTQL message server.
683 */
684vrtql_msg_svr* vrtql_msg_svr_new(int pool_size, int backlog, int queue_size);
685
686/**
687 * @brief Frees the resources allocated to a VRTQL message server.
688 *
689 * @param s The server to free.
690 */
691void vrtql_msg_svr_free(vrtql_msg_svr* s);
692
693/**
694 * @brief Message server instance constructor
695 *
696 * Constructs a new message server instance. This takes a new, empty
697 * vrtql_msg_svr instance and initializes all of its members. It is used by
698 * derived structs as well (vrtql_msg_svr) to construct the base struct.
699 *
700 * @param server The server instance to be initialized
701 * @return The initialized server instance
702 *
703 * @ingroup ServerFunctions
704 */
705
706vrtql_msg_svr* vrtql_msg_svr_ctor( vrtql_msg_svr* server,
707 int num_threads,
708 int backlog,
709 int queue_size );
710
711/**
712 * @brief Message server instance destructor
713 *
714 * Destructs an initialized message server instance. This takes a vrtql_msg_svr
715 * instance and deallocates all of its members -- everything but the top-level
716 * struct. This is used by derived structs as well to destruct the base struct.
717 *
718 * @param server The message server instance to be destructed
719 *
720 * @ingroup ServerFunctions
721 */
722
723void vrtql_msg_svr_dtor(vrtql_msg_svr* s);
724
725/**
726 * @brief Starts a VRTQL message server.
727 *
728 * @param server The server to run.
729 * @param host The host to bind the server.
730 * @param port The port to bind the server.
731 * @return 0 if successful, an error code otherwise.
732 */
733int vrtql_msg_svr_run(vrtql_msg_svr* server, cstr host, int port);
734
#ifdef __cplusplus
}
#endif

739#endif /* VRTQL_SVR_DECLARE */
740