1 | #include <string.h> |
2 | #include "mpack-expect.h" |
3 | #include "mpack-reader.h" |
4 | #include "mpack-writer.h" |
5 | #include "util/yyjson.h" |
6 | #include "message.h" |
7 | |
8 | //------------------------------------------------------------------------------ |
9 | // Utility function declarations |
10 | //------------------------------------------------------------------------------ |
11 | |
/**
 * @brief Parses a map from a MessagePack reader.
 *
 * The provided map is cleared first; each key/value pair read from the reader
 * is then stored in it. Parsing fails if the next element is not a map or if
 * any key or value is not a string.
 *
 * @param reader The MessagePack reader from which to parse the map.
 * @param map The map to populate with parsed key-value pairs.
 * @return True if the parsing was successful, false otherwise.
 */
21 | static bool msg_parse_map(mpack_reader_t* reader, vws_kvs* map); |
22 | |
/**
 * @brief Parses content from a MessagePack reader into a buffer.
 *
 * The next element must be a bin or str value. When content is present, the
 * buffer's previous contents are replaced with it.
 *
 * @param reader The MessagePack reader from which to parse the content.
 * @param buffer The buffer that receives the parsed content.
 * @return The number of content bytes parsed (0 for empty content), or -1 if
 *         the reader is in an error state or the element is not bin/str.
 */
32 | static int32_t msg_parse_content(mpack_reader_t* reader, vws_buffer* buffer); |
33 | |
34 | //------------------------------------------------------------------------------ |
35 | // API functions |
36 | //------------------------------------------------------------------------------ |
37 | |
38 | vrtql_msg* vrtql_msg_new() |
39 | { |
40 | vrtql_msg* msg = vws.calloc(1, sizeof(vrtql_msg)); |
41 | |
42 | msg->routing = vws_kvs_new(10); |
43 | msg->headers = vws_kvs_new(10); |
44 | |
45 | msg->content = vws_buffer_new(); |
46 | msg->flags = 0; |
47 | msg->format = VM_MPACK_FORMAT; |
48 | msg->data = NULL; |
49 | |
50 | vws_set_flag(&msg->flags, VM_MSG_VALID); |
51 | |
52 | return msg; |
53 | } |
54 | |
55 | void vrtql_msg_free(vrtql_msg* msg) |
56 | { |
57 | // Safety measure to prevent double freeing. |
58 | if (msg == NULL) |
59 | { |
60 | return; |
61 | } |
62 | |
63 | vws_kvs_free(msg->routing); |
64 | vws_kvs_free(msg->headers); |
65 | |
66 | vws_buffer_free(msg->content); |
67 | |
68 | vws.free(msg); |
69 | msg = NULL; |
70 | } |
71 | |
72 | vws_buffer* vrtql_msg_serialize(vrtql_msg* msg) |
73 | { |
74 | if (msg == NULL) |
75 | { |
76 | return false; |
77 | } |
78 | |
79 | if (msg->format == VM_MPACK_FORMAT) |
80 | { |
81 | // Serialize MessagePack |
82 | |
83 | // Buffer to hold data |
84 | vws_buffer* buffer = vws_buffer_new(); |
85 | |
86 | // Initialize writer |
87 | |
88 | cstr key; cstr value; |
89 | mpack_writer_t writer; |
90 | mpack_writer_init_growable(&writer, (char**)&buffer->data, &buffer->size); |
91 | |
92 | // Binary is an array of 3 elements: routing, headers, content. |
93 | mpack_start_array(&writer, 3); |
94 | |
95 | // Generate routing |
96 | |
97 | mpack_build_map(&writer); |
98 | for (size_t i = 0; i < msg->routing->used; i++) |
99 | { |
100 | mpack_write_cstr(&writer, msg->routing->array[i].key); |
101 | mpack_write_cstr(&writer, msg->routing->array[i].value.data); |
102 | } |
103 | mpack_complete_map(&writer); |
104 | |
105 | // Generate headers |
106 | |
107 | mpack_build_map(&writer); |
108 | |
109 | for (size_t i = 0; i < msg->headers->used; i++) |
110 | { |
111 | mpack_write_cstr(&writer, msg->headers->array[i].key); |
112 | mpack_write_cstr(&writer, msg->headers->array[i].value.data); |
113 | } |
114 | mpack_complete_map(&writer); |
115 | |
116 | // Create content |
117 | |
118 | int size = msg->content->size; |
119 | cstr data = (cstr)msg->content->data; |
120 | mpack_write_bin(&writer, data, size); |
121 | |
122 | // Close array |
123 | mpack_finish_array(&writer); |
124 | |
125 | // Cleanup |
126 | |
127 | mpack_error_t rc = mpack_writer_destroy(&writer); |
128 | |
129 | if (rc != mpack_ok) |
130 | { |
131 | char buf[256]; |
132 | cstr text = mpack_error_to_string(rc); |
133 | snprintf(buf, sizeof(buf), "Encoding errror: %s" , text); |
134 | vws.error(VE_RT, buf); |
135 | |
136 | vws_buffer_free(buffer); |
137 | return NULL; |
138 | } |
139 | |
140 | return buffer; |
141 | } |
142 | |
143 | if (msg->format == VM_JSON_FORMAT) |
144 | { |
145 | // Serialize JSON |
146 | |
147 | // Buffer to hold data |
148 | vws_buffer* buffer = vws_buffer_new(); |
149 | |
150 | // Create a mutable doc |
151 | yyjson_mut_doc* doc = yyjson_mut_doc_new(NULL); |
152 | |
153 | // We're generating an array as root. |
154 | yyjson_mut_val* root = yyjson_mut_arr(doc); |
155 | |
156 | // Set root of the doc |
157 | yyjson_mut_doc_set_root(doc, root); |
158 | |
159 | // Generate routing |
160 | yyjson_mut_val* routing = yyjson_mut_arr_add_obj(doc, root); |
161 | cstr key; cstr value; |
162 | for (size_t i = 0; i < msg->routing->used; i++) |
163 | { |
164 | yyjson_mut_obj_add_str( doc, |
165 | routing, |
166 | msg->routing->array[i].key, |
167 | msg->routing->array[i].value.data ); |
168 | } |
169 | |
170 | // Generate headers |
171 | yyjson_mut_val* = yyjson_mut_arr_add_obj(doc, root); |
172 | |
173 | for (size_t i = 0; i < msg->routing->used; i++) |
174 | { |
175 | yyjson_mut_obj_add_str( doc, |
176 | routing, |
177 | msg->headers->array[i].key, |
178 | msg->headers->array[i].value.data ); |
179 | } |
180 | |
181 | // Add content |
182 | int size = msg->content->size; |
183 | cstr data = (cstr)msg->content->data; |
184 | |
185 | if (size > 0) |
186 | { |
187 | yyjson_mut_arr_add_strncpy(doc, root, data, size); |
188 | } |
189 | |
190 | // To string, minified |
191 | cstr json = yyjson_mut_write(doc, 0, NULL); |
192 | |
193 | if (json) |
194 | { |
195 | // Assuming vws_buffer_write takes a char* and size, writes the |
196 | // data to the buffer |
197 | vws_buffer_append(buffer, (ucstr)json, strlen(json)); |
198 | vws.free((void*)json); |
199 | } |
200 | |
201 | // Free the doc |
202 | yyjson_mut_doc_free(doc); |
203 | |
204 | return buffer; |
205 | } |
206 | |
207 | return NULL; |
208 | } |
209 | |
210 | bool vrtql_msg_deserialize(vrtql_msg* msg, ucstr data, size_t length) |
211 | { |
212 | if ((data == NULL) || (length == 0)) |
213 | { |
214 | return false; |
215 | } |
216 | |
217 | // Parse message based on exptected format |
218 | // |
219 | // A message can be serialized into both JSON and MessagePack on the wire |
220 | // within the same connection. That is, the protocol supports auto-detection |
221 | // of JSON or MessagePack. The binary MessagePack format conveniently starts |
222 | // with an invalid UTF-8 character which works as a magic number to identify |
223 | // the binary message in the stream. This way a single connection can use |
224 | // both JSON and binary MessageMack messages in same stream. The presence of |
225 | // magic number signifies binary format and is therefore deserialized using |
226 | // MessagePack. Absent that it parses UTF-8/JSON until encountering a NULL |
227 | // terminator. |
228 | // |
229 | // The magic number comes from MessagePack binary format which is based on |
230 | // the number of arguments (n) to be serialized. We are a format consisting |
231 | // of an array with three elements: routing, headers, content. This means |
232 | // the value fori n is 3. Therefore the MessagePack format for this object |
233 | // would start with the value (0x90u | 3). This is our magic number which we |
234 | // use to check if the data is MessagePack. Everything else is assumed to be |
235 | // JSON. |
236 | |
237 | unsigned char magic_number = (unsigned char)(0x90u | 3); |
238 | unsigned char first_byte = (unsigned char)data[0]; |
239 | |
240 | if (first_byte == magic_number) |
241 | { |
242 | // Deserialize MessagePack |
243 | |
244 | // Initialize reader |
245 | mpack_reader_t reader; |
246 | mpack_reader_init_data(&reader, (cstr)data, length); |
247 | |
248 | // Expect an array of size 3 |
249 | if (mpack_expect_array(&reader) != 3) |
250 | { |
251 | vws.error(VE_RT, "Invalid MessagePack format" ); |
252 | return false; |
253 | } |
254 | |
255 | // Parse routing map |
256 | |
257 | if (msg_parse_map(&reader, msg->routing) == false) |
258 | { |
259 | return false; |
260 | } |
261 | |
262 | // Parse header map |
263 | |
264 | if (msg_parse_map(&reader, msg->headers) == false) |
265 | { |
266 | return false; |
267 | } |
268 | |
269 | // Parse content |
270 | |
271 | ssize_t n = msg_parse_content(&reader, msg->content); |
272 | |
273 | if (n < 0) |
274 | { |
275 | return false; |
276 | } |
277 | else |
278 | { |
279 | msg->content->size = n; |
280 | } |
281 | |
282 | // Finish reading the array |
283 | mpack_done_array(&reader); |
284 | |
285 | // Cleanup |
286 | |
287 | mpack_error_t rc = mpack_reader_destroy(&reader); |
288 | |
289 | if (rc != mpack_ok) |
290 | { |
291 | char buf[256]; |
292 | cstr text = mpack_error_to_string(rc); |
293 | snprintf(buf, sizeof(buf), "Decoding errror: %s" , text); |
294 | vws.error(VE_RT, buf); |
295 | |
296 | return false; |
297 | } |
298 | |
299 | // Record format |
300 | msg->format = VM_MPACK_FORMAT; |
301 | } |
302 | else |
303 | { |
304 | // Deserialize JSON |
305 | |
306 | yyjson_read_flag flags = YYJSON_READ_NOFLAG; |
307 | yyjson_doc* doc = yyjson_read((cstr)data, length, flags); |
308 | yyjson_val* root = yyjson_doc_get_root(doc); |
309 | |
310 | if (!yyjson_is_arr(root) || yyjson_arr_size(root) != 3) |
311 | { |
312 | vws.error(VE_RT, "Invalid JSON: Root is not an array of size 3" ); |
313 | yyjson_doc_free(doc); |
314 | |
315 | return false; |
316 | } |
317 | |
318 | yyjson_val* routing = yyjson_arr_get(root, 0); |
319 | yyjson_val* = yyjson_arr_get(root, 1); |
320 | yyjson_val* content = yyjson_arr_get(root, 2); |
321 | |
322 | yyjson_val* key; |
323 | yyjson_val* value; |
324 | yyjson_obj_iter iter; |
325 | |
326 | if (routing && yyjson_is_obj(routing)) |
327 | { |
328 | yyjson_obj_iter_init(routing, &iter); |
329 | |
330 | while ((key = yyjson_obj_iter_next(&iter))) |
331 | { |
332 | value = yyjson_obj_iter_get_val(key); |
333 | vws_kvs_set_cstring( msg->routing, |
334 | yyjson_get_str(key), |
335 | yyjson_get_str(value) ); |
336 | } |
337 | } |
338 | else |
339 | { |
340 | vws.error(VE_RT, "Invalid JSON: routing not JSON object" ); |
341 | yyjson_doc_free(doc); |
342 | |
343 | return false; |
344 | } |
345 | |
346 | if (headers && yyjson_is_obj(headers)) |
347 | { |
348 | yyjson_obj_iter_init(headers, &iter); |
349 | while ((key = yyjson_obj_iter_next(&iter))) |
350 | { |
351 | value = yyjson_obj_iter_get_val(key); |
352 | vws_kvs_set_cstring( msg->headers, |
353 | yyjson_get_str(key), |
354 | yyjson_get_str(value) ); |
355 | } |
356 | } |
357 | else |
358 | { |
359 | vws.error(VE_RT, "Invalid JSON: headers is not JSON object" ); |
360 | yyjson_doc_free(doc); |
361 | |
362 | return false; |
363 | } |
364 | |
365 | if (content && yyjson_is_str(content)) |
366 | { |
367 | vrtql_msg_set_content(msg, yyjson_get_str(content)); |
368 | } |
369 | else |
370 | { |
371 | vws.error(VE_RT, "Invalid JSON: content is not a string" ); |
372 | yyjson_doc_free(doc); |
373 | |
374 | return false; |
375 | } |
376 | |
377 | yyjson_doc_free(doc); |
378 | |
379 | // Record format |
380 | msg->format = VM_JSON_FORMAT; |
381 | } |
382 | |
383 | return true; |
384 | } |
385 | |
386 | void vrtql_msg_dump(vrtql_msg* msg) |
387 | { |
388 | // Buffer to hold data |
389 | vws_buffer* buffer = vws_buffer_new(); |
390 | |
391 | // Create a mutable doc |
392 | yyjson_mut_doc* doc = yyjson_mut_doc_new(NULL); |
393 | |
394 | // We're generating an array as root. |
395 | yyjson_mut_val* root = yyjson_mut_arr(doc); |
396 | |
397 | // Set root of the doc |
398 | yyjson_mut_doc_set_root(doc, root); |
399 | |
400 | // Generate routing |
401 | yyjson_mut_val* routing = yyjson_mut_arr_add_obj(doc, root); |
402 | cstr key; cstr value; |
403 | |
404 | for (size_t i = 0; i < msg->routing->used; i++) |
405 | { |
406 | yyjson_mut_obj_add_str( doc, |
407 | routing, |
408 | msg->routing->array[i].key, |
409 | msg->routing->array[i].value.data ); |
410 | } |
411 | |
412 | // Generate headers |
413 | yyjson_mut_val* = yyjson_mut_arr_add_obj(doc, root); |
414 | for (size_t i = 0; i < msg->headers->used; i++) |
415 | { |
416 | yyjson_mut_obj_add_str( doc, |
417 | headers, |
418 | msg->headers->array[i].key, |
419 | msg->headers->array[i].value.data ); |
420 | } |
421 | |
422 | // To string, minified |
423 | cstr json = yyjson_mut_write(doc, 0, NULL); |
424 | |
425 | if (json) |
426 | { |
427 | // Assuming vws_buffer_write takes a char* and size, writes the |
428 | // data to the buffer |
429 | vws_buffer_append(buffer, (ucstr)json, strlen(json)); |
430 | vws.free((void*)json); |
431 | } |
432 | |
433 | // Free the doc |
434 | yyjson_mut_doc_free(doc); |
435 | |
436 | printf("%s\n" , buffer->data); |
437 | |
438 | if (msg->content->size > 0) |
439 | { |
440 | printf("%.*s\n" , (int)msg->content->size, msg->content->data); |
441 | } |
442 | |
443 | vws_buffer_free(buffer); |
444 | } |
445 | |
446 | cstr (vrtql_msg* msg, cstr key) |
447 | { |
448 | return vws_kvs_get_cstring(msg->headers, key); |
449 | } |
450 | |
451 | void (vrtql_msg* msg, cstr key, cstr value) |
452 | { |
453 | vws_kvs_set_cstring(msg->headers, key, value); |
454 | } |
455 | |
456 | void (vrtql_msg* msg, cstr key) |
457 | { |
458 | vws_kvs_remove(msg->headers, key); |
459 | } |
460 | |
461 | cstr vrtql_msg_get_routing(vrtql_msg* msg, cstr key) |
462 | { |
463 | return vws_kvs_get_cstring(msg->routing, key); |
464 | } |
465 | |
466 | void vrtql_msg_set_routing(vrtql_msg* msg, cstr key, cstr value) |
467 | { |
468 | vws_kvs_set_cstring(msg->routing, key, value); |
469 | } |
470 | |
471 | void vrtql_msg_clear_routing(vrtql_msg* msg, cstr key) |
472 | { |
473 | vws_kvs_remove(msg->routing, key); |
474 | } |
475 | |
476 | void vrtql_msg_clear_content(vrtql_msg* msg) |
477 | { |
478 | vws_buffer_clear(msg->content); |
479 | } |
480 | |
481 | cstr vrtql_msg_get_content(vrtql_msg* msg) |
482 | { |
483 | return (cstr)msg->content->data; |
484 | } |
485 | |
486 | size_t vrtql_msg_get_content_size(vrtql_msg* msg) |
487 | { |
488 | return msg->content->size; |
489 | } |
490 | |
491 | void vrtql_msg_set_content(vrtql_msg* msg, cstr value) |
492 | { |
493 | vws_buffer_clear(msg->content); |
494 | vws_buffer_append(msg->content, (ucstr)value, strlen(value)); |
495 | } |
496 | |
497 | void vrtql_msg_set_content_binary(vrtql_msg* msg, cstr value, size_t size) |
498 | { |
499 | vws_buffer_clear(msg->content); |
500 | vws_buffer_append(msg->content, (ucstr)value, size); |
501 | } |
502 | |
503 | ssize_t vrtql_msg_send(vws_cnx* c, vrtql_msg* msg) |
504 | { |
505 | vws_buffer* binary = vrtql_msg_serialize(msg); |
506 | ssize_t bytes = vws_frame_send_binary(c, binary->data, binary->size); |
507 | vws_buffer_free(binary); |
508 | |
509 | return bytes; |
510 | } |
511 | |
512 | vrtql_msg* vrtql_msg_recv(vws_cnx* c) |
513 | { |
514 | vws_msg* wsm = vws_msg_recv(c); |
515 | |
516 | if (wsm == NULL) |
517 | { |
518 | return NULL; |
519 | } |
520 | |
521 | // Deserialize VRTQL message |
522 | vrtql_msg* m = vrtql_msg_new(); |
523 | ucstr data = wsm->data->data; |
524 | size_t size = wsm->data->size; |
525 | if (vrtql_msg_deserialize(m, data, size) == false) |
526 | { |
527 | // Error already set |
528 | vws_msg_free(wsm); |
529 | vrtql_msg_free(m); |
530 | return NULL; |
531 | } |
532 | |
533 | vws_msg_free(wsm); |
534 | |
535 | return m; |
536 | } |
537 | |
538 | //------------------------------------------------------------------------------ |
539 | // Utility functions |
540 | //------------------------------------------------------------------------------ |
541 | |
542 | bool msg_parse_map(mpack_reader_t* reader, vws_kvs* map) |
543 | { |
544 | // Clear contents |
545 | vws_kvs_clear(map); |
546 | |
547 | mpack_tag_t tag = mpack_read_tag(reader); |
548 | |
549 | if (mpack_reader_error(reader) != mpack_ok) |
550 | { |
551 | return false; |
552 | } |
553 | |
554 | if (mpack_tag_type(&tag) != mpack_type_map) |
555 | { |
556 | return false; |
557 | } |
558 | |
559 | char* key; |
560 | char* value; |
561 | |
562 | uint32_t count = mpack_tag_map_count(&tag); |
563 | |
564 | while (count-- > 0) |
565 | { |
566 | uint32_t length; |
567 | cstr data; |
568 | mpack_tag_t tag; |
569 | |
570 | //> Get key |
571 | |
572 | tag = mpack_read_tag(reader); |
573 | |
574 | if (mpack_tag_type(&tag) != mpack_type_str) |
575 | { |
576 | printf("ERROR: key must be string\n" ); |
577 | return false; |
578 | } |
579 | |
580 | length = mpack_tag_str_length(&tag); |
581 | data = mpack_read_bytes_inplace(reader, length); |
582 | key = vws.malloc(length + 1); |
583 | |
584 | memcpy(key, data, length); |
585 | key[length] = 0; |
586 | mpack_done_str(reader); |
587 | |
588 | //> Get value |
589 | |
590 | tag = mpack_read_tag(reader); |
591 | |
592 | if (mpack_tag_type(&tag) != mpack_type_str) |
593 | { |
594 | printf("ERROR: value must be string\n" ); |
595 | return false; |
596 | } |
597 | |
598 | length = mpack_tag_str_length(&tag); |
599 | data = mpack_read_bytes_inplace(reader, length); |
600 | value = vws.malloc(length + 1); |
601 | |
602 | memcpy(value, data, length); |
603 | value[length] = 0; |
604 | mpack_done_str(reader); |
605 | |
606 | vws_kvs_set_cstring(map, key, value); |
607 | |
608 | vws.free(key); |
609 | vws.free(value); |
610 | |
611 | if (mpack_reader_error(reader) != mpack_ok) |
612 | { |
613 | return false; |
614 | } |
615 | } |
616 | |
617 | mpack_done_map(reader); |
618 | |
619 | return true; |
620 | } |
621 | |
622 | int32_t msg_parse_content(mpack_reader_t* reader, vws_buffer* buffer) |
623 | { |
624 | mpack_tag_t tag = mpack_read_tag(reader); |
625 | |
626 | if (mpack_reader_error(reader) != mpack_ok) |
627 | { |
628 | return -1; |
629 | } |
630 | |
631 | mpack_type_t type = mpack_tag_type(&tag); |
632 | |
633 | if ((type != mpack_type_bin) && (type != mpack_type_str)) |
634 | { |
635 | return -1; |
636 | } |
637 | |
638 | uint32_t length = mpack_tag_str_length(&tag); |
639 | |
640 | if (length == 0) |
641 | { |
642 | return 0; |
643 | } |
644 | |
645 | cstr data = mpack_read_bytes_inplace(reader, length); |
646 | |
647 | vws_buffer_clear(buffer); |
648 | vws_buffer_append(buffer, (ucstr)data, length); |
649 | mpack_done_str(reader); |
650 | |
651 | return length; |
652 | } |
653 | |
654 | |