/*
* -----------------------------------------------------------------
* IBM Websphere MQ Telemetry
* MQTTV3ASample MQTT v3 Asynchronous Client application
*
* Version: @(#) MQMBID sn=p000-L130522.1 su=_M3QBMsMbEeK31Ln-reX3cg pn=com.ibm.mq.mqxr.listener/SDK/clients/c/samples/MQTTV3ASample.c
*
*
* Licensed Materials - Property of IBM
*
* 5724-H72,
*
* (C) Copyright IBM Corp. 2010, 2012 All Rights Reserved.
*
* US Government Users Restricted Rights - Use, duplication or
* disclosure restricted by GSA ADP Schedule Contract with
* IBM Corp.
*
* -----------------------------------------------------------------
*/
/**
* This sample application demonstrates basic usage
* of the MQTT v3 Asynchronous Client api.
*
* It can be run in one of two modes:
* - as a publisher, sending a single message to a topic on the server
* - as a subscriber, listening for messages from the server
*
*/
#include
#include
#include
#include
#include
#include
#if defined(WIN32)
#include
#define sleep Sleep
#else
#include
#include
#endif
/*
* State shared between main() and the MQTT callbacks defined in this file.
*/
// Stop request: set by handleSignal() and by deliveryComplete() once `sent`
// reaches options.message_count; main()'s final loop reacts to it by starting a
// disconnect and then clears it.
volatile int toStop = 0;
// Set by onDisconnect() and by the failure callbacks; ends main()'s final loop.
volatile int finished = 0;
// Set to 1 by onConnect(); main() polls it while waiting for the connect outcome.
volatile int connected = 0;
// Reset to 0 at the start of main(); not read anywhere else in the visible code.
volatile int quietMode = 0;
// Message counter incremented by messageArrived() and deliveryComplete().
volatile int sent = 0;
// Not referenced in the visible code.
volatile int delivery = 0;
// Zeroed by main() before each publish; not read in the visible code.
volatile MQTTAsync_token deliveredtoken;
// Client identifier buffer; main() fills it with "SampleCV3A_" plus the action name.
static char clientId[24];
/**
 * Run-time settings of the sample together with their default values.
 *
 * main() reads these fields when building the broker URL, choosing the
 * default topic, and publishing or subscribing.
 */
struct Options
{
    char *action;       /* compared against "publish" / "subscribe" in main() */
    char *topic;        /* NULL or empty: main() picks a default for the action */
    char *message;      /* payload text used for every published message */
    int qos;            /* QoS used for publish and subscribe (help text lists 0|1|2) */
    char *broker;       /* host part of the "tcp://host:port" URL */
    char *port;         /* port part of the URL, kept as text */
    int message_count;  /* number of messages the publisher sends */
} options =
{
    .action = "publish",
    .topic = NULL,
    /* NOTE(review): printHelp() advertises a different default message text -
     * confirm which default is intended. */
    .message = "2",
    .qos = 2,
    .broker = "localhost",
    .port = "1883",
    .message_count = 100
};
/**
 * Print the command-line usage summary to stdout.
 *
 * The syntax line and the -t description name the argument each option
 * expects (<topic>, <message>, <address>, <port>); without those
 * placeholders the text does not say what the options take.
 */
void printHelp()
{
    /* Kept as one table so the whole help text can be read top to bottom. */
    static const char *const help_lines[] = {
        "Syntax:\n\n",
        " MQTTV3ASample [-h] [-a publish|subscribe] [-t <topic>] [-m <message>]\n",
        " [-s 0|1|2] [-b <address>] [-p <port>] \n\n",
        " -h Print this help text and quit\n",
        " -q Quiet mode (default is false)\n",
        " -a Perform the relevant action (default is publish)\n",
        " -t Publish/subscribe to <topic> instead of the default\n",
        " (publish: \"MQTTV3ASample/C/v3\", subscribe: \"MQTTV3ASample/#\")\n",
        " -m Use this message instead of the default (\"Message from MQTTv3 C asynchronous client\")\n",
        " -s Use this QoS instead of the default (2)\n",
        " -b Use this name/IP address instead of the default (localhost)\n",
        " -p Use this port instead of the default (1883)\n",
        "\nDelimit strings containing spaces with \"\"\n",
        "\nPublishers transmit a single message then disconnect from the broker.\n",
        "Subscribers remain connected to the broker and receive appropriate messages\n",
        "until Control-C (^C) is received from the keyboard.\n\n"
    };
    size_t idx;

    /* None of the lines contain conversion specifiers, so fputs is sufficient. */
    for (idx = 0; idx < sizeof help_lines / sizeof help_lines[0]; idx++)
    {
        fputs(help_lines[idx], stdout);
    }
}
/**
 * Signal handler that main() installs for SIGINT and SIGTERM.
 *
 * It only raises the toStop flag; main()'s polling loop sees the flag and
 * starts the disconnect.
 *
 * @param sig  Number of the delivered signal (not inspected).
 */
void handleSignal(int sig)
{
    (void)sig;  /* every handled signal is treated the same way */
    toStop = 1;
}
/**
 * Message-arrived callback registered through MQTTAsync_setCallbacks().
 *
 * Counts the incoming message, reports progress on every 1000th message, and
 * releases the message and topic storage handed over by the library. The
 * payload itself is not printed.
 *
 * @param context    Context registered with the callbacks (unused).
 * @param topicName  Topic the message was published to; freed here.
 * @param topicLen   Length reported for the topic (unused).
 * @param message    The received message; freed here.
 * @return Always 1.
 */
int messageArrived(void *context, char *topicName, int topicLen, MQTTAsync_message *message)
{
    (void)context;
    (void)topicLen;

    /*
     * Count each message exactly once. Incrementing the counter both in the
     * test and in the printf argument would skip a count on every reporting
     * call and make the reported totals drift.
     */
    sent++;
    if ((sent % 1000) == 0)
    {
        printf("%d messages received\n", sent);
    }

    MQTTAsync_freeMessage(&message);
    MQTTAsync_free(topicName);
    return 1;
}
/**
 * Success callback for the subscribe request started in main().
 *
 * @param context   Context supplied with the request (unused).
 * @param response  Success details from the library (unused).
 */
void onSubscribe(void* context, MQTTAsync_successData* response)
{
    (void)context;
    (void)response;

    printf("Subscribe succeeded\n");
}
/**
 * Failure callback for the subscribe request started in main().
 *
 * Reports the failure and raises `finished` so main()'s polling loop ends.
 *
 * @param context   Context supplied with the request (unused).
 * @param response  Failure details from the library (unused).
 */
void onSubscribeFailure(void* context, MQTTAsync_failureData* response)
{
    (void)context;
    (void)response;

    printf("Subscribe failed\n");
    finished = 1;
}
/**
 * Success callback for the disconnect request started in main().
 *
 * Reports the disconnection and raises `finished` so main()'s polling loop
 * ends.
 *
 * @param context   Context supplied with the request (unused).
 * @param response  Success details from the library (unused).
 */
void onDisconnect(void* context, MQTTAsync_successData* response)
{
    (void)context;
    (void)response;

    printf("Successful disconnection\n");
    finished = 1;
}
/**
 * Failure callback for a publish started in main().
 *
 * Prints the token of the message whose delivery failed.
 *
 * @param context   Context supplied with the request (unused).
 * @param response  Failure details; its token field is reported.
 */
void onSendFailure(void* context, MQTTAsync_failureData* response)
{
    (void)context;

    printf("onSendFailure: message with token value %d delivery failed\n",
           response->token);
}
/**
 * Success callback for a publish started in main().
 *
 * Keeps a running count of confirmations and compares it with the token of
 * the confirmed message: a token that is not exactly one above the count is
 * reported as an error. Every 1000th token is also reported as progress.
 *
 * @param context   Context supplied with the request (unused).
 * @param response  Success details; its token field is inspected.
 */
void onSend(void* context, MQTTAsync_successData* response)
{
    /* Explicit type: implicit int is not valid since C99. */
    static int last_send = 0;

    (void)context;

    if (response->token - last_send != 1)
    {
        printf("Error in onSend, token value %d, last_send %d\n", response->token, last_send);
    }
    last_send++;

    if ((response->token % 1000) == 0)
    {
        printf("onSend: message with token value %d delivery confirmed\n", response->token);
    }
}
/**
 * Delivery-complete callback registered through MQTTAsync_setCallbacks().
 *
 * Advances the shared `sent` counter, reports every 1000th delivery, flags a
 * mismatch between the counter and the delivered token, and requests a stop
 * once options.message_count deliveries have been counted.
 *
 * @param context  Context registered with the callbacks (unused).
 * @param token    Token of the message whose delivery completed.
 */
void deliveryComplete(void* context, MQTTAsync_token token)
{
    (void)context;

    sent++;

    if ((sent % 1000) == 0)
    {
        printf("deliveryComplete: message with token value %d delivery confirmed\n", token);
    }

    if (sent != token)
    {
        printf("Error, sent %d != token %d\n", sent, token);
    }

    /* All requested messages are accounted for: ask main() to disconnect. */
    if (sent == options.message_count)
    {
        toStop = 1;
    }
}
/**
 * Failure callback for the connect requests issued by main() and
 * connectionLost().
 *
 * Reports the failure and raises `finished` so main() stops waiting.
 *
 * @param context   Context supplied with the request (unused).
 * @param response  Failure details from the library (unused).
 */
void onConnectFailure(void* context, MQTTAsync_failureData* response)
{
    (void)context;
    (void)response;

    printf("Connect failed\n");
    finished = 1;
}
/**
 * Success callback for the connect requests issued by main() and
 * connectionLost().
 *
 * Reports the connection and raises `connected`, which main() polls while
 * waiting for the connect outcome.
 *
 * @param context   Context supplied with the request (unused).
 * @param response  Success details from the library (unused).
 */
void onConnect(void* context, MQTTAsync_successData* response)
{
    (void)context;
    (void)response;

    printf("Connected\n");
    connected = 1;
}
/**
 * Connection-lost callback registered through MQTTAsync_setCallbacks().
 *
 * Reports the loss and immediately starts a new asynchronous connect on the
 * same client, reusing onConnect / onConnectFailure as result callbacks. If
 * the connect cannot even be started, `finished` is raised so main() stops.
 *
 * @param context  The client handle that main() registered as the callback
 *                 context.
 * @param cause    Reason for the loss; may be NULL.
 */
void connectionLost(void *context, char *cause)
{
    MQTTAsync client = (MQTTAsync)context;
    MQTTAsync_connectOptions conn_opts = MQTTAsync_connectOptions_initializer;
    int rc;

    printf("\nConnection lost\n");
    /* Passing NULL for a %s conversion is undefined behaviour, so substitute
     * a placeholder when no cause is supplied. */
    printf(" cause: %s\n", cause != NULL ? cause : "(unknown)");
    printf("Reconnecting\n");

    conn_opts.keepAliveInterval = 20;
    conn_opts.cleansession = 1;
    conn_opts.onSuccess = onConnect;
    conn_opts.onFailure = onConnectFailure;
    conn_opts.context = client;
    conn_opts.retryInterval = 1000;

    if ((rc = MQTTAsync_connect(client, &conn_opts)) != MQTTASYNC_SUCCESS)
    {
        printf("Failed to start connect, return code %d\n", rc);
        finished = 1;
    }
}
/**
 * Trace callback installed with MQTTAsync_setTraceCallback().
 *
 * Writes each trace message to stdout on a line of its own.
 *
 * @param level    Trace level of the message (not inspected).
 * @param message  Trace text to print.
 */
void handleTrace(enum MQTTASYNC_TRACE_LEVELS level, char* message)
{
    (void)level;

    printf("%s\n", message);
}
/**
* The main entry point of the sample.
*
* This method handles parsing the arguments specified on the
* command-line before performing the specified action.
*
* Visible flow: install the SIGINT/SIGTERM handlers, validate the QoS and
* choose a default topic, create the client and start an asynchronous
* connect, poll until the connect outcome is known, then either publish
* options.message_count messages or subscribe to options.topic, and finally
* poll until a callback raises `finished`.
*
* Returns 255 for an invalid QoS; otherwise returns the last MQTTAsync
* return code stored in rc. Several failure paths call exit(EXIT_FAILURE)
* instead of returning.
*/
int main(int argc, char** argv)
{
int rc = 0;
int ch;
char url[256];
// Default settings:
int i=0;
MQTTAsync client;
MQTTAsync_connectOptions conn_opts = MQTTAsync_connectOptions_initializer;
MQTTAsync_message pubmsg = MQTTAsync_message_initializer;
MQTTAsync_token token;
// Route interrupt/termination signals to handleSignal, which raises toStop.
signal(SIGINT, handleSignal);
signal(SIGTERM, handleSignal);
quietMode = 0;
// Parse the arguments -
// NOTE(review): the next line is not a complete statement - the loop header runs
// straight into what looks like the tail of the QoS range check, so the
// option-parsing body appears to be missing from this copy. Restore it from the
// original sample before building.
for (i=1; i 2)
{
printf("Invalid QoS: %d\n", options.qos);
printHelp();
return 255;
}
if (options.topic == NULL || ( options.topic != NULL && strlen(options.topic) == 0) )
{
// Set the default topic according to the specified action
if (strcmp(options.action, "publish") == 0)
options.topic = "MQTTV3ASample/C/v3";
else
options.topic = "MQTTV3ASample/#";
}
// Construct the full broker URL and clientId
// NOTE(review): both sprintf calls are unbounded. url holds 256 bytes and clientId
// holds 24; if broker, port or action can come from the command line, long values
// would overflow these buffers - consider snprintf with a truncation check.
sprintf(url, "tcp://%s:%s", options.broker, options.port);
sprintf(clientId, "SampleCV3A_%s", options.action);
// NOTE(review): the return value of MQTTAsync_create is not checked.
MQTTAsync_create(&client, url, clientId, MQTTCLIENT_PERSISTENCE_NONE, NULL);
MQTTAsync_setTraceCallback(handleTrace);
MQTTAsync_setTraceLevel(MQTTASYNC_TRACE_ERROR);
// The client handle doubles as the callback context; connectionLost casts it back.
MQTTAsync_setCallbacks(client, client, connectionLost, messageArrived, deliveryComplete);
conn_opts.cleansession = 0;
conn_opts.onSuccess = onConnect;
conn_opts.onFailure = onConnectFailure;
conn_opts.context = client;
conn_opts.keepAliveInterval = 0;
conn_opts.retryInterval = 0;
//conn_opts.maxInflight= 30;
if ((rc = MQTTAsync_connect(client, &conn_opts)) != MQTTASYNC_SUCCESS)
{
printf("Failed to start connect, return code %d\n", rc);
goto exit;
}
printf("Waiting for connect\n");
// Poll until onConnect raises connected, a failure callback raises finished,
// or a signal raises toStop.
// NOTE(review): usleep is called unconditionally here and in the publish loop,
// while the final loop below has a separate WIN32 branch.
while (connected == 0 && finished == 0 && toStop == 0) {
printf("Waiting for connect: %d %d %d\n", connected, finished, toStop);
usleep(10000L);
}
MQTTAsync_responseOptions opts = MQTTAsync_responseOptions_initializer;
printf("Waiting for connect: %d %d %d\n", connected, finished, toStop);
// NOTE(review): printed whichever flag ended the wait, including a failed connect.
printf("Successful connection\n");
if (connected == 1 && strcmp(options.action, "publish") == 0)
{
// This counter shadows the outer int i declared at the top of main.
unsigned long i;
struct timeval tv;
gettimeofday(&tv,NULL);
printf("start seconds : %ld\n",tv.tv_sec);
// Queue options.message_count publishes; completion is reported through
// onSend / onSendFailure and deliveryComplete.
for (i = 0; i < options.message_count; i++)
{
opts.onSuccess = onSend;
opts.onFailure = onSendFailure;
opts.context = client;
pubmsg.payload = options.message;
pubmsg.payloadlen = strlen(options.message);
pubmsg.qos = options.qos;
pubmsg.retained = 0;
deliveredtoken = 0;
// Short pause between consecutive sends.
usleep(100);
if ((rc = MQTTAsync_sendMessage(client, options.topic, &pubmsg, &opts))
!= MQTTASYNC_SUCCESS)
{
printf("Failed to start sendMessage, return code %d\n", rc);
// NOTE(review): exits without reaching the MQTTAsync_destroy call at the exit label.
exit(EXIT_FAILURE);
}
}
gettimeofday(&tv,NULL);
printf("end seconds : %ld\n",tv.tv_sec);
// NOTE(review): unlike the publish branch, this branch does not test `connected`,
// so a subscribe is attempted even when the wait above ended without a connection.
} else if (strcmp(options.action, "subscribe") == 0) {
opts.onSuccess = onSubscribe;
opts.onFailure = onSubscribeFailure;
opts.context = client;
if ((rc = MQTTAsync_subscribe(client, options.topic, options.qos, &opts)) != MQTTASYNC_SUCCESS) {
printf("Failed to subscribe, return code %d\n", rc);
exit(EXIT_FAILURE);
}
}
// Idle until a callback raises finished; a pending stop request starts the
// disconnect, whose success callback (onDisconnect) raises finished.
while (!finished)
{
#if defined(WIN32)
Sleep(100);
#else
usleep(1000L);
#endif
if (toStop == 1)
{
// This opts (disconnect options) shadows the response options declared above.
MQTTAsync_disconnectOptions opts = MQTTAsync_disconnectOptions_initializer;
opts.onSuccess = onDisconnect;
opts.context = client;
printf("Entering disconnection phase\n");
if ((rc = MQTTAsync_disconnect(client, &opts)) != MQTTASYNC_SUCCESS)
{
printf("Failed to start disconnect, return code %d\n", rc);
exit(EXIT_FAILURE);
}
// Clear the request so the disconnect is started only once per stop request.
toStop = 0;
}
}
// Cleanup label; it shares its name with exit() but labels live in their own namespace.
exit:
printf("calling destroy\n");
MQTTAsync_destroy(&client);
return rc;
}