/*******************************************************************************
 * Copyright (c) 2009, 2018 IBM Corp.
 *
 * All rights reserved. This program and the accompanying materials
 * are made available under the terms of the Eclipse Public License v1.0
 * and Eclipse Distribution License v1.0 which accompany this distribution.
 *
 * The Eclipse Public License is available at
 *    http://www.eclipse.org/legal/epl-v10.html
 * and the Eclipse Distribution License is available at
 *   http://www.eclipse.org/org/documents/edl-v10.php.
 *
 * Contributors:
 *    Ian Craggs - initial API and implementation and/or initial documentation
 *    Ian Craggs - MQTT 5.0 support
 *******************************************************************************/

/**
 * @file
 * MQTT V5 specific tests for the MQTT C client
 *
 *  - topic aliases
 *  - subscription ids
 *  - session expiry
 *  - payload format
 *  - flow control
 *  - QoS 2 exchange termination
 *  - request/response
 *  - shared subscriptions
 *  - server initiated disconnect
 *  - auth packets
 *  - server assigned clientid returned in a property
 *  - server defined keepalive
 *  - subscribe failure
 */

#include "MQTTClient.h"
#include <string.h>
#include <stdlib.h>

#if !defined(_WINDOWS)
	/* NOTE(review): <sys/time.h> and <unistd.h> are required by gettimeofday()
	 * and usleep() below; <sys/socket.h> and <errno.h> are restored from the
	 * upstream layout - confirm against the project copy. */
	#include <sys/time.h>
	#include <sys/socket.h>
	#include <unistd.h>
	#include <errno.h>
#else
	#include <windows.h>
	#define setenv(a, b, c) _putenv_s(a, b)
#endif

/** Number of elements in a true array (not valid for pointers). */
#define ARRAY_SIZE(a) (sizeof(a) / sizeof((a)[0]))

/** Print a (minimal) help message and terminate the program with a failure status. */
void usage(void)
{
	printf("help!!\n");
	exit(EXIT_FAILURE);
}

/** Command line settings shared by all the tests in this file. */
struct Options
{
	char* connection;         /**< connection to system under test. */
	char** haconnections;     /**< space separated list given with --haconnections, one entry per token */
	char* proxy_connection;   /**< value of --proxy_connection */
	int hacount;              /**< number of entries in haconnections */
	int verbose;              /**< non-zero enables LOGA_DEBUG output in MyLog() */
	int test_no;              /**< value of --test_no */
	int MQTTVersion;          /**< value of --MQTTversion */
	int iterations;           /**< value of --iterations */
} options =
{
	"tcp://localhost:1883",
	NULL,
	"tcp://localhost:1884",
	0,
	0,
	0,
	MQTTVERSION_5,
	1,
};

/** Report an allocation failure while parsing options and abort the run. */
static void options_alloc_failed(void)
{
	fprintf(stderr, "Out of memory while parsing command line options\n");
	exit(EXIT_FAILURE);
}

/**
 * Parse the command line into the global ::options structure.
 *
 * Recognised arguments are --test_no, --connection, --haconnections,
 * --proxy_connection, --MQTTversion, --iterations (each followed by a value)
 * and the flag --verbose.  A value-taking argument without a value calls
 * usage(), which exits.  Unrecognised arguments are ignored.
 *
 * @param argc number of entries in argv
 * @param argv argument vector; the --haconnections value is tokenised in
 *             place with strtok(), so that argv entry is modified
 */
void getopts(int argc, char** argv)
{
	int count = 1;

	while (count < argc)
	{
		if (strcmp(argv[count], "--test_no") == 0)
		{
			if (++count < argc)
				options.test_no = atoi(argv[count]);
			else
				usage();
		}
		else if (strcmp(argv[count], "--connection") == 0)
		{
			if (++count < argc)
			{
				options.connection = argv[count];
				printf("\nSetting connection to %s\n", options.connection);
			}
			else
				usage();
		}
		else if (strcmp(argv[count], "--haconnections") == 0)
		{
			if (++count < argc)
			{
				size_t capacity = 5; /* initial guess; the array grows on demand */
				char* tok = strtok(argv[count], " ");

				options.hacount = 0;
				options.haconnections = malloc(sizeof(char*) * capacity);
				if (options.haconnections == NULL)
					options_alloc_failed();

				while (tok)
				{
					/* The list used to be fixed at 5 slots with no bound check,
					 * so a sixth URI wrote past the end of the allocation. */
					if ((size_t)options.hacount == capacity)
					{
						char** grown = realloc(options.haconnections, sizeof(char*) * capacity * 2);

						if (grown == NULL)
							options_alloc_failed();
						options.haconnections = grown;
						capacity *= 2;
					}

					options.haconnections[options.hacount] = malloc(strlen(tok) + 1);
					if (options.haconnections[options.hacount] == NULL)
						options_alloc_failed();
					strcpy(options.haconnections[options.hacount], tok);
					options.hacount++;
					tok = strtok(NULL, " ");
				}
			}
			else
				usage();
		}
		else if (strcmp(argv[count], "--proxy_connection") == 0)
		{
			if (++count < argc)
				options.proxy_connection = argv[count];
			else
				usage();
		}
		else if (strcmp(argv[count], "--MQTTversion") == 0)
		{
			if (++count < argc)
			{
				options.MQTTVersion = atoi(argv[count]);
				printf("setting MQTT version to %d\n", options.MQTTVersion);
			}
			else
				usage();
		}
		else if (strcmp(argv[count], "--iterations") == 0)
		{
			if (++count < argc)
				options.iterations = atoi(argv[count]);
			else
				usage();
		}
		else if (strcmp(argv[count], "--verbose") == 0)
		{
			options.verbose = 1;
			printf("\nSetting verbose on\n");
		}
		count++;
	}
}


#define LOGA_DEBUG 0
#define LOGA_INFO 1
#include <stdarg.h>
#include <time.h>
#include <sys/timeb.h>

/**
 * Print a timestamped, printf-style log line to stdout.
 *
 * LOGA_DEBUG messages are dropped unless options.verbose is set.  The line is
 * assembled in a static 256 byte buffer, so longer messages are truncated and
 * the function is not safe to call from several threads at once.
 *
 * @param LOGA_level LOGA_DEBUG or LOGA_INFO
 * @param format printf-style format string
 * @param ... arguments for format
 */
void MyLog(int LOGA_level, char* format, ...)
{
	static char msg_buf[256];
	va_list args;
	struct timeb ts;
	struct tm timeinfo;
	size_t used;

	if (LOGA_level == LOGA_DEBUG && options.verbose == 0)
		return;

	strcpy(msg_buf, "");
	ftime(&ts);
#if defined(WIN32) || defined(_WINDOWS)
	localtime_s(&timeinfo, &ts.time);
#else
	localtime_r(&ts.time, &timeinfo);
#endif
	strftime(msg_buf, 80, "%Y%m%d %H%M%S", &timeinfo);

	/* append the milliseconds, bounded by what is left of the buffer */
	used = strlen(msg_buf);
	snprintf(&msg_buf[used], sizeof(msg_buf) - used, ".%.3hu ", ts.millitm);

	va_start(args, format);
	used = strlen(msg_buf);
	vsnprintf(&msg_buf[used], sizeof(msg_buf) - used, format, args);
	va_end(args);

	printf("%s\n", msg_buf);
	fflush(stdout);
}


/*
 * Platform specific sleep macro, time stamp type and start_clock(), which
 * returns the current time in that type for later use with elapsed().
 */
#if defined(WIN32) || defined(_WINDOWS)
#define mqsleep(A) Sleep(1000*A)
#define START_TIME_TYPE DWORD
static DWORD start_time = 0;
START_TIME_TYPE start_clock(void)
{
	return GetTickCount();
}
#elif defined(AIX)
#define mqsleep sleep
#define START_TIME_TYPE struct timespec
START_TIME_TYPE start_clock(void)
{
	static struct timespec start;
	clock_gettime(CLOCK_REALTIME, &start);
	return start;
}
#else
#define mqsleep sleep
#define START_TIME_TYPE struct timeval
/* TODO - unused - remove?
static struct timeval start_time; */ START_TIME_TYPE start_clock(void) { struct timeval start_time; gettimeofday(&start_time, NULL); return start_time; } #endif #if defined(WIN32) long elapsed(START_TIME_TYPE start_time) { return GetTickCount() - start_time; } #elif defined(AIX) #define assert(a) long elapsed(struct timespec start) { struct timespec now, res; clock_gettime(CLOCK_REALTIME, &now); ntimersub(now, start, res); return (res.tv_sec)*1000L + (res.tv_nsec)/1000000L; } #else long elapsed(START_TIME_TYPE start_time) { struct timeval now, res; gettimeofday(&now, NULL); timersub(&now, &start_time, &res); return (res.tv_sec)*1000 + (res.tv_usec)/1000; } #endif #define assert(a, b, c, d) myassert(__FILE__, __LINE__, a, b, c, d) #define assert1(a, b, c, d, e) myassert(__FILE__, __LINE__, a, b, c, d, e) int tests = 0; int failures = 0; FILE* xml; START_TIME_TYPE global_start_time; char output[3000]; char* cur_output = output; void write_test_result(void) { long duration = elapsed(global_start_time); fprintf(xml, " time=\"%ld.%.3ld\" >\n", duration / 1000, duration % 1000); if (cur_output != output) { fprintf(xml, "%s", output); cur_output = output; } fprintf(xml, "\n"); } void myassert(char* filename, int lineno, char* description, int value, char* format, ...) 
{ ++tests; if (!value) { va_list args; ++failures; MyLog(LOGA_INFO, "Assertion failed, file %s, line %d, description: %s\n", filename, lineno, description); va_start(args, format); vprintf(format, args); va_end(args); cur_output += sprintf(cur_output, "file %s, line %d \n", description, filename, lineno); } else MyLog(LOGA_DEBUG, "Assertion succeeded, file %s, line %d, description: %s", filename, lineno, description); } void logProperties(MQTTProperties *props) { int i = 0; for (i = 0; i < props->count; ++i) { int id = props->array[i].identifier; const char* name = MQTTPropertyName(id); char* intformat = "Property name %s value %d"; switch (MQTTProperty_getType(id)) { case MQTTPROPERTY_TYPE_BYTE: MyLog(LOGA_INFO, intformat, name, props->array[i].value.byte); break; case MQTTPROPERTY_TYPE_TWO_BYTE_INTEGER: MyLog(LOGA_INFO, intformat, name, props->array[i].value.integer2); break; case MQTTPROPERTY_TYPE_FOUR_BYTE_INTEGER: MyLog(LOGA_INFO, intformat, name, props->array[i].value.integer4); break; case MQTTPROPERTY_TYPE_VARIABLE_BYTE_INTEGER: MyLog(LOGA_INFO, intformat, name, props->array[i].value.integer4); break; case MQTTPROPERTY_TYPE_BINARY_DATA: case MQTTPROPERTY_TYPE_UTF_8_ENCODED_STRING: MyLog(LOGA_INFO, "Property name value %s %.*s", name, props->array[i].value.data.len, props->array[i].value.data.data); break; case MQTTPROPERTY_TYPE_UTF_8_STRING_PAIR: MyLog(LOGA_INFO, "Property name %s key %.*s value %.*s", name, props->array[i].value.data.len, props->array[i].value.data.data, props->array[i].value.value.len, props->array[i].value.value.data); break; } } } struct { int disconnected; } test_topic_aliases_globals = { 0, }; void disconnected(void* context, MQTTProperties* props, enum MQTTReasonCodes rc) { MQTTClient c = (MQTTClient)context; MyLog(LOGA_INFO, "Callback: disconnected, reason code \"%s\"", MQTTReasonCode_toString(rc)); logProperties(props); test_topic_aliases_globals.disconnected = 1; } static int messages_arrived = 0; int messageArrived(void* context, 
char* topicName, int topicLen, MQTTClient_message* message) { MyLog(LOGA_DEBUG, "Callback: message received on topic %s is %.*s.", topicName, message->payloadlen, (char*)(message->payload)); assert("Message structure version should be 1", message->struct_version == 1, "message->struct_version was %d", message->struct_version); if (message->struct_version == 1) { const int props_count = 0; assert("Properties count should be 0", message->properties.count == props_count, "Properties count was %d\n", message->properties.count); logProperties(&message->properties); } messages_arrived++; MQTTClient_free(topicName); MQTTClient_freeMessage(&message); return 1; } int test_client_topic_aliases(struct Options options) { int subsqos = 2; MQTTClient c; MQTTClient_connectOptions opts = MQTTClient_connectOptions_initializer5; MQTTClient_willOptions wopts = MQTTClient_willOptions_initializer; MQTTProperties props = MQTTProperties_initializer; MQTTProperties connect_props = MQTTProperties_initializer; MQTTProperty property; MQTTSubscribe_options subopts = MQTTSubscribe_options_initializer; MQTTClient_message pubmsg = MQTTClient_message_initializer; MQTTResponse response = MQTTResponse_initializer; MQTTClient_deliveryToken dt; int rc = 0; int count = 0; char* test_topic = "test_client_topic_aliases"; int topicAliasMaximum = 0; MQTTClient_createOptions createOpts = MQTTClient_createOptions_initializer; fprintf(xml, " 0", topicAliasMaximum > 0, "topicAliasMaximum was %d", topicAliasMaximum); /* subscribe to a topic */ response = MQTTClient_subscribe5(c, test_topic, 2, NULL, NULL); assert("Good rc from subscribe", response.reasonCode == MQTTREASONCODE_GRANTED_QOS_2, "rc was %d", response.reasonCode); /* then publish to the topic */ MQTTProperties_free(&pubmsg.properties); property.identifier = MQTTPROPERTY_CODE_TOPIC_ALIAS; property.value.integer2 = 1; MQTTProperties_add(&pubmsg.properties, &property); messages_arrived = 0; response = MQTTClient_publishMessage5(c, test_topic, &pubmsg, 
&dt); assert("Good rc from publish", response.reasonCode == MQTTCLIENT_SUCCESS, "rc was %d", response.reasonCode); /* should get a response */ while (messages_arrived == 0 && ++count < 10) { #if defined(WIN32) Sleep(1000); #else usleep(1000000L); #endif } assert("1 message should have arrived", messages_arrived == 1, "was %d", messages_arrived); /* now publish to the topic alias only */ messages_arrived = 0; response = MQTTClient_publishMessage5(c, "", &pubmsg, &dt); assert("Good rc from publish", response.reasonCode == MQTTCLIENT_SUCCESS, "rc was %d", response.reasonCode); /* should get a response */ while (messages_arrived == 0 && ++count < 10) { #if defined(WIN32) Sleep(1000); #else usleep(1000000L); #endif } assert("1 message should have arrived", messages_arrived == 1, "was %d", messages_arrived); rc = MQTTClient_disconnect5(c, 1000, MQTTREASONCODE_SUCCESS, NULL); /* Reconnect. Topic aliases should be deleted, but not subscription */ opts.cleanstart = 0; response = MQTTClient_connect5(c, &opts, NULL, NULL); assert("Good rc from connect", response.reasonCode == MQTTCLIENT_SUCCESS, "rc was %d", response.reasonCode); MQTTResponse_free(response); if (response.reasonCode != MQTTCLIENT_SUCCESS) goto exit; /* then publish to the topic */ MQTTProperties_free(&pubmsg.properties); messages_arrived = 0; response = MQTTClient_publishMessage5(c, test_topic, &pubmsg, &dt); assert("Good rc from publish", response.reasonCode == MQTTCLIENT_SUCCESS, "rc was %d", response.reasonCode); /* should get a response */ while (messages_arrived == 0 && ++count < 10) { #if defined(WIN32) Sleep(1000); #else usleep(1000000L); #endif } assert("1 message should have arrived", messages_arrived == 1, "was %d", messages_arrived); /* now publish to the topic alias only */ test_topic_aliases_globals.disconnected = 0; messages_arrived = 0; property.identifier = MQTTPROPERTY_CODE_TOPIC_ALIAS; property.value.integer2 = 1; MQTTProperties_add(&pubmsg.properties, &property); response = 
MQTTClient_publishMessage5(c, "", &pubmsg, &dt); assert("Good rc from publish", response.reasonCode == MQTTCLIENT_SUCCESS, "rc was %d", response.reasonCode); /* should not get a response */ while (messages_arrived == 0 && ++count < 10) { #if defined(WIN32) Sleep(1000); #else usleep(1000000L); #endif } assert("No message should have arrived", messages_arrived == 0, "was %d", messages_arrived); /* Now we expect to receive a disconnect packet telling us why */ count = 0; while (test_topic_aliases_globals.disconnected == 0 && ++count < 10) { #if defined(WIN32) Sleep(1000); #else usleep(1000000L); #endif } assert("Disconnected should be called", test_topic_aliases_globals.disconnected == 1, "was %d", test_topic_aliases_globals.disconnected); MQTTProperties_free(&pubmsg.properties); MQTTProperties_free(&props); MQTTProperties_free(&connect_props); MQTTClient_destroy(&c); exit: MyLog(LOGA_INFO, "TEST1: test %s. %d tests run, %d failures.", (failures == 0) ? "passed" : "failed", tests, failures); write_test_result(); return failures; } int test2_messageArrived(void* context, char* topicName, int topicLen, MQTTClient_message* message) { static int received = 0; static int first_topic_alias = 0; int topicAlias = 0; received++; MyLog(LOGA_DEBUG, "Callback: message received on topic %s is %.*s.", topicName, message->payloadlen, (char*)(message->payload)); assert("Message structure version should be 1", message->struct_version == 1, "message->struct_version was %d", message->struct_version); if (message->struct_version == 1) { const int props_count = 0; if (MQTTProperties_hasProperty(&message->properties, MQTTPROPERTY_CODE_TOPIC_ALIAS)) topicAlias = MQTTProperties_getNumericValue(&message->properties, MQTTPROPERTY_CODE_TOPIC_ALIAS); if (received == 1) first_topic_alias = topicAlias; else assert("All topic aliases should be the same", topicAlias == first_topic_alias, "Topic alias was %d\n", topicAlias); assert("topicAlias should not be 0", topicAlias > 0, "Topic alias was %d\n", 
topicAlias); logProperties(&message->properties); } messages_arrived++; MQTTClient_free(topicName); MQTTClient_freeMessage(&message); return 1; } int test_server_topic_aliases(struct Options options) { int subsqos = 2; MQTTClient c; MQTTClient_connectOptions opts = MQTTClient_connectOptions_initializer5; MQTTClient_willOptions wopts = MQTTClient_willOptions_initializer; MQTTProperties connect_props = MQTTProperties_initializer; MQTTProperty property; MQTTClient_message pubmsg = MQTTClient_message_initializer; MQTTResponse response = MQTTResponse_initializer; MQTTClient_deliveryToken dt; int rc = 0; int count = 0; char* test_topic = "test_server_topic_aliases"; int topicAliasMaximum = 0; int qos = 0; const int msg_count = 3; MQTTClient_createOptions createOpts = MQTTClient_createOptions_initializer; fprintf(xml, "payloadlen, (char*)(message->payload)); assert("Message structure version should be 1", message->struct_version == 1, "message->struct_version was %d", message->struct_version); if (message->struct_version == 1) { int subsidcount = 0, i = 0; subsidcount = MQTTProperties_propertyCount(&message->properties, MQTTPROPERTY_CODE_SUBSCRIPTION_IDENTIFIER); for (i = 0; i < subsidcount; ++i) { int subsid = MQTTProperties_getNumericValueAt(&message->properties, MQTTPROPERTY_CODE_SUBSCRIPTION_IDENTIFIER, i); assert("Subsid is i+1", subsid == i+1, "subsid is not correct %d\n", subsid); } logProperties(&message->properties); } messages_arrived++; MQTTClient_free(topicName); MQTTClient_freeMessage(&message); return 1; } int test_subscription_ids(struct Options options) { int subsqos = 2; MQTTClient c; MQTTClient_connectOptions opts = MQTTClient_connectOptions_initializer5; MQTTProperties connect_props = MQTTProperties_initializer; MQTTProperties subs_props = MQTTProperties_initializer; MQTTProperty property; MQTTClient_message pubmsg = MQTTClient_message_initializer; MQTTResponse response = MQTTResponse_initializer; MQTTClient_deliveryToken dt; int rc = 0; int count = 0; 
char* test_topic = "test_subscription_ids"; const int msg_count = 1; int subsids = 1; MQTTClient_createOptions createOpts = MQTTClient_createOptions_initializer; fprintf(xml, "payloadlen, (char*)(message->payload)); assert("Message structure version should be 1", message->struct_version == 1, "message->struct_version was %d", message->struct_version); messages_arrived++; MQTTClient_free(topicName); MQTTClient_freeMessage(&message); return 1; } static int blocking_found = 0; void test_flow_control_trace_callback(enum MQTTCLIENT_TRACE_LEVELS level, char* message) { static char* msg = "Blocking publish on queue full"; if (strstr(message, msg) != NULL) blocking_found = 1; } int test_flow_control(struct Options options) { int subsqos = 2; MQTTClient c; MQTTClient_connectOptions opts = MQTTClient_connectOptions_initializer5; MQTTProperties connect_props = MQTTProperties_initializer; MQTTClient_message pubmsg = MQTTClient_message_initializer; MQTTResponse response = MQTTResponse_initializer; MQTTClient_deliveryToken dt; int rc = 0, i = 0, count = 0; char* test_topic = "test_flow_control"; int receive_maximum = 65535; MQTTClient_createOptions createOpts = MQTTClient_createOptions_initializer; fprintf(xml, "payloadlen, (char*)(message->payload)); assert("Message structure version should be 1", message->struct_version == 1, "message->struct_version was %d", message->struct_version); if (message->struct_version == 1) { const int props_count = 0; MyLog(LOGA_INFO, "Message properties:"); logProperties(&message->properties); } test_request_response_globals.messages_arrived++; if (test_request_response_globals.messages_arrived == 1) { MQTTProperty *prop; assert("Topic should be request", strcmp(test_request_response_globals.request_topic, topicName) == 0, "topic was %s\n", topicName); if (MQTTProperties_hasProperty(&message->properties, MQTTPROPERTY_CODE_RESPONSE_TOPIC)) prop = MQTTProperties_getProperty(&message->properties, MQTTPROPERTY_CODE_RESPONSE_TOPIC); assert("Topic 
should be response", strncmp(test_request_response_globals.response_topic, prop->value.data.data, prop->value.data.len) == 0, "topic was %.4s\n", prop->value.data.data); if (MQTTProperties_hasProperty(&message->properties, MQTTPROPERTY_CODE_CORRELATION_DATA)) prop = MQTTProperties_getProperty(&message->properties, MQTTPROPERTY_CODE_CORRELATION_DATA); assert("Correlation data should be", strncmp(test_request_response_globals.correlation_id, prop->value.data.data, prop->value.data.len) == 0, "Correlation data was %.4s\n", prop->value.data.data); } else if (test_request_response_globals.messages_arrived == 2) { assert("Topic should be response", strcmp(test_request_response_globals.response_topic, topicName) == 0, "topic was %s\n", topicName); } MQTTClient_free(topicName); MQTTClient_freeMessage(&message); return 1; } int test_request_response(struct Options options) { int subsqos = 2; MQTTClient c; MQTTClient_connectOptions opts = MQTTClient_connectOptions_initializer5; MQTTProperties connect_props = MQTTProperties_initializer; MQTTProperties subs_props = MQTTProperties_initializer; MQTTProperty property; MQTTClient_message pubmsg = MQTTClient_message_initializer; MQTTResponse response = MQTTResponse_initializer; MQTTClient_deliveryToken dt; int rc = 0; int count = 0; char* test_topic = "test_request_response"; const int msg_count = 1; int subsids = 1; MQTTClient_createOptions createOpts = MQTTClient_createOptions_initializer; fprintf(xml, "payloadlen, (char*)(message->payload)); assert("Message structure version should be 1", message->struct_version == 1, "message->struct_version was %d", message->struct_version); if (message->struct_version == 1) { const int props_count = 0; MyLog(LOGA_INFO, "Message properties:"); logProperties(&message->properties); } test_subscribe_options_globals.messages_arrived++; if (test_subscribe_options_globals.messages_arrived == 1) { subsidcount = MQTTProperties_propertyCount(&message->properties, 
MQTTPROPERTY_CODE_SUBSCRIPTION_IDENTIFIER); assert("Subsidcount is i", subsidcount == 1, "subsidcount is not correct %d\n", subsidcount); subsid = MQTTProperties_getNumericValueAt(&message->properties, MQTTPROPERTY_CODE_SUBSCRIPTION_IDENTIFIER, 0); assert("Subsid is 2", subsid == 2, "subsid is not correct %d\n", subsid); } MQTTClient_free(topicName); MQTTClient_freeMessage(&message); return 1; } int test_subscribe_options(struct Options options) { int subsqos = 2; MQTTClient c; MQTTClient_connectOptions opts = MQTTClient_connectOptions_initializer5; MQTTProperties connect_props = MQTTProperties_initializer; MQTTProperties subs_props = MQTTProperties_initializer; MQTTProperty property; MQTTClient_message pubmsg = MQTTClient_message_initializer; MQTTResponse response = MQTTResponse_initializer; MQTTClient_deliveryToken dt; int rc = 0; int count = 0; const int msg_count = 1; MQTTSubscribe_options subopts = MQTTSubscribe_options_initializer; MQTTClient_createOptions createOpts = MQTTClient_createOptions_initializer; fprintf(xml, "payloadlen, (char*)(message->payload)); assert("Message structure version should be 1", message->struct_version == 1, "message->struct_version was %d", message->struct_version); if (message->struct_version == 1) { const int props_count = 0; if (message->properties.count > 0) { MyLog(LOGA_INFO, "Message properties:"); logProperties(&message->properties); } } test_shared_subscriptions_globals.messages_arrived++; MQTTClient_free(topicName); MQTTClient_freeMessage(&message); return 1; } int test_shared_subscriptions(struct Options options) { int subsqos = 2; MQTTClient_connectOptions opts = MQTTClient_connectOptions_initializer5; MQTTProperties connect_props = MQTTProperties_initializer; MQTTProperties subs_props = MQTTProperties_initializer; MQTTClient_message pubmsg = MQTTClient_message_initializer; MQTTResponse response = MQTTResponse_initializer; MQTTClient_deliveryToken dt; int rc = 0; int count = 0; const int msg_count = 1; 
MQTTSubscribe_options subopts = MQTTSubscribe_options_initializer; int i; MQTTClient_createOptions createOpts = MQTTClient_createOptions_initializer; fprintf(xml, "\n", (int)(ARRAY_SIZE(tests) - 1)); MQTTClient_setTraceCallback(test_flow_control_trace_callback); getopts(argc, argv); for (i = 0; i < options.iterations; ++i) { if (options.test_no == 0) { /* run all the tests */ for (options.test_no = 1; options.test_no < ARRAY_SIZE(tests); ++options.test_no) rc += tests[options.test_no](options); /* return number of failures. 0 = test succeeded */ } else rc = tests[options.test_no](options); /* run just the selected test */ } if (rc == 0) MyLog(LOGA_INFO, "verdict pass"); else MyLog(LOGA_INFO, "verdict fail"); fprintf(xml, "\n"); fclose(xml); return rc; }