1#include "server.h"
2#include "message.h"
3
4#define CTEST_MAIN
5#include "ctest.h"
6#include "common.h"
7
8cstr server_host = "127.0.0.1";
9int server_port = 8181;
10cstr uri = "ws://localhost:8181/websocket";
11cstr content = "Lorem ipsum dolor sit amet";
12
13// Server function to process messages. Runs in context of worker thread.
14void process(vws_svr* s, vws_cid_t cid, vws_msg* m, void* ctx)
15{
16 vws.trace(VL_INFO, "process_message (%ul) %p", cid, m);
17
18 // Echo back. Note: You should always set reply messages format to the
19 // format of the connection.
20
21 // Create reply message
22 vws_msg* reply = vws_msg_new();
23
24 // Use same format
25 reply->opcode = m->opcode;
26
27 // Copy content
28 vws_buffer_append(reply->data, m->data->data, m->data->size);
29
30 // Send. We don't free message as send() does it for us.
31 s->send(s, cid, reply, NULL);
32
33 // Clean up request
34 vws_msg_free(m);
35}
36
37void server_thread(void* arg)
38{
39 vws_tcp_svr* server = (vws_tcp_svr*)arg;
40 vws.tracelevel = VT_THREAD;
41 server->trace = vws.tracelevel;
42
43 vws_tcp_svr_run(server, server_host, server_port);
44
45 vws_cleanup();
46}
47
48void client_thread(void* arg)
49{
50 vws_cnx* cnx = vws_cnx_new();
51
52 while (vws_connect(cnx, uri) == false)
53 {
54 vws.trace(VL_ERROR, "[client]: connecting %s", uri);
55 }
56
57 cstr payload = "payload";
58
59 int i = 0;
60 while (true)
61 {
62restart:
63
64 if (i++ > 10)
65 {
66 break;
67 }
68
69 // Send
70 while (vws_msg_send_text(cnx, payload) < 0)
71 {
72 if (vws_socket_is_connected((vws_socket*)cnx) == false)
73 {
74 goto restart;
75 }
76 }
77
78 // Receive
79 vws_msg* reply = vws_msg_recv(cnx);
80
81 while (reply == NULL)
82 {
83 printf("vrtql_msg_recv(cnx)\n");
84 reply = vws_msg_recv(cnx);
85 if (vws_socket_is_connected((vws_socket*)cnx) == false)
86 {
87 goto restart;
88 }
89 }
90
91 // Check
92 ucstr content = reply->data->data;
93 size_t size = reply->data->size;
94 ASSERT_TRUE(strncmp(payload, (cstr)content, size) == 0);
95 vws_msg_free(reply);
96 }
97
98 // Disconnect
99 vws_disconnect(cnx);
100 vws_cnx_free(cnx);
101
102 vws_cleanup();
103}
104
105void client_test(int iterations, int nt)
106{
107 for (int i = 0; i < iterations; i++)
108 {
109 uv_thread_t* threads = vws.malloc(sizeof(uv_thread_t) * nt);
110
111 for (int i = 0; i < nt; i++)
112 {
113 uv_thread_create(&threads[i], client_thread, NULL);
114 vws.trace(VL_INFO, "started client thread %p", threads[i]);
115 }
116
117 for (int i = 0; i < nt; i++)
118 {
119 uv_thread_join(&threads[i]);
120 vws.trace(VL_INFO, "stopped client thread %p", threads[i]);
121 }
122
123 free(threads);
124 }
125}
126
127CTEST(test_msg_server, echo)
128{
129 vws_svr* server = vws_svr_new(10, 0, 0);
130 server->process = process;
131
132 uv_thread_t server_tid;
133 uv_thread_create(&server_tid, server_thread, server);
134
135 // Wait for server to start up
136 while (vws_tcp_svr_state((vws_tcp_svr*)server) != VS_RUNNING)
137 {
138 vws_msleep(100);
139 }
140
141 client_test(5, 50);
142
143 // Shutdown
144
145 // Need to give the server time to properly send out CLOSE frames, etc. If
146 // we don't give it time, then it will it may fail to complete sending
147 // CLOSE_FRAME ws_svr_process_frame() and look like a memory leak in
148 // valgrind.
149 sleep(1);
150
151 // Shutdown server
152 vws_tcp_svr_stop((vws_tcp_svr*)server);
153 uv_thread_join(&server_tid);
154 vws_svr_free(server);
155}
156
// Program entry point: passes the command line to the ctest runner and uses
// its result as the process exit status.
int main(int argc, const char* argv[])
{
    int status = ctest_main(argc, argv);

    return status;
}
161