1 | #include <stdbool.h> |
2 | #include <stdio.h> |
3 | #include <stdlib.h> |
4 | #include <string.h> |
5 | |
6 | #include <openssl/rand.h> |
7 | |
8 | #include "rpc.h" |
9 | |
10 | //------------------------------------------------------------------------------ |
11 | // Internal functions |
12 | //------------------------------------------------------------------------------ |
13 | |
14 | static bool reconnect(vrtql_rpc* rpc) |
15 | { |
16 | // Try to reconnect |
17 | if (vws_reconnect(rpc->cnx) == false) |
18 | { |
19 | return false; |
20 | } |
21 | |
22 | if (rpc->reconnect != NULL) |
23 | { |
24 | if (rpc->reconnect(rpc) == false) |
25 | { |
26 | return false; |
27 | } |
28 | } |
29 | |
30 | return true; |
31 | } |
32 | |
33 | //------------------------------------------------------------------------------ |
34 | // Client-side Internal functions |
35 | //------------------------------------------------------------------------------ |
36 | |
/**
 * @brief Generates a random, NUL-terminated tag made of [a-z0-9] characters.
 *
 * Random bytes are obtained from RAND_bytes() and mapped onto the alphabet
 * in place. The mapping uses a plain modulo, so the distribution over the 36
 * characters is very slightly non-uniform (256 is not a multiple of 36).
 *
 * @param length Number of characters in the tag (terminator not included).
 * @return A malloc()-allocated string of exactly @p length characters that
 *   the caller must release with free(), or NULL if memory allocation or
 *   random number generation failed.
 */
char* vrtql_rpc_tag(uint16_t length)
{
    static const char valid_chars[] = "abcdefghijklmnopqrstuvwxyz0123456789";
    const size_t vc_len = sizeof(valid_chars) - 1;

    // One extra byte for the terminator: callers treat the tag as a C string.
    unsigned char* tag = malloc((size_t)length + 1);

    if (tag == NULL)
    {
        return NULL;
    }

    // Fill the buffer with random bytes (nothing to request for length 0).
    if ((length > 0) && (RAND_bytes(tag, length) != 1))
    {
        free(tag);

        return NULL;
    }

    // Map each random byte onto the allowed alphabet, in place.
    for (size_t i = 0; i < length; i++)
    {
        tag[i] = (unsigned char)valid_chars[tag[i] % vc_len];
    }

    tag[length] = '\0';

    return (char*)tag;
}
62 | |
63 | bool vrtql_rpc_invoke(vrtql_rpc* rpc, vrtql_msg* req) |
64 | { |
65 | // Clear previous response |
66 | vws_buffer_clear(rpc->val); |
67 | |
68 | // Make the call |
69 | vrtql_msg* reply = vrtql_rpc_exec(rpc, req); |
70 | |
71 | // If no response |
72 | if (reply == NULL) |
73 | { |
74 | // Error already set |
75 | return false; |
76 | } |
77 | |
78 | // If we received content |
79 | if (req->content->size > 0) |
80 | { |
81 | // Copy it |
82 | vws_buffer_append(rpc->val, reply->content->data, reply->content->size); |
83 | } |
84 | |
85 | // Translate response code and message |
86 | cstr rc = vrtql_msg_get_header(reply, "rc" ); |
87 | cstr msg = vrtql_msg_get_header(reply, "msg" ); |
88 | |
89 | if ((rc != NULL) && (msg != NULL)) |
90 | { |
91 | vws.error(atoi(rc), msg); |
92 | } |
93 | else |
94 | { |
95 | vws.e.code = atoi(rc); |
96 | } |
97 | |
98 | // Cleanup |
99 | vrtql_msg_free(req); |
100 | vrtql_msg_free(reply); |
101 | |
102 | return true; |
103 | } |
104 | |
105 | vrtql_msg* vrtql_rpc_exec(vrtql_rpc* rpc, vrtql_msg* req) |
106 | { |
107 | //> Send request |
108 | |
109 | // Assign a tag to verify response |
110 | cstr tag = vrtql_rpc_tag(7); |
111 | vrtql_msg_set_routing(req, "tag" , tag); |
112 | |
113 | if (vws.tracelevel >= VT_SERVICE) |
114 | { |
115 | vws_trace_lock(); |
116 | printf("\n\n" ); |
117 | printf("+----------------------------------------------------+\n" ); |
118 | printf("| vrtql_rpc_exec(): Message Sent |\n" ); |
119 | printf("+----------------------------------------------------+\n" ); |
120 | vrtql_msg_dump(req); |
121 | printf("------------------------------------------------------\n" ); |
122 | vws_trace_unlock(); |
123 | } |
124 | |
125 | // Loop until message sent or fatal error. Timeouts are ignored: keeps |
126 | // grinding until message is fully sent or error. |
127 | while (true) |
128 | { |
129 | if (vrtql_msg_send(rpc->cnx, req) > 0) |
130 | { |
131 | // Message successfully sent |
132 | break; |
133 | } |
134 | |
135 | // If connection dropped |
136 | if (vws.e.code == VE_SOCKET) |
137 | { |
138 | // Try to reconnect |
139 | if (reconnect(rpc) == true) |
140 | { |
141 | // Reconnect worked. Try again. |
142 | continue; |
143 | } |
144 | |
145 | // Failed to reconnect. Modify error to indicate the failure was on |
146 | // send so the caller knows the message was not sent. Error will |
147 | // have both bits set: VE_SOCKET and VE_SEND. |
148 | vws_set_flag(&vws.e.code, VE_SEND); |
149 | } |
150 | |
151 | // Hand error back to caller. |
152 | free(tag); |
153 | return NULL; |
154 | } |
155 | |
156 | //> Wait for response |
157 | |
158 | int retries = 0; |
159 | vrtql_msg* reply = NULL; |
160 | |
161 | while (retries < rpc->retries) |
162 | { |
163 | reply = vrtql_msg_recv(rpc->cnx); |
164 | |
165 | // If we have a message |
166 | if (reply != NULL) |
167 | { |
168 | // Get message tag |
169 | cstr t = vrtql_msg_get_routing(reply, "tag" ); |
170 | |
171 | // If tags do not match |
172 | if (strncmp(tag, t, strlen(tag)) != 0) |
173 | { |
174 | // This is not response message. Send to handler. |
175 | rpc->out_of_band(rpc, reply); |
176 | |
177 | // Keep waiting for response. |
178 | continue; |
179 | } |
180 | |
181 | vws.success(); |
182 | |
183 | break; |
184 | } |
185 | |
186 | if (vws.e.code == VE_TIMEOUT) |
187 | { |
188 | retries++; |
189 | continue; |
190 | } |
191 | else |
192 | { |
193 | if (vws.e.code == VE_SOCKET) |
194 | { |
195 | // Modify error to indicate the failure was on recv, so the |
196 | // caller knows the message was sent. Error will have both bits |
197 | // set: VE_SOCKET and VE_RECV. |
198 | vws_set_flag(&vws.e.code, VE_RECV); |
199 | } |
200 | |
201 | // Something unexpected happend. We expect vrtql_msg_recv() to set |
202 | // appropriate error. Even if disconenct (VE_SOCKET) there is no |
203 | // point in reconnecting because we have lost our response no matter |
204 | // what. We will leave error as it is and hand back to caller. |
205 | break; |
206 | } |
207 | } |
208 | |
209 | free(tag); |
210 | |
211 | if ((vws.tracelevel >= VT_SERVICE) && (reply != NULL)) |
212 | { |
213 | vws_trace_lock(); |
214 | printf("\n\n" ); |
215 | printf("+----------------------------------------------------+\n" ); |
216 | printf("| vrtql_rpc_exec(): Message Recived |\n" ); |
217 | printf("+----------------------------------------------------+\n" ); |
218 | vrtql_msg_dump(reply); |
219 | printf("------------------------------------------------------\n" ); |
220 | vws_trace_unlock(); |
221 | } |
222 | |
223 | return reply; |
224 | } |
225 | |
226 | void out_of_band_default(vrtql_rpc* rpc, vrtql_msg* m) |
227 | { |
228 | if (m != NULL) |
229 | { |
230 | vrtql_msg_free(m); |
231 | } |
232 | } |
233 | |
234 | //------------------------------------------------------------------------------ |
235 | // Client-side API |
236 | //------------------------------------------------------------------------------ |
237 | |
238 | vrtql_rpc* vrtql_rpc_new(vws_cnx* cnx) |
239 | { |
240 | vrtql_rpc* rpc = (vrtql_rpc*)vws.malloc(sizeof(vrtql_rpc)); |
241 | rpc->cnx = cnx; |
242 | rpc->retries = 5; |
243 | rpc->out_of_band = out_of_band_default; |
244 | rpc->reconnect = NULL; |
245 | rpc->data = NULL; |
246 | rpc->val = vws_buffer_new(); |
247 | |
248 | return rpc; |
249 | } |
250 | |
251 | void vrtql_rpc_free(vrtql_rpc* rpc) |
252 | { |
253 | if (rpc != NULL) |
254 | { |
255 | vws_buffer_new(rpc->val); |
256 | vws.free(rpc); |
257 | } |
258 | } |
259 | |
260 | //------------------------------------------------------------------------------ |
261 | // Server-side Internal functions |
262 | //------------------------------------------------------------------------------ |
263 | |
/** Callback invoked by sys_map_clear() on a stored value before its entry is
 *  deleted. */
typedef void (*vrtql_rpc_map_free)(void* e);

/**
 * @brief Retrieves a value from the map using a string key.
 *
 * @param map The map from which to retrieve the value.
 * @param key The string key to use for retrieval.
 * @return The value associated with the key, or NULL if the key is not in
 *   the map.
 */
static void* sys_map_get(vrtql_rpc_map* map, cstr key);

/**
 * @brief Sets a value in the map using a string key and value.
 *
 * It will create a new key-value pair or update the value if the key already
 * exists. When the key is not yet present, a strdup() copy of the key is what
 * gets stored in the map.
 *
 * @param map The map in which to set the value.
 * @param key The string key to use for setting.
 * @param value The value to set.
 */
static void sys_map_set(vrtql_rpc_map* map, cstr key, void* value);

/**
 * @brief Removes a key-value pair from the map using a string key.
 *
 * If the key exists in the map, the callback is invoked on the stored value
 * and the entry is then deleted from the map.
 *
 * @param map The map from which to remove the key-value pair.
 * @param key The string key to use for removal.
 * @param cb The callback invoked on the stored value when the key exists.
 */
static void sys_map_clear(vrtql_rpc_map* map, cstr key, vrtql_rpc_map_free cb);
297 | |
298 | //------------------------------------------------------------------------------ |
299 | // Server-side API |
300 | //------------------------------------------------------------------------------ |
301 | |
302 | vrtql_rpc_module* vrtql_rpc_module_new(cstr name) |
303 | { |
304 | if (name == NULL) |
305 | { |
306 | vws.error(VE_RT, "module name cannot be NULL" ); |
307 | } |
308 | |
309 | vrtql_rpc_module* m; |
310 | |
311 | m = (vrtql_rpc_module*)vws.malloc(sizeof(vrtql_rpc_module)); |
312 | |
313 | m->name = strdup(name); |
314 | m->data = NULL; |
315 | sc_map_init_sv(&m->calls, 0, 0); |
316 | |
317 | return m; |
318 | } |
319 | |
320 | void vrtql_rpc_module_free(vrtql_rpc_module* m) |
321 | { |
322 | if (m != NULL) |
323 | { |
324 | cstr key; vrtql_rpc_call* call; |
325 | sc_map_foreach(&m->calls, key, call) |
326 | { |
327 | vws.free(key); |
328 | } |
329 | |
330 | sc_map_term_sv(&m->calls); |
331 | |
332 | vws.free(m->name); |
333 | vws.free(m); |
334 | } |
335 | } |
336 | |
337 | void vrtql_rpc_module_set(vrtql_rpc_module* m, cstr n, vrtql_rpc_call c) |
338 | { |
339 | sys_map_set(&m->calls, n, c); |
340 | } |
341 | |
342 | vrtql_rpc_call vrtql_rpc_module_get(vrtql_rpc_module* m, cstr n) |
343 | { |
344 | return sys_map_get(&m->calls, n); |
345 | } |
346 | |
347 | //------------------------------------------------------------------------------ |
348 | // RPC System |
349 | //------------------------------------------------------------------------------ |
350 | |
351 | vrtql_rpc_system* vrtql_rpc_system_new() |
352 | { |
353 | vrtql_rpc_system* s; |
354 | s = (vrtql_rpc_system*)vws.malloc(sizeof(vrtql_rpc_system)); |
355 | sc_map_init_sv(&s->modules, 0, 0); |
356 | |
357 | return s; |
358 | } |
359 | |
360 | void vrtql_rpc_system_free(vrtql_rpc_system* s) |
361 | { |
362 | if (s != NULL) |
363 | { |
364 | cstr key; vrtql_rpc_module* module; |
365 | sc_map_foreach(&s->modules, key, module) |
366 | { |
367 | vrtql_rpc_module_free(module); |
368 | vws.free(key); |
369 | |
370 | /* |
371 | sys_map_clear( &s->modules, |
372 | key, |
373 | (vrtql_rpc_map_free)vrtql_rpc_module_free); |
374 | */ |
375 | } |
376 | |
377 | sc_map_term_sv(&s->modules); |
378 | |
379 | vws.free(s); |
380 | } |
381 | } |
382 | |
383 | void vrtql_rpc_system_set(vrtql_rpc_system* s, vrtql_rpc_module* m) |
384 | { |
385 | sys_map_set(&s->modules, m->name, m); |
386 | } |
387 | |
388 | vrtql_rpc_module* vrtql_rpc_system_get(vrtql_rpc_system* s, cstr n) |
389 | { |
390 | return sys_map_get(&s->modules, n); |
391 | } |
392 | |
393 | //------------------------------------------------------------------------------ |
394 | // RPC API |
395 | //------------------------------------------------------------------------------ |
396 | |
397 | bool parse_rpc_string(const char* input, char** module, char** function) |
398 | { |
399 | // Find the first occurrence of the period |
400 | const char* delimiter = strchr(input, '.'); |
401 | |
402 | if (delimiter != NULL) |
403 | { |
404 | // Calculate the lengths of the module and function substrings |
405 | size_t m_len = delimiter - input; |
406 | size_t f_len = strlen(input) - m_len - 1; |
407 | |
408 | // Allocate memory for the module and function strings |
409 | *module = (char*)vws.malloc((m_len + 1) * sizeof(char)); |
410 | *function = (char*)vws.malloc((f_len + 1) * sizeof(char)); |
411 | |
412 | // Copy the module substring |
413 | strncpy(*module, input, m_len); |
414 | (*module)[m_len] = '\0'; |
415 | |
416 | // Copy the function substring |
417 | strncpy(*function, delimiter + 1, f_len); |
418 | (*function)[f_len] = '\0'; |
419 | |
420 | return true; |
421 | } |
422 | |
423 | // Invalid input format |
424 | return false; |
425 | } |
426 | |
427 | vrtql_msg* vrtql_rpc_reply(vrtql_msg* req) |
428 | { |
429 | vrtql_msg* reply = vrtql_msg_new(); |
430 | |
431 | cstr tag = vrtql_msg_get_routing(req, "tag" ); |
432 | |
433 | if (tag != NULL) |
434 | { |
435 | vrtql_msg_set_routing(req, "tag" , tag); |
436 | } |
437 | |
438 | // Use same format as request (JSON/MessagePack) |
439 | reply->format = req->format; |
440 | |
441 | return reply; |
442 | } |
443 | |
444 | vrtql_msg* vrtql_rpc_service(vrtql_rpc_system* s, vrtql_rpc_env* e, vrtql_msg* req) |
445 | { |
446 | vws.success(); |
447 | |
448 | if (vws.tracelevel >= VT_SERVICE) |
449 | { |
450 | vws_trace_lock(); |
451 | printf("\n\n" ); |
452 | printf("+----------------------------------------------------+\n" ); |
453 | printf("| Message Received |\n" ); |
454 | printf("+----------------------------------------------------+\n" ); |
455 | vrtql_msg_dump(req); |
456 | printf("------------------------------------------------------\n" ); |
457 | vws_trace_unlock(); |
458 | } |
459 | |
460 | cstr id = vrtql_msg_get_header(req, "id" ); |
461 | |
462 | if (id == NULL) |
463 | { |
464 | vws.error(VE_RT, "ID not specified" ); |
465 | |
466 | vrtql_msg_free(req); |
467 | |
468 | return NULL; |
469 | } |
470 | |
471 | // Parse ID into module and function |
472 | char* mn; char* fn; |
473 | if (parse_rpc_string(id, &mn, &fn) == false) |
474 | { |
475 | vws.error(VE_RT, "Invalid ID format" ); |
476 | |
477 | vrtql_msg_free(req); |
478 | |
479 | return NULL; |
480 | } |
481 | |
482 | // Lookup module in system |
483 | vrtql_rpc_module* module = vrtql_rpc_system_get(s, mn); |
484 | |
485 | if (module == NULL) |
486 | { |
487 | vws.error(VE_RT, "RPC does not exist" ); |
488 | |
489 | vrtql_msg_free(req); |
490 | vws.free(mn); |
491 | vws.free(fn); |
492 | |
493 | return NULL; |
494 | } |
495 | |
496 | // Set module reference in environment |
497 | e->module = module; |
498 | |
499 | // Lookup RPC in module |
500 | vrtql_rpc_call rpc = vrtql_rpc_module_get(module, fn); |
501 | |
502 | if (rpc == NULL) |
503 | { |
504 | vws.error(VE_RT, "RPC does not exist" ); |
505 | |
506 | vrtql_msg_free(req); |
507 | vws.free(mn); |
508 | vws.free(fn); |
509 | |
510 | return NULL; |
511 | } |
512 | |
513 | vws.free(mn); |
514 | vws.free(fn); |
515 | |
516 | // Invoke RPC |
517 | vrtql_msg* reply = rpc(e, req); |
518 | |
519 | // Free request |
520 | vrtql_msg_free(req); |
521 | |
522 | if ((vws.tracelevel >= VT_SERVICE) && (reply != NULL)) |
523 | { |
524 | vws_trace_lock(); |
525 | printf("\n\n" ); |
526 | printf("+----------------------------------------------------+\n" ); |
527 | printf("| Message Sent |\n" ); |
528 | printf("+----------------------------------------------------+\n" ); |
529 | vrtql_msg_dump(reply); |
530 | printf("------------------------------------------------------\n" ); |
531 | vws_trace_unlock(); |
532 | } |
533 | |
534 | // Return reply |
535 | return reply; |
536 | } |
537 | |
538 | //------------------------------------------------------------------------------ |
539 | // Internal Functions |
540 | //------------------------------------------------------------------------------ |
541 | |
542 | void* sys_map_get(vrtql_rpc_map* map, cstr key) |
543 | { |
544 | // See if we have an existing entry |
545 | cstr v = sc_map_get_sv(map, key); |
546 | |
547 | if (sc_map_found(map) == false) |
548 | { |
549 | return NULL; |
550 | } |
551 | |
552 | return v; |
553 | } |
554 | |
555 | void sys_map_set(vrtql_rpc_map* map, cstr key, void* value) |
556 | { |
557 | // See if we have an existing entry |
558 | sc_map_get_sv(map, key); |
559 | |
560 | if (sc_map_found(map) == false) |
561 | { |
562 | // We don't. Therefore we need to allocate new key. |
563 | key = strdup(key); |
564 | } |
565 | |
566 | sc_map_put_sv(map, key, value); |
567 | } |
568 | |
569 | void sys_map_clear(vrtql_rpc_map* map, cstr key, vrtql_rpc_map_free cb) |
570 | { |
571 | // See if we have an existing entry |
572 | cstr v = sc_map_get_sv(map, key); |
573 | |
574 | if (sc_map_found(map) == true) |
575 | { |
576 | // Call callback function to cleanup |
577 | cb(v); |
578 | } |
579 | |
580 | sc_map_del_sv(map, key); |
581 | } |
582 | |
583 | |