1#include "server.h"
2#include "message.h"
3
4#define CTEST_MAIN
5#include "ctest.h"
6#include "common.h"
7
// Address and port handed to vws_tcp_svr_run() by server_thread()
cstr server_host = "127.0.0.1";
int server_port = 8181;
// WebSocket URI that each client thread connects to
cstr uri = "ws://localhost:8181/websocket";
// Sample text; not referenced by any code visible in this part of the file
cstr content = "Lorem ipsum dolor sit amet";
12
13// Server function to process messages. Runs in context of worker thread.
14void process(vws_svr* s, vws_cid_t cid, vrtql_msg* m, void* ctx)
15{
16 vrtql_msg_svr* server = (vrtql_msg_svr*)s;
17
18 vws.trace(VL_INFO, "process (%lu) %p", cid, m);
19
20 // Echo back. Note: You should always set reply messages format to the
21 // format of the connection.
22
23 // Create reply message
24 vrtql_msg* reply = vrtql_msg_new();
25 reply->format = m->format;
26
27 // Copy content
28 ucstr data = m->content->data;
29 size_t size = m->content->size;
30 vws_buffer_append(reply->content, data, size);
31
32 // Send. We don't free message as send() does it for us.
33 server->send(s, cid, reply, NULL);
34
35 // Clean up request
36 vrtql_msg_free(m);
37}
38
39void server_thread(void* arg)
40{
41 vws_tcp_svr* server = (vws_tcp_svr*)arg;
42 vws.tracelevel = VT_THREAD;
43 server->trace = vws.tracelevel;
44
45 vws_tcp_svr_run(server, server_host, server_port);
46
47 vws_cleanup();
48}
49
50void client_thread(void* arg)
51{
52 vws_cnx* cnx = vws_cnx_new();
53
54 while (vws_connect(cnx, uri) == false)
55 {
56 vws.trace(VL_ERROR, "[client]: connecting %s", uri);
57 }
58
59 cstr payload = "payload";
60
61 int i = 0;
62 while (true)
63 {
64 if (i++ > 10)
65 {
66 break;
67 }
68
69 // Create
70 vrtql_msg* request = vrtql_msg_new();
71 vrtql_msg_set_content(request, payload);
72
73 // Send
74 while (vrtql_msg_send(cnx, request) < 0)
75 {
76 if (vws_socket_is_connected((vws_socket*)cnx) == false)
77 {
78 goto restart;
79 }
80 }
81
82 // Receive
83 vrtql_msg* reply = NULL;
84
85 while (reply == NULL)
86 {
87 printf("vrtql_msg_recv(cnx)\n");
88 reply = vrtql_msg_recv(cnx);
89 if (vws_socket_is_connected((vws_socket*)cnx) == false)
90 {
91 goto restart;
92 }
93 }
94
95 // Check
96 ucstr content = reply->content->data;
97 size_t size = reply->content->size;
98 ASSERT_TRUE(strncmp(payload, (cstr)content, size) == 0);
99 vrtql_msg_free(reply);
100
101restart:
102
103 // Cleanup
104 vrtql_msg_free(request);
105 }
106
107 // Disconnect
108 vws_disconnect(cnx);
109 vws_cnx_free(cnx);
110
111 vws_cleanup();
112}
113
114void client_test(int iterations, int nt)
115{
116 for (int i = 0; i < iterations; i++)
117 {
118 uv_thread_t* threads = vws.malloc(sizeof(uv_thread_t) * nt);
119
120 for (int i = 0; i < nt; i++)
121 {
122 uv_thread_create(&threads[i], client_thread, NULL);
123 vws.trace(VL_INFO, "started client thread %p", threads[i]);
124 }
125
126 for (int i = 0; i < nt; i++)
127 {
128 uv_thread_join(&threads[i]);
129 vws.trace(VL_INFO, "stopped client thread %p", threads[i]);
130 }
131
132 free(threads);
133 }
134}
135
136CTEST(test_msg_server, echo)
137{
138 vrtql_msg_svr* server = vrtql_msg_svr_new(10, 0, 0);
139 server->process = process;
140
141 uv_thread_t server_tid;
142 uv_thread_create(&server_tid, server_thread, server);
143
144 // Wait for server to start up
145 while (vws_tcp_svr_state((vws_tcp_svr*)server) != VS_RUNNING)
146 {
147 vws_msleep(100);
148 }
149
150 client_test(5, 50);
151
152 // Shutdown
153
154 // Need to give the server time to properly send out CLOSE frames, etc. If
155 // we don't give it time, then it will it may fail to complete sending
156 // CLOSE_FRAME ws_svr_process_frame() and look like a memory leak in
157 // valgrind.
158 sleep(1);
159
160 // Shutdown server
161 vws_tcp_svr_stop((vws_tcp_svr*)server);
162 uv_thread_join(&server_tid);
163 vrtql_msg_svr_free(server);
164}
165
// Program entry point: passes the command line to the ctest runner and
// returns its result as the exit status.
int main(int argc, const char* argv[])
{
    const int status = ctest_main(argc, argv);
    return status;
}
170