#include #include "mqttmsg.h" #include "strutil.h" static CMqttMessagePool g_msgPool; #ifdef _TARGET_BUILD void mosquitto_message_free_contents(struct mosquitto_message *message) { if(message) { if(message->topic) { free(message->topic); message->topic = NULL; } if(message->payload) { free(message->payload); message->payload = NULL; } } } #endif // _TARGET_BUILD ///////////////////////////////////////////////////////////////////////////// // static void* _memdup(const void *p, size_t s) { void *m = NULL; if(p && s) { if((m = malloc(s))) memcpy(m, p, s); } return m; } ///////////////////////////////////////////////////////////////////////////// // CMqttMessage::CMqttMessage(void) : m_bFreeTopic(false), m_bFreePayload(false), m_bIsAvail(false), m_bIsPool(false) { memset(static_cast(this), 0, sizeof(struct mosquitto_message)); memset(m_szTopic, 0, sizeof(m_szTopic)); memset(m_bufPayload, 0, sizeof(m_bufPayload)); } CMqttMessage::~CMqttMessage(void) { } ///////////////////////////////////////////////////////////////////////////// // void CMqttMessage::Release(bool bFreePool) { if(topic && !m_bFreeTopic) topic = NULL; if(payload && !m_bFreePayload) payload = NULL; ::mosquitto_message_free_contents(static_cast(this)); memset(static_cast(this), 0, sizeof(struct mosquitto_message)); if(m_bIsPool && !bFreePool) g_msgPool.ReturnMsgToPool(this); else delete this; } ///////////////////////////////////////////////////////////////////////////// // CMqttMessage* CMqttMessage::CreateMessage(void) { CMqttMessage *pMsg = new CMqttMessage; return pMsg; } CMqttMessage* CMqttMessage::CreateMessage(const struct mosquitto_message *pSrc) { if(!pSrc) return NULL; return CreateMessage(pSrc->topic, pSrc->payload, pSrc->payloadlen, pSrc->qos, pSrc->retain, pSrc->mid); } CMqttMessage* CMqttMessage::CreateMessage(const char *topic, const void *payload, int payloadlen, int qos, bool retain, int mid) { CMqttMessage *pMsg = g_msgPool.GetMsgFromPool(); if(!pMsg) pMsg = new CMqttMessage; pMsg->mid = mid; if(topic && *topic) { size_t nLen = strlen(topic); if(nLen >= sizeof(m_szTopic)) { pMsg->topic = strdup(topic); pMsg->m_bFreeTopic = true; TRACE("Dynamic memory allocation of %zu Bytes for topic required!\n", nLen); } else { strcpy(pMsg->m_szTopic, topic); pMsg->topic = pMsg->m_szTopic; pMsg->m_bFreeTopic = false; } } if(payload && payloadlen) { if(payloadlen > (int)sizeof(m_bufPayload)) { pMsg->payload = _memdup(payload, payloadlen); pMsg->m_bFreePayload = true; TRACE("Dynamic memory allocation of %d Bytes for payload required!\n", payloadlen); } else { memcpy(pMsg->m_bufPayload, payload, payloadlen); pMsg->payload = pMsg->m_bufPayload; pMsg->m_bFreePayload = false; } } pMsg->payloadlen = payloadlen; pMsg->qos = qos; pMsg->retain = retain; return pMsg; } CMqttMessage* CMqttMessage::CreateRemoveRetainedMessage(const char *topic, int qos, int mid) { return CMqttMessage::CreateMessage(topic, NULL, 0, qos, true, mid); } ///////////////////////////////////////////////////////////////////////////// // bool CMqttMessage::TopicMatchesSub(const char *pszSub) { bool bRes; if(::mosquitto_topic_matches_sub(pszSub, topic, &bRes) == MOSQ_ERR_SUCCESS) return bRes; return false; } bool CMqttMessage::TopicTokenize(std::vector &tokArr, size_t nStart) { int nRet; if(!topic || !*topic) return false; size_t nLenTok = strlen(topic); if(nLenTok < nStart) return false; int nCount; char **ppszTopics; if((nRet = ::mosquitto_sub_topic_tokenise(topic + nStart, &ppszTopics, &nCount)) == MOSQ_ERR_SUCCESS) { for(int i = 0; i < nCount; i++) { tokArr.push_back(ppszTopics[i]); } ::mosquitto_sub_topic_tokens_free(&ppszTopics, nCount); } return (nRet == MOSQ_ERR_SUCCESS); } std::string CMqttMessage::GetTopic(size_t nStart) { if(!topic || !*topic) return ""; size_t nLenTok = strlen(topic); if(nLenTok < nStart) return ""; return (topic + nStart); } bool CMqttMessage::GetPayloadAsJSON(CJson_t &json, std::string &sErr) { if(payload && payloadlen) { json_t *pjtRet; json_error_t err; std::string s((const char*)payload, (size_t)payloadlen); if((pjtRet = ::json_loads(s.c_str(), JSON_REJECT_DUPLICATES, &err))) { sErr.clear(); return json.Attach(pjtRet, true); } else { sErr = formatString("%s", err.text); } } else { sErr = formatString("empty payload"); } return false; }