#include #include #include "mqttclient.h" using namespace mosqpp; ///////////////////////////////////////////////////////////////////////////// // template static T _min(T v1, T v2) { return v1 < v2 ? v1 : v2; } ///////////////////////////////////////////////////////////////////////////// // #define MQTTCL_DEFAULT_LOOP_TIMEOUT 30 ///////////////////////////////////////////////////////////////////////////// // CMqttClient::CMqttClient(const char *id) : mosquittopp(id), m_pEvtCallback(NULL), m_evtMask(0), m_pUserParam(NULL) { } CMqttClient::~CMqttClient(void) { } ///////////////////////////////////////////////////////////////////////////// // int CMqttClient::Init(void) { return lib_init(); } int CMqttClient::Cleanup(void) { return lib_cleanup(); } ///////////////////////////////////////////////////////////////////////////// // int CMqttClient::publish(CMqttMessage *pMsg) { return mosquittopp::publish(&pMsg->mid, pMsg->topic, pMsg->payloadlen, pMsg->payload, pMsg->qos, pMsg->retain); } ///////////////////////////////////////////////////////////////////////////// // CMqttMessage* CMqttClient::PopRcvMsg(void) { return m_rcvMsg.Pop(); } void CMqttClient::PushRcvMsg(CMqttMessage *pMsg) { m_rcvMsg.Push(pMsg); } CMqttMessage* CMqttClient::PopSndMsg(void) { return m_sndMsg.Pop(); } void CMqttClient::PushSndMsg(CMqttMessage *pMsg) { m_sndMsg.Push(pMsg); } int CMqttClient::timed_loop(pc_time64_t nLoopTime) { int nRet, nLoopTimeout; pc_time64_t nRemaining = nLoopTime; m_pc.ClockTrigger(); do { nLoopTimeout = _min(MQTTCL_DEFAULT_LOOP_TIMEOUT, (int)(nRemaining / _PC_NS_PER_MS)); if((nRet = loop(nLoopTimeout)) != MOSQ_ERR_SUCCESS) break; if((nRemaining = nLoopTime - m_pc.ClockGetElapsed()) > (10 * _PC_NS_PER_MS)) { m_pc.MSleep(10); nRemaining = nLoopTime - m_pc.ClockGetElapsed(); } } while(nRemaining > (MQTTCL_DEFAULT_LOOP_TIMEOUT * _PC_NS_PER_MS)); return nRet; } /* MOSQ_ERR_CONN_PENDING = -1, MOSQ_ERR_SUCCESS = 0, MOSQ_ERR_NOMEM = 1, MOSQ_ERR_PROTOCOL = 2, MOSQ_ERR_INVAL = 3, MOSQ_ERR_NO_CONN = 4, MOSQ_ERR_CONN_REFUSED = 5, MOSQ_ERR_NOT_FOUND = 6, MOSQ_ERR_CONN_LOST = 7, MOSQ_ERR_TLS = 8, MOSQ_ERR_PAYLOAD_SIZE = 9, MOSQ_ERR_NOT_SUPPORTED = 10, MOSQ_ERR_AUTH = 11, MOSQ_ERR_ACL_DENIED = 12, MOSQ_ERR_UNKNOWN = 13, MOSQ_ERR_ERRNO = 14, MOSQ_ERR_EAI = 15, MOSQ_ERR_PROXY = 16 */ bool CMqttClient::EvalError(int nErr, bool &rbReconnect, bool &rbConnPending, bool &rbIntr, std::string &strErr) { bool bRet; rbReconnect = rbConnPending = false; if(rbIntr) return false; switch(nErr) { case MOSQ_ERR_SUCCESS: // no error rbReconnect = false; rbConnPending = false; bRet = true; break; case MOSQ_ERR_CONN_PENDING: // just wait rbReconnect = false; rbConnPending = true; bRet = false; break; case MOSQ_ERR_NO_CONN: // (maybe) recoverable errors case MOSQ_ERR_CONN_LOST: rbReconnect = true; rbConnPending = false; bRet = false; break; case MOSQ_ERR_ERRNO: // system error switch(errno) { case ECONNREFUSED: case ENETUNREACH: case ENETRESET: case ENETDOWN: case ETIMEDOUT: case ECONNRESET: case ECONNABORTED: case ENOTCONN: case ESHUTDOWN: case EHOSTDOWN: case EHOSTUNREACH: rbReconnect = true; rbConnPending = false; bRet = false; break; case EISCONN: case EALREADY: case EINPROGRESS: rbReconnect = false; rbConnPending = true; bRet = false; break; case EINTR: rbIntr = true; // fall through default: TRACE("errno: %d\n", errno); rbReconnect = false; rbConnPending = false; bRet = false; break; } break; default: // unrecoverable errors rbReconnect = false; rbConnPending = false; bRet = false; break; } if(!bRet) { const char *pszErr = ::mosquitto_strerror(nErr); strErr = pszErr; } return bRet; } bool CMqttClient::TimedLoop(pc_time64_t nLoopTime /* in nano seconds */, int &rnErr, bool &rbReconnect, bool &rbConnPending, bool &rbIntr, std::string &strErr) { bool bRet; int nLoopTimeout; pc_time64_t nRemaining = nLoopTime; m_pc.ClockTrigger(); do { nLoopTimeout = (int)_min((pc_time64_t)(MQTTCL_DEFAULT_LOOP_TIMEOUT * _PC_NS_PER_MS), nRemaining); rnErr = loop(nLoopTimeout / _PC_NS_PER_MS); if(rbIntr) { rbReconnect = false; rbConnPending = false; bRet = false; rnErr = MOSQ_ERR_ERRNO; break; } bRet = EvalError(rnErr, rbReconnect, rbConnPending, rbIntr, strErr); nRemaining = nLoopTime - m_pc.ClockGetElapsed(); if(bRet && !rbReconnect && (nRemaining > (10 * _PC_NS_PER_MS))) { if(usleep(10 * _PC_US_PER_MS) < 0) { if(errno == EINTR) { rbReconnect = false; rbConnPending = false; bRet = false; rnErr = MOSQ_ERR_ERRNO; rbIntr = true; break; } } nRemaining = nLoopTime - m_pc.ClockGetElapsed(); } else { break; } } while(nRemaining >= (MQTTCL_DEFAULT_LOOP_TIMEOUT * _PC_NS_PER_MS)); return bRet; } ///////////////////////////////////////////////////////////////////////////// // void CMqttClient::on_connect(int rc) { if(m_pEvtCallback && m_evtMask & NEVT_Connect) { MQTT_GENERIC_NOTIFICATION ntf; ntf.evt = NEVT_Connect; ntf.con.rc = rc; (*m_pEvtCallback)(&ntf, m_pUserParam); } } void CMqttClient::on_connect_with_flags(int rc, int flags) { if(m_pEvtCallback && m_evtMask & NEVT_Connect_With_Flags) { MQTT_GENERIC_NOTIFICATION ntf; ntf.evt = NEVT_Connect_With_Flags; ntf.con.rc = rc; ntf.con.flags = flags; (*m_pEvtCallback)(&ntf, m_pUserParam); } } void CMqttClient::on_disconnect(int rc) { if(m_pEvtCallback && m_evtMask & NEVT_Disconnect) { MQTT_GENERIC_NOTIFICATION ntf; ntf.evt = NEVT_Disconnect; ntf.con.rc = rc; (*m_pEvtCallback)(&ntf, m_pUserParam); } } void CMqttClient::on_subscribe(int mid, int qos_count, const int *granted_qos) { if(m_pEvtCallback && m_evtMask & NEVT_Subscribe) { MQTT_GENERIC_NOTIFICATION ntf; ntf.evt = NEVT_Subscribe; ntf.sub.mid = mid; ntf.sub.qos_count = qos_count; ntf.sub.granted_qos = granted_qos; (*m_pEvtCallback)(&ntf, m_pUserParam); } } void CMqttClient::on_unsubscribe(int mid) { if(m_pEvtCallback && m_evtMask & NEVT_Unsubscribe) { MQTT_GENERIC_NOTIFICATION ntf; ntf.evt = NEVT_Unsubscribe; ntf.sub.mid = mid; (*m_pEvtCallback)(&ntf, m_pUserParam); } } void CMqttClient::on_publish(int mid) { if(m_pEvtCallback && m_evtMask & NEVT_Publish) { MQTT_GENERIC_NOTIFICATION ntf; ntf.evt = NEVT_Publish; ntf.pub.mid = mid; (*m_pEvtCallback)(&ntf, m_pUserParam); } } void CMqttClient::on_message(const struct mosquitto_message *message) { CMqttMessage *pMsg = CMqttMessage::CreateMessage(message); PushRcvMsg(pMsg); if(m_pEvtCallback && m_evtMask & NEVT_Message) { MQTT_GENERIC_NOTIFICATION ntf; ntf.evt = NEVT_Message; ntf.msg.msg = pMsg; (*m_pEvtCallback)(&ntf, m_pUserParam); } } void CMqttClient::on_log(int level, const char *str) { if(m_pEvtCallback && m_evtMask & NEVT_Log) { MQTT_GENERIC_NOTIFICATION ntf; ntf.evt = NEVT_Log; ntf.log.level = level; ntf.log.str = str; (*m_pEvtCallback)(&ntf, m_pUserParam); } } void CMqttClient::on_error(void) { if(m_pEvtCallback && m_evtMask & NEVT_Error) { MQTT_GENERIC_NOTIFICATION ntf; ntf.evt = NEVT_Error; (*m_pEvtCallback)(NULL, m_pUserParam); } }