1#include <stdbool.h>
2#include <stdio.h>
3#include <stdlib.h>
4#include <string.h>
5
6#include <openssl/rand.h>
7
8#include "rpc.h"
9
10//------------------------------------------------------------------------------
11// Internal functions
12//------------------------------------------------------------------------------
13
14static bool reconnect(vrtql_rpc* rpc)
15{
16 // Try to reconnect
17 if (vws_reconnect(rpc->cnx) == false)
18 {
19 return false;
20 }
21
22 if (rpc->reconnect != NULL)
23 {
24 if (rpc->reconnect(rpc) == false)
25 {
26 return false;
27 }
28 }
29
30 return true;
31}
32
33//------------------------------------------------------------------------------
34// Client-side Internal functions
35//------------------------------------------------------------------------------
36
37char* vrtql_rpc_tag(uint16_t length)
38{
39 char valid_chars[] = "abcdefghijklmnopqrstuvwxyz0123456789";
40 unsigned char* data = (unsigned char*)malloc(length);
41 unsigned char* tag = (unsigned char*)malloc(length);
42
43 if (RAND_bytes(data, length) != 1)
44 {
45 free(data);
46 free(tag);
47
48 return NULL;
49 }
50
51 uint16_t vc_len = strlen(valid_chars);
52 for (uint16_t cnt = 0; cnt < length; cnt++)
53 {
54 uint16_t c = data[cnt] % vc_len;
55 tag[cnt] = valid_chars[c];
56 }
57
58 free(data);
59
60 return (char*)tag;
61}
62
63bool vrtql_rpc_invoke(vrtql_rpc* rpc, vrtql_msg* req)
64{
65 // Clear previous response
66 vws_buffer_clear(rpc->val);
67
68 // Make the call
69 vrtql_msg* reply = vrtql_rpc_exec(rpc, req);
70
71 // If no response
72 if (reply == NULL)
73 {
74 // Error already set
75 return false;
76 }
77
78 // If we received content
79 if (req->content->size > 0)
80 {
81 // Copy it
82 vws_buffer_append(rpc->val, reply->content->data, reply->content->size);
83 }
84
85 // Translate response code and message
86 cstr rc = vrtql_msg_get_header(reply, "rc");
87 cstr msg = vrtql_msg_get_header(reply, "msg");
88
89 if ((rc != NULL) && (msg != NULL))
90 {
91 vws.error(atoi(rc), msg);
92 }
93 else
94 {
95 vws.e.code = atoi(rc);
96 }
97
98 // Cleanup
99 vrtql_msg_free(req);
100 vrtql_msg_free(reply);
101
102 return true;
103}
104
105vrtql_msg* vrtql_rpc_exec(vrtql_rpc* rpc, vrtql_msg* req)
106{
107 //> Send request
108
109 // Assign a tag to verify response
110 cstr tag = vrtql_rpc_tag(7);
111 vrtql_msg_set_routing(req, "tag", tag);
112
113 if (vws.tracelevel >= VT_SERVICE)
114 {
115 vws_trace_lock();
116 printf("\n\n");
117 printf("+----------------------------------------------------+\n");
118 printf("| vrtql_rpc_exec(): Message Sent |\n");
119 printf("+----------------------------------------------------+\n");
120 vrtql_msg_dump(req);
121 printf("------------------------------------------------------\n");
122 vws_trace_unlock();
123 }
124
125 // Loop until message sent or fatal error. Timeouts are ignored: keeps
126 // grinding until message is fully sent or error.
127 while (true)
128 {
129 if (vrtql_msg_send(rpc->cnx, req) > 0)
130 {
131 // Message successfully sent
132 break;
133 }
134
135 // If connection dropped
136 if (vws.e.code == VE_SOCKET)
137 {
138 // Try to reconnect
139 if (reconnect(rpc) == true)
140 {
141 // Reconnect worked. Try again.
142 continue;
143 }
144
145 // Failed to reconnect. Modify error to indicate the failure was on
146 // send so the caller knows the message was not sent. Error will
147 // have both bits set: VE_SOCKET and VE_SEND.
148 vws_set_flag(&vws.e.code, VE_SEND);
149 }
150
151 // Hand error back to caller.
152 free(tag);
153 return NULL;
154 }
155
156 //> Wait for response
157
158 int retries = 0;
159 vrtql_msg* reply = NULL;
160
161 while (retries < rpc->retries)
162 {
163 reply = vrtql_msg_recv(rpc->cnx);
164
165 // If we have a message
166 if (reply != NULL)
167 {
168 // Get message tag
169 cstr t = vrtql_msg_get_routing(reply, "tag");
170
171 // If tags do not match
172 if (strncmp(tag, t, strlen(tag)) != 0)
173 {
174 // This is not response message. Send to handler.
175 rpc->out_of_band(rpc, reply);
176
177 // Keep waiting for response.
178 continue;
179 }
180
181 vws.success();
182
183 break;
184 }
185
186 if (vws.e.code == VE_TIMEOUT)
187 {
188 retries++;
189 continue;
190 }
191 else
192 {
193 if (vws.e.code == VE_SOCKET)
194 {
195 // Modify error to indicate the failure was on recv, so the
196 // caller knows the message was sent. Error will have both bits
197 // set: VE_SOCKET and VE_RECV.
198 vws_set_flag(&vws.e.code, VE_RECV);
199 }
200
201 // Something unexpected happend. We expect vrtql_msg_recv() to set
202 // appropriate error. Even if disconenct (VE_SOCKET) there is no
203 // point in reconnecting because we have lost our response no matter
204 // what. We will leave error as it is and hand back to caller.
205 break;
206 }
207 }
208
209 free(tag);
210
211 if ((vws.tracelevel >= VT_SERVICE) && (reply != NULL))
212 {
213 vws_trace_lock();
214 printf("\n\n");
215 printf("+----------------------------------------------------+\n");
216 printf("| vrtql_rpc_exec(): Message Recived |\n");
217 printf("+----------------------------------------------------+\n");
218 vrtql_msg_dump(reply);
219 printf("------------------------------------------------------\n");
220 vws_trace_unlock();
221 }
222
223 return reply;
224}
225
226void out_of_band_default(vrtql_rpc* rpc, vrtql_msg* m)
227{
228 if (m != NULL)
229 {
230 vrtql_msg_free(m);
231 }
232}
233
234//------------------------------------------------------------------------------
235// Client-side API
236//------------------------------------------------------------------------------
237
238vrtql_rpc* vrtql_rpc_new(vws_cnx* cnx)
239{
240 vrtql_rpc* rpc = (vrtql_rpc*)vws.malloc(sizeof(vrtql_rpc));
241 rpc->cnx = cnx;
242 rpc->retries = 5;
243 rpc->out_of_band = out_of_band_default;
244 rpc->reconnect = NULL;
245 rpc->data = NULL;
246 rpc->val = vws_buffer_new();
247
248 return rpc;
249}
250
251void vrtql_rpc_free(vrtql_rpc* rpc)
252{
253 if (rpc != NULL)
254 {
255 vws_buffer_new(rpc->val);
256 vws.free(rpc);
257 }
258}
259
260//------------------------------------------------------------------------------
261// Server-side Internal functions
262//------------------------------------------------------------------------------
263
264typedef void (*vrtql_rpc_map_free)(void* e);
265
266/**
267 * @brief Retrieves a value from the map using a string key.
268 *
269 * Returns a constant pointer to the value associated with the key.
270 *
271 * @param map The map from which to retrieve the value.
272 * @param key The string key to use for retrieval.
273 * @return A constant pointer to the value associated with the key.
274 */
275static void* sys_map_get(vrtql_rpc_map* map, cstr key);
276
277/**
278 * @brief Sets a value in the map using a string key and value.
279 *
280 * It will create a new key-value pair or update the value if the key already exists.
281 *
282 * @param map The map in which to set the value.
283 * @param key The string key to use for setting.
284 * @param value The value to set.
285 */
286static void sys_map_set(vrtql_rpc_map* map, cstr key, void* value);
287
288/**
289 * @brief Removes a key-value pair from the map using a string key.
290 *
291 * If the key exists in the map, it will be removed along with its associated value.
292 *
293 * @param map The map from which to remove the key-value pair.
294 * @param key The string key to use for removal.
295 */
296static void sys_map_clear(vrtql_rpc_map* map, cstr key, vrtql_rpc_map_free cb);
297
298//------------------------------------------------------------------------------
299// Server-side API
300//------------------------------------------------------------------------------
301
302vrtql_rpc_module* vrtql_rpc_module_new(cstr name)
303{
304 if (name == NULL)
305 {
306 vws.error(VE_RT, "module name cannot be NULL");
307 }
308
309 vrtql_rpc_module* m;
310
311 m = (vrtql_rpc_module*)vws.malloc(sizeof(vrtql_rpc_module));
312
313 m->name = strdup(name);
314 m->data = NULL;
315 sc_map_init_sv(&m->calls, 0, 0);
316
317 return m;
318}
319
320void vrtql_rpc_module_free(vrtql_rpc_module* m)
321{
322 if (m != NULL)
323 {
324 cstr key; vrtql_rpc_call* call;
325 sc_map_foreach(&m->calls, key, call)
326 {
327 vws.free(key);
328 }
329
330 sc_map_term_sv(&m->calls);
331
332 vws.free(m->name);
333 vws.free(m);
334 }
335}
336
337void vrtql_rpc_module_set(vrtql_rpc_module* m, cstr n, vrtql_rpc_call c)
338{
339 sys_map_set(&m->calls, n, c);
340}
341
342vrtql_rpc_call vrtql_rpc_module_get(vrtql_rpc_module* m, cstr n)
343{
344 return sys_map_get(&m->calls, n);
345}
346
347//------------------------------------------------------------------------------
348// RPC System
349//------------------------------------------------------------------------------
350
351vrtql_rpc_system* vrtql_rpc_system_new()
352{
353 vrtql_rpc_system* s;
354 s = (vrtql_rpc_system*)vws.malloc(sizeof(vrtql_rpc_system));
355 sc_map_init_sv(&s->modules, 0, 0);
356
357 return s;
358}
359
360void vrtql_rpc_system_free(vrtql_rpc_system* s)
361{
362 if (s != NULL)
363 {
364 cstr key; vrtql_rpc_module* module;
365 sc_map_foreach(&s->modules, key, module)
366 {
367 vrtql_rpc_module_free(module);
368 vws.free(key);
369
370 /*
371 sys_map_clear( &s->modules,
372 key,
373 (vrtql_rpc_map_free)vrtql_rpc_module_free);
374 */
375 }
376
377 sc_map_term_sv(&s->modules);
378
379 vws.free(s);
380 }
381}
382
383void vrtql_rpc_system_set(vrtql_rpc_system* s, vrtql_rpc_module* m)
384{
385 sys_map_set(&s->modules, m->name, m);
386}
387
388vrtql_rpc_module* vrtql_rpc_system_get(vrtql_rpc_system* s, cstr n)
389{
390 return sys_map_get(&s->modules, n);
391}
392
393//------------------------------------------------------------------------------
394// RPC API
395//------------------------------------------------------------------------------
396
397bool parse_rpc_string(const char* input, char** module, char** function)
398{
399 // Find the first occurrence of the period
400 const char* delimiter = strchr(input, '.');
401
402 if (delimiter != NULL)
403 {
404 // Calculate the lengths of the module and function substrings
405 size_t m_len = delimiter - input;
406 size_t f_len = strlen(input) - m_len - 1;
407
408 // Allocate memory for the module and function strings
409 *module = (char*)vws.malloc((m_len + 1) * sizeof(char));
410 *function = (char*)vws.malloc((f_len + 1) * sizeof(char));
411
412 // Copy the module substring
413 strncpy(*module, input, m_len);
414 (*module)[m_len] = '\0';
415
416 // Copy the function substring
417 strncpy(*function, delimiter + 1, f_len);
418 (*function)[f_len] = '\0';
419
420 return true;
421 }
422
423 // Invalid input format
424 return false;
425}
426
427vrtql_msg* vrtql_rpc_reply(vrtql_msg* req)
428{
429 vrtql_msg* reply = vrtql_msg_new();
430
431 cstr tag = vrtql_msg_get_routing(req, "tag");
432
433 if (tag != NULL)
434 {
435 vrtql_msg_set_routing(req, "tag", tag);
436 }
437
438 // Use same format as request (JSON/MessagePack)
439 reply->format = req->format;
440
441 return reply;
442}
443
444vrtql_msg* vrtql_rpc_service(vrtql_rpc_system* s, vrtql_rpc_env* e, vrtql_msg* req)
445{
446 vws.success();
447
448 if (vws.tracelevel >= VT_SERVICE)
449 {
450 vws_trace_lock();
451 printf("\n\n");
452 printf("+----------------------------------------------------+\n");
453 printf("| Message Received |\n");
454 printf("+----------------------------------------------------+\n");
455 vrtql_msg_dump(req);
456 printf("------------------------------------------------------\n");
457 vws_trace_unlock();
458 }
459
460 cstr id = vrtql_msg_get_header(req, "id");
461
462 if (id == NULL)
463 {
464 vws.error(VE_RT, "ID not specified");
465
466 vrtql_msg_free(req);
467
468 return NULL;
469 }
470
471 // Parse ID into module and function
472 char* mn; char* fn;
473 if (parse_rpc_string(id, &mn, &fn) == false)
474 {
475 vws.error(VE_RT, "Invalid ID format");
476
477 vrtql_msg_free(req);
478
479 return NULL;
480 }
481
482 // Lookup module in system
483 vrtql_rpc_module* module = vrtql_rpc_system_get(s, mn);
484
485 if (module == NULL)
486 {
487 vws.error(VE_RT, "RPC does not exist");
488
489 vrtql_msg_free(req);
490 vws.free(mn);
491 vws.free(fn);
492
493 return NULL;
494 }
495
496 // Set module reference in environment
497 e->module = module;
498
499 // Lookup RPC in module
500 vrtql_rpc_call rpc = vrtql_rpc_module_get(module, fn);
501
502 if (rpc == NULL)
503 {
504 vws.error(VE_RT, "RPC does not exist");
505
506 vrtql_msg_free(req);
507 vws.free(mn);
508 vws.free(fn);
509
510 return NULL;
511 }
512
513 vws.free(mn);
514 vws.free(fn);
515
516 // Invoke RPC
517 vrtql_msg* reply = rpc(e, req);
518
519 // Free request
520 vrtql_msg_free(req);
521
522 if ((vws.tracelevel >= VT_SERVICE) && (reply != NULL))
523 {
524 vws_trace_lock();
525 printf("\n\n");
526 printf("+----------------------------------------------------+\n");
527 printf("| Message Sent |\n");
528 printf("+----------------------------------------------------+\n");
529 vrtql_msg_dump(reply);
530 printf("------------------------------------------------------\n");
531 vws_trace_unlock();
532 }
533
534 // Return reply
535 return reply;
536}
537
538//------------------------------------------------------------------------------
539// Internal Functions
540//------------------------------------------------------------------------------
541
542void* sys_map_get(vrtql_rpc_map* map, cstr key)
543{
544 // See if we have an existing entry
545 cstr v = sc_map_get_sv(map, key);
546
547 if (sc_map_found(map) == false)
548 {
549 return NULL;
550 }
551
552 return v;
553}
554
555void sys_map_set(vrtql_rpc_map* map, cstr key, void* value)
556{
557 // See if we have an existing entry
558 sc_map_get_sv(map, key);
559
560 if (sc_map_found(map) == false)
561 {
562 // We don't. Therefore we need to allocate new key.
563 key = strdup(key);
564 }
565
566 sc_map_put_sv(map, key, value);
567}
568
569void sys_map_clear(vrtql_rpc_map* map, cstr key, vrtql_rpc_map_free cb)
570{
571 // See if we have an existing entry
572 cstr v = sc_map_get_sv(map, key);
573
574 if (sc_map_found(map) == true)
575 {
576 // Call callback function to cleanup
577 cb(v);
578 }
579
580 sc_map_del_sv(map, key);
581}
582
583