1#include <string.h>
2#include "mpack-expect.h"
3#include "mpack-reader.h"
4#include "mpack-writer.h"
5#include "util/yyjson.h"
6#include "message.h"
7
8//------------------------------------------------------------------------------
9// Utility function declarations
10//------------------------------------------------------------------------------
11
/**
 * @brief Parses a map of string keys and string values from a MessagePack
 * reader.
 *
 * The provided map is cleared first and then populated with each key-value
 * pair read from the reader.
 *
 * @param reader The MessagePack reader from which to parse the map.
 * @param map The map to populate with parsed key-value pairs.
 * @return True if the parsing was successful, false otherwise.
 */
21static bool msg_parse_map(mpack_reader_t* reader, vws_kvs* map);
22
/**
 * @brief Parses message content from a MessagePack reader into a buffer.
 *
 * The next element in the reader must be of type bin or str. When the element
 * is non-empty, the buffer is cleared and the element's bytes are written to
 * it.
 *
 * @param reader The MessagePack reader from which to parse the content.
 * @param buffer The buffer that receives the parsed content.
 * @return The number of content bytes read (zero for empty content), or -1 on
 *   error.
 */
32static int32_t msg_parse_content(mpack_reader_t* reader, vws_buffer* buffer);
33
34//------------------------------------------------------------------------------
35// API functions
36//------------------------------------------------------------------------------
37
38vrtql_msg* vrtql_msg_new()
39{
40 vrtql_msg* msg = vws.calloc(1, sizeof(vrtql_msg));
41
42 msg->routing = vws_kvs_new(10);
43 msg->headers = vws_kvs_new(10);
44
45 msg->content = vws_buffer_new();
46 msg->flags = 0;
47 msg->format = VM_MPACK_FORMAT;
48 msg->data = NULL;
49
50 vws_set_flag(&msg->flags, VM_MSG_VALID);
51
52 return msg;
53}
54
55void vrtql_msg_free(vrtql_msg* msg)
56{
57 // Safety measure to prevent double freeing.
58 if (msg == NULL)
59 {
60 return;
61 }
62
63 vws_kvs_free(msg->routing);
64 vws_kvs_free(msg->headers);
65
66 vws_buffer_free(msg->content);
67
68 vws.free(msg);
69 msg = NULL;
70}
71
72vws_buffer* vrtql_msg_serialize(vrtql_msg* msg)
73{
74 if (msg == NULL)
75 {
76 return false;
77 }
78
79 if (msg->format == VM_MPACK_FORMAT)
80 {
81 // Serialize MessagePack
82
83 // Buffer to hold data
84 vws_buffer* buffer = vws_buffer_new();
85
86 // Initialize writer
87
88 cstr key; cstr value;
89 mpack_writer_t writer;
90 mpack_writer_init_growable(&writer, (char**)&buffer->data, &buffer->size);
91
92 // Binary is an array of 3 elements: routing, headers, content.
93 mpack_start_array(&writer, 3);
94
95 // Generate routing
96
97 mpack_build_map(&writer);
98 for (size_t i = 0; i < msg->routing->used; i++)
99 {
100 mpack_write_cstr(&writer, msg->routing->array[i].key);
101 mpack_write_cstr(&writer, msg->routing->array[i].value.data);
102 }
103 mpack_complete_map(&writer);
104
105 // Generate headers
106
107 mpack_build_map(&writer);
108
109 for (size_t i = 0; i < msg->headers->used; i++)
110 {
111 mpack_write_cstr(&writer, msg->headers->array[i].key);
112 mpack_write_cstr(&writer, msg->headers->array[i].value.data);
113 }
114 mpack_complete_map(&writer);
115
116 // Create content
117
118 int size = msg->content->size;
119 cstr data = (cstr)msg->content->data;
120 mpack_write_bin(&writer, data, size);
121
122 // Close array
123 mpack_finish_array(&writer);
124
125 // Cleanup
126
127 mpack_error_t rc = mpack_writer_destroy(&writer);
128
129 if (rc != mpack_ok)
130 {
131 char buf[256];
132 cstr text = mpack_error_to_string(rc);
133 snprintf(buf, sizeof(buf), "Encoding errror: %s", text);
134 vws.error(VE_RT, buf);
135
136 vws_buffer_free(buffer);
137 return NULL;
138 }
139
140 return buffer;
141 }
142
143 if (msg->format == VM_JSON_FORMAT)
144 {
145 // Serialize JSON
146
147 // Buffer to hold data
148 vws_buffer* buffer = vws_buffer_new();
149
150 // Create a mutable doc
151 yyjson_mut_doc* doc = yyjson_mut_doc_new(NULL);
152
153 // We're generating an array as root.
154 yyjson_mut_val* root = yyjson_mut_arr(doc);
155
156 // Set root of the doc
157 yyjson_mut_doc_set_root(doc, root);
158
159 // Generate routing
160 yyjson_mut_val* routing = yyjson_mut_arr_add_obj(doc, root);
161 cstr key; cstr value;
162 for (size_t i = 0; i < msg->routing->used; i++)
163 {
164 yyjson_mut_obj_add_str( doc,
165 routing,
166 msg->routing->array[i].key,
167 msg->routing->array[i].value.data );
168 }
169
170 // Generate headers
171 yyjson_mut_val* headers = yyjson_mut_arr_add_obj(doc, root);
172
173 for (size_t i = 0; i < msg->routing->used; i++)
174 {
175 yyjson_mut_obj_add_str( doc,
176 routing,
177 msg->headers->array[i].key,
178 msg->headers->array[i].value.data );
179 }
180
181 // Add content
182 int size = msg->content->size;
183 cstr data = (cstr)msg->content->data;
184
185 if (size > 0)
186 {
187 yyjson_mut_arr_add_strncpy(doc, root, data, size);
188 }
189
190 // To string, minified
191 cstr json = yyjson_mut_write(doc, 0, NULL);
192
193 if (json)
194 {
195 // Assuming vws_buffer_write takes a char* and size, writes the
196 // data to the buffer
197 vws_buffer_append(buffer, (ucstr)json, strlen(json));
198 vws.free((void*)json);
199 }
200
201 // Free the doc
202 yyjson_mut_doc_free(doc);
203
204 return buffer;
205 }
206
207 return NULL;
208}
209
210bool vrtql_msg_deserialize(vrtql_msg* msg, ucstr data, size_t length)
211{
212 if ((data == NULL) || (length == 0))
213 {
214 return false;
215 }
216
217 // Parse message based on exptected format
218 //
219 // A message can be serialized into both JSON and MessagePack on the wire
220 // within the same connection. That is, the protocol supports auto-detection
221 // of JSON or MessagePack. The binary MessagePack format conveniently starts
222 // with an invalid UTF-8 character which works as a magic number to identify
223 // the binary message in the stream. This way a single connection can use
224 // both JSON and binary MessageMack messages in same stream. The presence of
225 // magic number signifies binary format and is therefore deserialized using
226 // MessagePack. Absent that it parses UTF-8/JSON until encountering a NULL
227 // terminator.
228 //
229 // The magic number comes from MessagePack binary format which is based on
230 // the number of arguments (n) to be serialized. We are a format consisting
231 // of an array with three elements: routing, headers, content. This means
232 // the value fori n is 3. Therefore the MessagePack format for this object
233 // would start with the value (0x90u | 3). This is our magic number which we
234 // use to check if the data is MessagePack. Everything else is assumed to be
235 // JSON.
236
237 unsigned char magic_number = (unsigned char)(0x90u | 3);
238 unsigned char first_byte = (unsigned char)data[0];
239
240 if (first_byte == magic_number)
241 {
242 // Deserialize MessagePack
243
244 // Initialize reader
245 mpack_reader_t reader;
246 mpack_reader_init_data(&reader, (cstr)data, length);
247
248 // Expect an array of size 3
249 if (mpack_expect_array(&reader) != 3)
250 {
251 vws.error(VE_RT, "Invalid MessagePack format");
252 return false;
253 }
254
255 // Parse routing map
256
257 if (msg_parse_map(&reader, msg->routing) == false)
258 {
259 return false;
260 }
261
262 // Parse header map
263
264 if (msg_parse_map(&reader, msg->headers) == false)
265 {
266 return false;
267 }
268
269 // Parse content
270
271 ssize_t n = msg_parse_content(&reader, msg->content);
272
273 if (n < 0)
274 {
275 return false;
276 }
277 else
278 {
279 msg->content->size = n;
280 }
281
282 // Finish reading the array
283 mpack_done_array(&reader);
284
285 // Cleanup
286
287 mpack_error_t rc = mpack_reader_destroy(&reader);
288
289 if (rc != mpack_ok)
290 {
291 char buf[256];
292 cstr text = mpack_error_to_string(rc);
293 snprintf(buf, sizeof(buf), "Decoding errror: %s", text);
294 vws.error(VE_RT, buf);
295
296 return false;
297 }
298
299 // Record format
300 msg->format = VM_MPACK_FORMAT;
301 }
302 else
303 {
304 // Deserialize JSON
305
306 yyjson_read_flag flags = YYJSON_READ_NOFLAG;
307 yyjson_doc* doc = yyjson_read((cstr)data, length, flags);
308 yyjson_val* root = yyjson_doc_get_root(doc);
309
310 if (!yyjson_is_arr(root) || yyjson_arr_size(root) != 3)
311 {
312 vws.error(VE_RT, "Invalid JSON: Root is not an array of size 3");
313 yyjson_doc_free(doc);
314
315 return false;
316 }
317
318 yyjson_val* routing = yyjson_arr_get(root, 0);
319 yyjson_val* headers = yyjson_arr_get(root, 1);
320 yyjson_val* content = yyjson_arr_get(root, 2);
321
322 yyjson_val* key;
323 yyjson_val* value;
324 yyjson_obj_iter iter;
325
326 if (routing && yyjson_is_obj(routing))
327 {
328 yyjson_obj_iter_init(routing, &iter);
329
330 while ((key = yyjson_obj_iter_next(&iter)))
331 {
332 value = yyjson_obj_iter_get_val(key);
333 vws_kvs_set_cstring( msg->routing,
334 yyjson_get_str(key),
335 yyjson_get_str(value) );
336 }
337 }
338 else
339 {
340 vws.error(VE_RT, "Invalid JSON: routing not JSON object");
341 yyjson_doc_free(doc);
342
343 return false;
344 }
345
346 if (headers && yyjson_is_obj(headers))
347 {
348 yyjson_obj_iter_init(headers, &iter);
349 while ((key = yyjson_obj_iter_next(&iter)))
350 {
351 value = yyjson_obj_iter_get_val(key);
352 vws_kvs_set_cstring( msg->headers,
353 yyjson_get_str(key),
354 yyjson_get_str(value) );
355 }
356 }
357 else
358 {
359 vws.error(VE_RT, "Invalid JSON: headers is not JSON object");
360 yyjson_doc_free(doc);
361
362 return false;
363 }
364
365 if (content && yyjson_is_str(content))
366 {
367 vrtql_msg_set_content(msg, yyjson_get_str(content));
368 }
369 else
370 {
371 vws.error(VE_RT, "Invalid JSON: content is not a string");
372 yyjson_doc_free(doc);
373
374 return false;
375 }
376
377 yyjson_doc_free(doc);
378
379 // Record format
380 msg->format = VM_JSON_FORMAT;
381 }
382
383 return true;
384}
385
386void vrtql_msg_dump(vrtql_msg* msg)
387{
388 // Buffer to hold data
389 vws_buffer* buffer = vws_buffer_new();
390
391 // Create a mutable doc
392 yyjson_mut_doc* doc = yyjson_mut_doc_new(NULL);
393
394 // We're generating an array as root.
395 yyjson_mut_val* root = yyjson_mut_arr(doc);
396
397 // Set root of the doc
398 yyjson_mut_doc_set_root(doc, root);
399
400 // Generate routing
401 yyjson_mut_val* routing = yyjson_mut_arr_add_obj(doc, root);
402 cstr key; cstr value;
403
404 for (size_t i = 0; i < msg->routing->used; i++)
405 {
406 yyjson_mut_obj_add_str( doc,
407 routing,
408 msg->routing->array[i].key,
409 msg->routing->array[i].value.data );
410 }
411
412 // Generate headers
413 yyjson_mut_val* headers = yyjson_mut_arr_add_obj(doc, root);
414 for (size_t i = 0; i < msg->headers->used; i++)
415 {
416 yyjson_mut_obj_add_str( doc,
417 headers,
418 msg->headers->array[i].key,
419 msg->headers->array[i].value.data );
420 }
421
422 // To string, minified
423 cstr json = yyjson_mut_write(doc, 0, NULL);
424
425 if (json)
426 {
427 // Assuming vws_buffer_write takes a char* and size, writes the
428 // data to the buffer
429 vws_buffer_append(buffer, (ucstr)json, strlen(json));
430 vws.free((void*)json);
431 }
432
433 // Free the doc
434 yyjson_mut_doc_free(doc);
435
436 printf("%s\n", buffer->data);
437
438 if (msg->content->size > 0)
439 {
440 printf("%.*s\n", (int)msg->content->size, msg->content->data);
441 }
442
443 vws_buffer_free(buffer);
444}
445
446cstr vrtql_msg_get_header(vrtql_msg* msg, cstr key)
447{
448 return vws_kvs_get_cstring(msg->headers, key);
449}
450
451void vrtql_msg_set_header(vrtql_msg* msg, cstr key, cstr value)
452{
453 vws_kvs_set_cstring(msg->headers, key, value);
454}
455
456void vrtql_msg_clear_header(vrtql_msg* msg, cstr key)
457{
458 vws_kvs_remove(msg->headers, key);
459}
460
461cstr vrtql_msg_get_routing(vrtql_msg* msg, cstr key)
462{
463 return vws_kvs_get_cstring(msg->routing, key);
464}
465
466void vrtql_msg_set_routing(vrtql_msg* msg, cstr key, cstr value)
467{
468 vws_kvs_set_cstring(msg->routing, key, value);
469}
470
471void vrtql_msg_clear_routing(vrtql_msg* msg, cstr key)
472{
473 vws_kvs_remove(msg->routing, key);
474}
475
476void vrtql_msg_clear_content(vrtql_msg* msg)
477{
478 vws_buffer_clear(msg->content);
479}
480
481cstr vrtql_msg_get_content(vrtql_msg* msg)
482{
483 return (cstr)msg->content->data;
484}
485
486size_t vrtql_msg_get_content_size(vrtql_msg* msg)
487{
488 return msg->content->size;
489}
490
491void vrtql_msg_set_content(vrtql_msg* msg, cstr value)
492{
493 vws_buffer_clear(msg->content);
494 vws_buffer_append(msg->content, (ucstr)value, strlen(value));
495}
496
497void vrtql_msg_set_content_binary(vrtql_msg* msg, cstr value, size_t size)
498{
499 vws_buffer_clear(msg->content);
500 vws_buffer_append(msg->content, (ucstr)value, size);
501}
502
503ssize_t vrtql_msg_send(vws_cnx* c, vrtql_msg* msg)
504{
505 vws_buffer* binary = vrtql_msg_serialize(msg);
506 ssize_t bytes = vws_frame_send_binary(c, binary->data, binary->size);
507 vws_buffer_free(binary);
508
509 return bytes;
510}
511
512vrtql_msg* vrtql_msg_recv(vws_cnx* c)
513{
514 vws_msg* wsm = vws_msg_recv(c);
515
516 if (wsm == NULL)
517 {
518 return NULL;
519 }
520
521 // Deserialize VRTQL message
522 vrtql_msg* m = vrtql_msg_new();
523 ucstr data = wsm->data->data;
524 size_t size = wsm->data->size;
525 if (vrtql_msg_deserialize(m, data, size) == false)
526 {
527 // Error already set
528 vws_msg_free(wsm);
529 vrtql_msg_free(m);
530 return NULL;
531 }
532
533 vws_msg_free(wsm);
534
535 return m;
536}
537
538//------------------------------------------------------------------------------
539// Utility functions
540//------------------------------------------------------------------------------
541
542bool msg_parse_map(mpack_reader_t* reader, vws_kvs* map)
543{
544 // Clear contents
545 vws_kvs_clear(map);
546
547 mpack_tag_t tag = mpack_read_tag(reader);
548
549 if (mpack_reader_error(reader) != mpack_ok)
550 {
551 return false;
552 }
553
554 if (mpack_tag_type(&tag) != mpack_type_map)
555 {
556 return false;
557 }
558
559 char* key;
560 char* value;
561
562 uint32_t count = mpack_tag_map_count(&tag);
563
564 while (count-- > 0)
565 {
566 uint32_t length;
567 cstr data;
568 mpack_tag_t tag;
569
570 //> Get key
571
572 tag = mpack_read_tag(reader);
573
574 if (mpack_tag_type(&tag) != mpack_type_str)
575 {
576 printf("ERROR: key must be string\n");
577 return false;
578 }
579
580 length = mpack_tag_str_length(&tag);
581 data = mpack_read_bytes_inplace(reader, length);
582 key = vws.malloc(length + 1);
583
584 memcpy(key, data, length);
585 key[length] = 0;
586 mpack_done_str(reader);
587
588 //> Get value
589
590 tag = mpack_read_tag(reader);
591
592 if (mpack_tag_type(&tag) != mpack_type_str)
593 {
594 printf("ERROR: value must be string\n");
595 return false;
596 }
597
598 length = mpack_tag_str_length(&tag);
599 data = mpack_read_bytes_inplace(reader, length);
600 value = vws.malloc(length + 1);
601
602 memcpy(value, data, length);
603 value[length] = 0;
604 mpack_done_str(reader);
605
606 vws_kvs_set_cstring(map, key, value);
607
608 vws.free(key);
609 vws.free(value);
610
611 if (mpack_reader_error(reader) != mpack_ok)
612 {
613 return false;
614 }
615 }
616
617 mpack_done_map(reader);
618
619 return true;
620}
621
622int32_t msg_parse_content(mpack_reader_t* reader, vws_buffer* buffer)
623{
624 mpack_tag_t tag = mpack_read_tag(reader);
625
626 if (mpack_reader_error(reader) != mpack_ok)
627 {
628 return -1;
629 }
630
631 mpack_type_t type = mpack_tag_type(&tag);
632
633 if ((type != mpack_type_bin) && (type != mpack_type_str))
634 {
635 return -1;
636 }
637
638 uint32_t length = mpack_tag_str_length(&tag);
639
640 if (length == 0)
641 {
642 return 0;
643 }
644
645 cstr data = mpack_read_bytes_inplace(reader, length);
646
647 vws_buffer_clear(buffer);
648 vws_buffer_append(buffer, (ucstr)data, length);
649 mpack_done_str(reader);
650
651 return length;
652}
653
654