#include #include "MQTTAsync.h" #include "LinkedList.h" static PyObject *MqttV3Error; static PyObject* mqttv3_create(PyObject* self, PyObject *args) { MQTTAsync c; char* serverURI; char* clientId; int persistence_option = MQTTCLIENT_PERSISTENCE_DEFAULT; PyObject *pyoptions = NULL, *temp; MQTTAsync_createOptions options = MQTTAsync_createOptions_initializer; int rc; if (!PyArg_ParseTuple(args, "ss|iO", &serverURI, &clientId, &persistence_option, &pyoptions)) return NULL; if (persistence_option != MQTTCLIENT_PERSISTENCE_DEFAULT && persistence_option != MQTTCLIENT_PERSISTENCE_NONE) { PyErr_SetString(PyExc_TypeError, "persistence must be DEFAULT or NONE"); return NULL; } if (pyoptions) { if (!PyDict_Check(pyoptions)) { PyErr_SetString(PyExc_TypeError, "Create options parameter must be a dictionary"); return NULL; } if ((temp = PyDict_GetItemString(pyoptions, "sendWhileDisconnected")) != NULL) { if (PyInt_Check(temp)) options.sendWhileDisconnected = (int) PyInt_AsLong(temp); else { PyErr_SetString(PyExc_TypeError, "sendWhileDisconnected value must be int"); return NULL; } } if ((temp = PyDict_GetItemString(pyoptions, "maxBufferedMessages")) != NULL) { if (PyInt_Check(temp)) options.maxBufferedMessages = (int) PyInt_AsLong(temp); else { PyErr_SetString(PyExc_TypeError, "maxBufferedMessages value must be int"); return NULL; } } rc = MQTTAsync_createWithOptions(&c, serverURI, clientId, persistence_option, NULL, &options); } else rc = MQTTAsync_create(&c, serverURI, clientId, persistence_option, NULL); return Py_BuildValue("ik", rc, c); } static List* callbacks = NULL; static List* connected_callbacks = NULL; enum msgTypes { CONNECT, PUBLISH, SUBSCRIBE, SUBSCRIBE_MANY, UNSUBSCRIBE }; typedef struct { MQTTAsync c; PyObject *context; PyObject *cl, *ma, *dc; } CallbackEntry; typedef struct { MQTTAsync c; PyObject *context; PyObject *co; } ConnectedEntry; int clientCompare(void* a, void* b) { CallbackEntry* e = (CallbackEntry*) a; return e->c == (MQTTAsync) b; } int connectedCompare(void* a, void* b) { ConnectedEntry* e = (ConnectedEntry*) a; return e->c == (MQTTAsync) b; } void connected(void* context, char* cause) { /* call the right Python function, using the context */ PyObject *arglist; PyObject *result; ConnectedEntry* e = context; PyGILState_STATE gstate; gstate = PyGILState_Ensure(); arglist = Py_BuildValue("Os", e->context, cause); result = PyEval_CallObject(e->co, arglist); Py_DECREF(arglist); PyGILState_Release(gstate); } void connectionLost(void* context, char* cause) { /* call the right Python function, using the context */ PyObject *arglist; PyObject *result; CallbackEntry* e = context; PyGILState_STATE gstate; gstate = PyGILState_Ensure(); arglist = Py_BuildValue("Os", e->context, cause); result = PyEval_CallObject(e->cl, arglist); Py_DECREF(arglist); PyGILState_Release(gstate); } int messageArrived(void* context, char* topicName, int topicLen, MQTTAsync_message* message) { PyObject *result = NULL; CallbackEntry* e = context; int rc = -99; PyGILState_STATE gstate; gstate = PyGILState_Ensure(); if (topicLen == 0) result = PyObject_CallFunction(e->ma, "Os{ss#sisisisi}", e->context, topicName, "payload", message->payload, message->payloadlen, "qos", message->qos, "retained", message->retained, "dup", message->dup, "msgid", message->msgid); else result = PyObject_CallFunction(e->ma, "Os#{ss#sisisisi}", e->context, topicName, topicLen, "payload", message->payload, message->payloadlen, "qos", message->qos, "retained", message->retained, "dup", message->dup, "msgid", message->msgid); if (result) { if (PyInt_Check(result)) rc = (int) PyInt_AsLong(result); Py_DECREF(result); } PyGILState_Release(gstate); MQTTAsync_free(topicName); MQTTAsync_freeMessage(&message); return rc; } void deliveryComplete(void* context, MQTTAsync_token dt) { PyObject *arglist; PyObject *result; CallbackEntry* e = context; PyGILState_STATE gstate; gstate = PyGILState_Ensure(); arglist = Py_BuildValue("Oi", e->context, dt); result = PyEval_CallObject(e->dc, arglist); Py_DECREF(arglist); PyGILState_Release(gstate); } static PyObject* mqttv3_setcallbacks(PyObject* self, PyObject *args) { MQTTAsync c; CallbackEntry* e = NULL; int rc; e = malloc(sizeof(CallbackEntry)); if (!PyArg_ParseTuple(args, "kOOOO", &c, (PyObject**) &e->context, &e->cl, &e->ma, &e->dc)) return NULL; e->c = c; if ((e->cl != Py_None && !PyCallable_Check(e->cl)) || (e->ma != Py_None && !PyCallable_Check(e->ma)) || (e->dc != Py_None && !PyCallable_Check(e->dc))) { PyErr_SetString(PyExc_TypeError, "3rd, 4th and 5th parameters must be callable or None"); return NULL; } Py_BEGIN_ALLOW_THREADS rc = MQTTAsync_setCallbacks(c, e, connectionLost, messageArrived, deliveryComplete); Py_END_ALLOW_THREADS if (rc == MQTTASYNC_SUCCESS) { ListElement* temp = NULL; if ((temp = ListFindItem(callbacks, c, clientCompare)) != NULL) { ListDetach(callbacks, temp->content); free(temp->content); } ListAppend(callbacks, e, sizeof(e)); Py_XINCREF(e->cl); Py_XINCREF(e->ma); Py_XINCREF(e->dc); Py_XINCREF(e->context); } return Py_BuildValue("i", rc); } static PyObject* mqttv3_setconnected(PyObject* self, PyObject *args) { MQTTAsync c; ConnectedEntry* e = NULL; int rc; e = malloc(sizeof(ConnectedEntry)); if (!PyArg_ParseTuple(args, "kOO", &c, (PyObject**) &e->context, &e->co)) return NULL; e->c = c; if (e->co != Py_None && !PyCallable_Check(e->co)) { PyErr_SetString(PyExc_TypeError, "3rd parameter must be callable or None"); return NULL; } Py_BEGIN_ALLOW_THREADS rc = MQTTAsync_setConnected(c, e, connected); Py_END_ALLOW_THREADS if (rc == MQTTASYNC_SUCCESS) { ListElement* temp = NULL; if ((temp = ListFindItem(connected_callbacks, c, connectedCompare)) != NULL) { ListDetach(connected_callbacks, temp->content); free(temp->content); } ListAppend(connected_callbacks, e, sizeof(e)); Py_XINCREF(e->co); Py_XINCREF(e->context); } return Py_BuildValue("i", rc); } typedef struct { MQTTAsync c; PyObject *context; PyObject *onSuccess, *onFailure; enum msgTypes msgType; } ResponseEntry; void onSuccess(void* context, MQTTAsync_successData* response) { PyObject *result = NULL; ResponseEntry* e = context; PyGILState_STATE gstate; gstate = PyGILState_Ensure(); switch (e->msgType) { case CONNECT: result = PyObject_CallFunction(e->onSuccess, "O{sisiss}", (e->context) ? e->context : Py_None, "MQTTVersion", response->alt.connect.MQTTVersion, "sessionPresent", response->alt.connect.sessionPresent, "serverURI", response->alt.connect.serverURI); break; case PUBLISH: result = PyObject_CallFunction(e->onSuccess, "O{si ss s{ss# sisi}}", (e->context) ? e->context : Py_None, "token", response->token, "destinationName", response->alt.pub.destinationName, "message", "payload", response->alt.pub.message.payload, response->alt.pub.message.payloadlen, "qos", response->alt.pub.message.qos, "retained", response->alt.pub.message.retained); break; case SUBSCRIBE: result = PyObject_CallFunction(e->onSuccess, "O{sisi}", (e->context) ? e->context : Py_None, "token", response->token, "qos", response->alt.qos); break; case SUBSCRIBE_MANY: result = PyObject_CallFunction(e->onSuccess, "O{sis[i]}", (e->context) ? e->context : Py_None, "token", response->token, "qosList", response->alt.qosList[0]); break; case UNSUBSCRIBE: result = PyObject_CallFunction(e->onSuccess, "O{si}", (e->context) ? e->context : Py_None, "token", response->token); break; } if (result) { Py_DECREF(result); printf("decrementing reference count for result\n"); } PyGILState_Release(gstate); free(e); } void onFailure(void* context, MQTTAsync_failureData* response) { PyObject *result = NULL; PyObject *arglist = NULL; ResponseEntry* e = context; PyGILState_STATE gstate; gstate = PyGILState_Ensure(); // TODO: convert response into Python structure if (e->context) arglist = Py_BuildValue("OO", e->context, response); else arglist = Py_BuildValue("OO", Py_None, response); result = PyEval_CallObject(e->onFailure, arglist); Py_DECREF(arglist); PyGILState_Release(gstate); free(e); } /* return true if ok, false otherwise */ int getResponseOptions(MQTTAsync c, PyObject *pyoptions, MQTTAsync_responseOptions* responseOptions, enum msgTypes msgType) { PyObject *temp = NULL; if (!pyoptions) return 1; if (!PyDict_Check(pyoptions)) { PyErr_SetString(PyExc_TypeError, "Response options must be a dictionary"); return 0; } if ((temp = PyDict_GetItemString(pyoptions, "onSuccess")) != NULL) { if (PyCallable_Check(temp)) /* temp points to Python function */ responseOptions->onSuccess = (MQTTAsync_onSuccess*)temp; else { PyErr_SetString(PyExc_TypeError, "onSuccess value must be callable"); return 0; } } if ((temp = PyDict_GetItemString(pyoptions, "onFailure")) != NULL) { if (PyCallable_Check(temp)) responseOptions->onFailure = (MQTTAsync_onFailure*)temp; else { PyErr_SetString(PyExc_TypeError, "onFailure value must be callable"); return 0; } } responseOptions->context = PyDict_GetItemString(pyoptions, "context"); if (responseOptions->onFailure || responseOptions->onSuccess) { ResponseEntry* r = malloc(sizeof(ResponseEntry)); r->c = c; r->context = responseOptions->context; responseOptions->context = r; r->onSuccess = (PyObject*)responseOptions->onSuccess; responseOptions->onSuccess = onSuccess; r->onFailure = (PyObject*)responseOptions->onFailure; responseOptions->onFailure = onFailure; r->msgType = msgType; } return 1; /* not an error, if we get here */ } static PyObject* mqttv3_connect(PyObject* self, PyObject *args) { MQTTAsync c; PyObject *pyoptions = NULL, *temp; MQTTAsync_connectOptions connectOptions = MQTTAsync_connectOptions_initializer; MQTTAsync_willOptions willOptions = MQTTAsync_willOptions_initializer; int rc; if (!PyArg_ParseTuple(args, "k|O", &c, &pyoptions)) return NULL; if (!pyoptions) goto skip; if (!PyDict_Check(pyoptions)) { PyErr_SetString(PyExc_TypeError, "2nd parameter must be a dictionary"); return NULL; } if ((temp = PyDict_GetItemString(pyoptions, "onSuccess")) != NULL) { if (PyCallable_Check(temp)) /* temp points to Python function */ connectOptions.onSuccess = (MQTTAsync_onSuccess*)temp; else { PyErr_SetString(PyExc_TypeError, "onSuccess value must be callable"); return NULL; } } if ((temp = PyDict_GetItemString(pyoptions, "onFailure")) != NULL) { if (PyCallable_Check(temp)) connectOptions.onFailure = (MQTTAsync_onFailure*)temp; else { PyErr_SetString(PyExc_TypeError, "onFailure value must be callable"); return NULL; } } connectOptions.context = PyDict_GetItemString(pyoptions, "context"); if (connectOptions.onFailure || connectOptions.onSuccess) { ResponseEntry* r = malloc(sizeof(ResponseEntry)); r->c = c; r->context = connectOptions.context; connectOptions.context = r; r->onSuccess = (PyObject*)connectOptions.onSuccess; connectOptions.onSuccess = onSuccess; r->onFailure = (PyObject*)connectOptions.onFailure; connectOptions.onFailure = onFailure; r->msgType = CONNECT; } if ((temp = PyDict_GetItemString(pyoptions, "keepAliveInterval")) != NULL) { if (PyInt_Check(temp)) connectOptions.keepAliveInterval = (int) PyInt_AsLong(temp); else { PyErr_SetString(PyExc_TypeError, "keepAliveLiveInterval value must be int"); return NULL; } } if ((temp = PyDict_GetItemString(pyoptions, "cleansession")) != NULL) { if (PyInt_Check(temp)) connectOptions.cleansession = (int) PyInt_AsLong(temp); else { PyErr_SetString(PyExc_TypeError, "cleansession value must be int"); return NULL; } } if ((temp = PyDict_GetItemString(pyoptions, "will")) != NULL) { if (PyDict_Check(temp)) { PyObject *wtemp = NULL; if ((wtemp = PyDict_GetItemString(temp, "topicName")) == NULL) { PyErr_SetString(PyExc_TypeError, "will topicName value must be set"); return NULL; } else { if (PyString_Check(wtemp)) willOptions.topicName = PyString_AsString(wtemp); else { PyErr_SetString(PyExc_TypeError, "will topicName value must be string"); return NULL; } } if ((wtemp = PyDict_GetItemString(temp, "message")) == NULL) { PyErr_SetString(PyExc_TypeError, "will message value must be set"); return NULL; } else { if (PyString_Check(wtemp)) willOptions.message = PyString_AsString(wtemp); else { PyErr_SetString(PyExc_TypeError, "will message value must be string"); return NULL; } } if ((wtemp = PyDict_GetItemString(temp, "retained")) != NULL) { if (PyInt_Check(wtemp)) willOptions.retained = (int) PyInt_AsLong(wtemp); else { PyErr_SetString(PyExc_TypeError, "will retained value must be int"); return NULL; } } if ((wtemp = PyDict_GetItemString(temp, "qos")) != NULL) { if (PyInt_Check(wtemp)) willOptions.qos = (int) PyInt_AsLong(wtemp); else { PyErr_SetString(PyExc_TypeError, "will qos value must be int"); return NULL; } } connectOptions.will = &willOptions; } else { PyErr_SetString(PyExc_TypeError, "will value must be dictionary"); return NULL; } } if ((temp = PyDict_GetItemString(pyoptions, "username")) != NULL) { if (PyString_Check(temp)) connectOptions.username = PyString_AsString(temp); else { PyErr_SetString(PyExc_TypeError, "username value must be string"); return NULL; } } if ((temp = PyDict_GetItemString(pyoptions, "password")) != NULL) { if (PyString_Check(temp)) connectOptions.username = PyString_AsString(temp); else { PyErr_SetString(PyExc_TypeError, "password value must be string"); return NULL; } } if ((temp = PyDict_GetItemString(pyoptions, "automaticReconnect")) != NULL) { if (PyInt_Check(temp)) connectOptions.automaticReconnect = (int) PyInt_AsLong(temp); else { PyErr_SetString(PyExc_TypeError, "automatic reconnect value must be int"); return NULL; } } if ((temp = PyDict_GetItemString(pyoptions, "minRetryInterval")) != NULL) { if (PyInt_Check(temp)) connectOptions.minRetryInterval = (int) PyInt_AsLong(temp); else { PyErr_SetString(PyExc_TypeError, "minRetryInterval value must be int"); return NULL; } } if ((temp = PyDict_GetItemString(pyoptions, "maxRetryInterval")) != NULL) { if (PyInt_Check(temp)) connectOptions.maxRetryInterval = (int) PyInt_AsLong(temp); else { PyErr_SetString(PyExc_TypeError, "maxRetryInterval value must be int"); return NULL; } } skip: Py_BEGIN_ALLOW_THREADS rc = MQTTAsync_connect(c, &connectOptions); Py_END_ALLOW_THREADS return Py_BuildValue("i", rc); } static PyObject* mqttv3_disconnect(PyObject* self, PyObject *args) { MQTTAsync c; MQTTAsync_disconnectOptions options = MQTTAsync_disconnectOptions_initializer; int rc; if (!PyArg_ParseTuple(args, "k|i", &c, &options.timeout)) return NULL; Py_BEGIN_ALLOW_THREADS rc = MQTTAsync_disconnect(c, &options); Py_END_ALLOW_THREADS return Py_BuildValue("i", rc); } static PyObject* mqttv3_isConnected(PyObject* self, PyObject *args) { MQTTAsync c; int rc; if (!PyArg_ParseTuple(args, "k", &c)) return NULL; Py_BEGIN_ALLOW_THREADS rc = MQTTAsync_isConnected(c); Py_END_ALLOW_THREADS return Py_BuildValue("i", rc); } static PyObject* mqttv3_subscribe(PyObject* self, PyObject *args) { MQTTAsync c; MQTTAsync_responseOptions response = MQTTAsync_responseOptions_initializer; PyObject *pyoptions = NULL; char* topic; int qos = 2; int rc; if (!PyArg_ParseTuple(args, "ks|iO", &c, &topic, &qos, &pyoptions)) return NULL; if (!getResponseOptions(c, pyoptions, &response, SUBSCRIBE)) return NULL; Py_BEGIN_ALLOW_THREADS; rc = MQTTAsync_subscribe(c, topic, qos, &response); Py_END_ALLOW_THREADS; return Py_BuildValue("i", rc); } static PyObject* mqttv3_subscribeMany(PyObject* self, PyObject *args) { MQTTAsync c; MQTTAsync_responseOptions response = MQTTAsync_responseOptions_initializer; PyObject* topicList; PyObject* qosList; PyObject *pyoptions = NULL; int count; char** topics; int* qoss; int i, rc = 0; if (!PyArg_ParseTuple(args, "kOO|O", &c, &topicList, &qosList, &pyoptions)) return NULL; if (!getResponseOptions(c, pyoptions, &response, SUBSCRIBE)) return NULL; if (!PySequence_Check(topicList) || !PySequence_Check(qosList)) { PyErr_SetString(PyExc_TypeError, "3rd and 4th parameters must be sequences"); return NULL; } if ((count = PySequence_Length(topicList)) != PySequence_Length(qosList)) { PyErr_SetString(PyExc_TypeError, "3rd and 4th parameters must be sequences of the same length"); return NULL; } topics = malloc(count * sizeof(char*)); for (i = 0; i < count; ++i) topics[i] = PyString_AsString(PySequence_GetItem(topicList, i)); qoss = malloc(count * sizeof(int)); for (i = 0; i < count; ++i) qoss[i] = (int) PyInt_AsLong(PySequence_GetItem(qosList, i)); Py_BEGIN_ALLOW_THREADS rc = MQTTAsync_subscribeMany(c, count, topics, qoss, &response); Py_END_ALLOW_THREADS for (i = 0; i < count; ++i) PySequence_SetItem(qosList, i, PyInt_FromLong((long) qoss[i])); free(topics); free(qoss); if (rc == MQTTASYNC_SUCCESS) return Py_BuildValue("iO", rc, qosList); else return Py_BuildValue("i", rc); } static PyObject* mqttv3_unsubscribe(PyObject* self, PyObject *args) { MQTTAsync c; MQTTAsync_responseOptions response = MQTTAsync_responseOptions_initializer; PyObject *pyoptions = NULL; char* topic; int rc; if (!PyArg_ParseTuple(args, "ks|O", &c, &topic, &pyoptions)) return NULL; if (!getResponseOptions(c, pyoptions, &response, UNSUBSCRIBE)) return NULL; Py_BEGIN_ALLOW_THREADS rc = MQTTAsync_unsubscribe(c, topic, &response); Py_END_ALLOW_THREADS return Py_BuildValue("i", rc); } static PyObject* mqttv3_unsubscribeMany(PyObject* self, PyObject *args) { MQTTAsync c; MQTTAsync_responseOptions response = MQTTAsync_responseOptions_initializer; PyObject* topicList; PyObject *pyoptions = NULL; int count; char** topics; int i, rc = 0; if (!PyArg_ParseTuple(args, "kO|O", &c, &topicList, &pyoptions)) return NULL; if (!getResponseOptions(c, pyoptions, &response, UNSUBSCRIBE)) return NULL; if (!PySequence_Check(topicList)) { PyErr_SetString(PyExc_TypeError, "3rd parameter must be sequences"); return NULL; } count = PySequence_Length(topicList); topics = malloc(count * sizeof(char*)); for (i = 0; i < count; ++i) topics[i] = PyString_AsString(PySequence_GetItem(topicList, i)); Py_BEGIN_ALLOW_THREADS rc = MQTTAsync_unsubscribeMany(c, count, topics, &response); Py_END_ALLOW_THREADS free(topics); return Py_BuildValue("i", rc); } static PyObject* mqttv3_send(PyObject* self, PyObject *args) { MQTTAsync c; char* destinationName; int payloadlen; void* payload; int qos = 0; int retained = 0; MQTTAsync_responseOptions response = MQTTAsync_responseOptions_initializer; PyObject *pyoptions = NULL; int rc; if (!PyArg_ParseTuple(args, "kss#|iiO", &c, &destinationName, &payload, &payloadlen, &qos, &retained, &pyoptions)) return NULL; if (!getResponseOptions(c, pyoptions, &response, PUBLISH)) return NULL; Py_BEGIN_ALLOW_THREADS rc = MQTTAsync_send(c, destinationName, payloadlen, payload, qos, retained, &response); Py_END_ALLOW_THREADS if (rc == MQTTASYNC_SUCCESS && qos > 0) return Py_BuildValue("ii", rc, response); else return Py_BuildValue("i", rc); } static PyObject* mqttv3_sendMessage(PyObject* self, PyObject *args) { MQTTAsync c; char* destinationName; PyObject *message, *temp; MQTTAsync_message msg = MQTTAsync_message_initializer; MQTTAsync_responseOptions response = MQTTAsync_responseOptions_initializer; PyObject *pyoptions = NULL; int rc; if (!PyArg_ParseTuple(args, "ksO|O", &c, &destinationName, &message, &pyoptions)) return NULL; if (!getResponseOptions(c, pyoptions, &response, PUBLISH)) return NULL; if (!PyDict_Check(message)) { PyErr_SetString(PyExc_TypeError, "3rd parameter must be a dictionary"); return NULL; } if ((temp = PyDict_GetItemString(message, "payload")) == NULL) { PyErr_SetString(PyExc_TypeError, "dictionary must have payload key"); return NULL; } if (PyString_Check(temp)) PyString_AsStringAndSize(temp, (char**) &msg.payload, (Py_ssize_t*) &msg.payloadlen); else { PyErr_SetString(PyExc_TypeError, "payload value must be string"); return NULL; } if ((temp = PyDict_GetItemString(message, "qos")) == NULL) msg.qos = (int) PyInt_AsLong(temp); if ((temp = PyDict_GetItemString(message, "retained")) == NULL) msg.retained = (int) PyInt_AsLong(temp); Py_BEGIN_ALLOW_THREADS rc = MQTTAsync_sendMessage(c, destinationName, &msg, &response); Py_END_ALLOW_THREADS if (rc == MQTTASYNC_SUCCESS && msg.qos > 0) return Py_BuildValue("ii", rc, response); else return Py_BuildValue("i", rc); } static PyObject* mqttv3_waitForCompletion(PyObject* self, PyObject *args) { MQTTAsync c; unsigned long timeout = 1000L; MQTTAsync_token dt; int rc; if (!PyArg_ParseTuple(args, "ki|i", &c, &dt, &timeout)) return NULL; Py_BEGIN_ALLOW_THREADS rc = MQTTAsync_waitForCompletion(c, dt, timeout); Py_END_ALLOW_THREADS return Py_BuildValue("i", rc); } static PyObject* mqttv3_getPendingTokens(PyObject* self, PyObject *args) { MQTTAsync c; MQTTAsync_token* tokens; int rc; if (!PyArg_ParseTuple(args, "k", &c)) return NULL; Py_BEGIN_ALLOW_THREADS rc = MQTTAsync_getPendingTokens(c, &tokens); Py_END_ALLOW_THREADS if (rc == MQTTASYNC_SUCCESS) { int i = 0; PyObject* dts = PyList_New(0); while (tokens[i] != -1) PyList_Append(dts, PyInt_FromLong((long) tokens[i])); return Py_BuildValue("iO", rc, dts); } else return Py_BuildValue("i", rc); } static PyObject* mqttv3_destroy(PyObject* self, PyObject *args) { MQTTAsync c; ListElement* temp = NULL; if (!PyArg_ParseTuple(args, "k", &c)) return NULL; if ((temp = ListFindItem(callbacks, c, clientCompare)) != NULL) { ListDetach(callbacks, temp->content); free(temp->content); } if ((temp = ListFindItem(connected_callbacks, c, connectedCompare)) != NULL) { ListDetach(connected_callbacks, temp->content); free(temp->content); } MQTTAsync_destroy(&c); Py_INCREF(Py_None); return Py_None; } static PyMethodDef MqttV3Methods[] = { { "create", mqttv3_create, METH_VARARGS, "Create an MQTTv3 client." }, { "setcallbacks", mqttv3_setcallbacks, METH_VARARGS, "Sets the callback functions for a particular client." }, { "setconnected", mqttv3_setconnected, METH_VARARGS, "Sets the connected callback function for a particular client." }, { "connect", mqttv3_connect, METH_VARARGS, "Connects to a server using the specified options." }, { "disconnect", mqttv3_disconnect, METH_VARARGS, "Disconnects from a server." }, { "isConnected", mqttv3_isConnected, METH_VARARGS, "Determines if this client is currently connected to the server." }, { "subscribe", mqttv3_subscribe, METH_VARARGS, "Subscribe to the given topic." }, { "subscribeMany", mqttv3_subscribeMany, METH_VARARGS, "Subscribe to the given topics." }, { "unsubscribe", mqttv3_unsubscribe, METH_VARARGS, "Unsubscribe from the given topic." }, { "unsubscribeMany", mqttv3_unsubscribeMany, METH_VARARGS, "Unsubscribe from the given topics." }, { "send", mqttv3_send, METH_VARARGS, "Publish a message to the given topic." }, { "sendMessage", mqttv3_sendMessage, METH_VARARGS, "Publish a message to the given topic." }, { "waitForCompletion", mqttv3_waitForCompletion, METH_VARARGS, "Waits for the completion of the delivery of the message represented by a delivery token." }, { "getPendingTokens", mqttv3_getPendingTokens, METH_VARARGS, "Returns the tokens pending of completion." }, { "destroy", mqttv3_destroy, METH_VARARGS, "Free memory allocated to a MQTT client. It is the opposite to create." }, { NULL, NULL, 0, NULL } /* Sentinel */ }; PyMODINIT_FUNC initpaho_mqtt3a(void) { PyObject *m; PyEval_InitThreads(); callbacks = ListInitialize(); connected_callbacks = ListInitialize(); m = Py_InitModule("paho_mqtt3a", MqttV3Methods); if (m == NULL) return; MqttV3Error = PyErr_NewException("paho_mqtt3a.error", NULL, NULL); Py_INCREF(MqttV3Error); PyModule_AddObject(m, "error", MqttV3Error); PyModule_AddIntConstant(m, "SUCCESS", MQTTASYNC_SUCCESS); PyModule_AddIntConstant(m, "FAILURE", MQTTASYNC_FAILURE); PyModule_AddIntConstant(m, "DISCONNECTED", MQTTASYNC_DISCONNECTED); PyModule_AddIntConstant(m, "MAX_MESSAGES_INFLIGHT", MQTTASYNC_MAX_MESSAGES_INFLIGHT); PyModule_AddIntConstant(m, "BAD_UTF8_STRING", MQTTASYNC_BAD_UTF8_STRING); PyModule_AddIntConstant(m, "BAD_NULL_PARAMETER", MQTTASYNC_NULL_PARAMETER); PyModule_AddIntConstant(m, "BAD_TOPICNAME_TRUNCATED", MQTTASYNC_TOPICNAME_TRUNCATED); PyModule_AddIntConstant(m, "PERSISTENCE_DEFAULT", MQTTCLIENT_PERSISTENCE_DEFAULT); PyModule_AddIntConstant(m, "PERSISTENCE_NONE", MQTTCLIENT_PERSISTENCE_NONE); PyModule_AddIntConstant(m, "PERSISTENCE_USER", MQTTCLIENT_PERSISTENCE_USER); PyModule_AddIntConstant(m, "PERSISTENCE_ERROR", MQTTCLIENT_PERSISTENCE_ERROR); }