This example implements a simple publish-subscribe message broker using the VRTQL Message server. Clients can subscribe to topics, and when a message is published to a topic, all subscribers receive a copy. The broker uses the message routing map to convey the topic and action (subscribe, unsubscribe, publish).
#include <string.h>
#include <vws/server.h>
#include <vws/message.h>
//----------------------------------------------------------------------
// Subscription storage
//----------------------------------------------------------------------
// Maximum topics and subscribers per topic
#define MAX_TOPICS 64
#define MAX_SUBS 256
typedef struct
{
char topic[128];
vws_cid_t subscribers[MAX_SUBS];
int count;
} topic_entry;
typedef struct
{
topic_entry topics[MAX_TOPICS];
int count;
uv_mutex_t lock;
} broker_state;
static broker_state broker;
void broker_init()
{
memset(&broker, 0, sizeof(broker));
uv_mutex_init(&broker.lock);
}
void broker_destroy()
{
uv_mutex_destroy(&broker.lock);
}
// Find or create a topic entry
topic_entry* broker_get_topic(cstr topic, bool create)
{
for (int i = 0; i < broker.count; i++)
{
if (strcmp(broker.topics[i].topic, topic) == 0)
{
return &broker.topics[i];
}
}
if (create && broker.count < MAX_TOPICS)
{
topic_entry* e = &broker.topics[broker.count++];
strncpy(e->topic, topic, sizeof(e->topic) - 1);
e->count = 0;
return e;
}
return NULL;
}
void broker_subscribe(cstr topic, vws_cid_t cid)
{
uv_mutex_lock(&broker.lock);
topic_entry* e = broker_get_topic(topic, true);
if (e != NULL && e->count < MAX_SUBS)
{
e->subscribers[e->count++] = cid;
}
uv_mutex_unlock(&broker.lock);
}
void broker_unsubscribe(cstr topic, vws_cid_t cid)
{
uv_mutex_lock(&broker.lock);
topic_entry* e = broker_get_topic(topic, false);
if (e != NULL)
{
for (int i = 0; i < e->count; i++)
{
if (e->subscribers[i].key == cid.key)
{
// Shift remaining subscribers down
for (int j = i; j < e->count - 1; j++)
{
e->subscribers[j] = e->subscribers[j + 1];
}
e->count--;
break;
}
}
}
uv_mutex_unlock(&broker.lock);
}
//----------------------------------------------------------------------
// Message processing
//----------------------------------------------------------------------
void process(vws_svr* s, vws_cid_t cid, vrtql_msg* m, void* ctx)
{
vrtql_msg_svr* server = (vrtql_msg_svr*)s;
cstr action = vrtql_msg_get_routing(m, "action");
cstr topic = vrtql_msg_get_routing(m, "topic");
if (action == NULL || topic == NULL)
{
vrtql_msg_free(m);
return;
}
if (strcmp(action, "subscribe") == 0)
{
broker_subscribe(topic, cid);
// Acknowledge
vrtql_msg* ack = vrtql_msg_new();
ack->format = m->format;
vrtql_msg_set_routing(ack, "action", "subscribed");
vrtql_msg_set_routing(ack, "topic", topic);
server->send(s, cid, ack, NULL);
}
else if (strcmp(action, "unsubscribe") == 0)
{
broker_unsubscribe(topic, cid);
// Acknowledge
vrtql_msg* ack = vrtql_msg_new();
ack->format = m->format;
vrtql_msg_set_routing(ack, "action", "unsubscribed");
vrtql_msg_set_routing(ack, "topic", topic);
server->send(s, cid, ack, NULL);
}
else if (strcmp(action, "publish") == 0)
{
uv_mutex_lock(&broker.lock);
topic_entry* e = broker_get_topic(topic, false);
if (e != NULL)
{
for (int i = 0; i < e->count; i++)
{
// Create a copy of the message for each subscriber
vrtql_msg* copy = vrtql_msg_new();
copy->format = m->format;
vrtql_msg_set_routing(copy, "action", "message");
vrtql_msg_set_routing(copy, "topic", topic);
if (m->content->size > 0)
{
vws_buffer_append( copy->content,
m->content->data,
m->content->size );
}
// Use dispatch() to send without freeing
server->dispatch(s, e->subscribers[i], copy, NULL);
}
}
uv_mutex_unlock(&broker.lock);
}
vrtql_msg_free(m);
}
int main(int argc, const char* argv[])
{
broker_init();
// Create server
vrtql_msg_svr* server = vrtql_msg_svr_new(10, 0, 0);
server->process = process;
// Run
vrtql_msg_svr_run(server, "127.0.0.1", 8181);
// Cleanup
vrtql_msg_svr_free(server);
broker_destroy();
vws_cleanup();
return 0;
}
A client can interact with this broker using the Message API. To subscribe
to a topic, the client sends a message with routing keys
"action" set to "subscribe" and
"topic" set to the desired topic name. To publish, the client
sends a message with "action" set to
"publish", the "topic", and the message
content in the payload. The following illustrates a client subscribing and
publishing:
// Subscribe to "news" topic vrtql_msg* sub = vrtql_msg_new(); vrtql_msg_set_routing(sub, "action", "subscribe"); vrtql_msg_set_routing(sub, "topic", "news"); vrtql_msg_send(cnx, sub); vrtql_msg_free(sub); // Publish a message to "news" topic vrtql_msg* pub = vrtql_msg_new(); vrtql_msg_set_routing(pub, "action", "publish"); vrtql_msg_set_routing(pub, "topic", "news"); vrtql_msg_set_content(pub, "Breaking news: VRTQL 2.0 released!"); vrtql_msg_send(cnx, pub); vrtql_msg_free(pub);