123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208 |
- #include <string.h>
- #include "mqttmsg.h"
- #include "strutil.h"
- static CMqttMessagePool g_msgPool;
- #ifdef _TARGET_BUILD
- void mosquitto_message_free_contents(struct mosquitto_message *message)
- {
- if(message)
- {
- if(message->topic)
- {
- free(message->topic);
- message->topic = NULL;
- }
- if(message->payload)
- {
- free(message->payload);
- message->payload = NULL;
- }
- }
- }
- #endif // _TARGET_BUILD
- /////////////////////////////////////////////////////////////////////////////
- //
- static void* _memdup(const void *p, size_t s)
- {
- void *m = NULL;
- if(p && s)
- {
- if((m = malloc(s)))
- memcpy(m, p, s);
- }
- return m;
- }
- /////////////////////////////////////////////////////////////////////////////
- //
- CMqttMessage::CMqttMessage(void) : m_bFreeTopic(false), m_bFreePayload(false), m_bIsAvail(false), m_bIsPool(false)
- {
- memset(static_cast<mosquitto_message*>(this), 0, sizeof(struct mosquitto_message));
- memset(m_szTopic, 0, sizeof(m_szTopic));
- memset(m_bufPayload, 0, sizeof(m_bufPayload));
- }
- CMqttMessage::~CMqttMessage(void)
- {
- }
- /////////////////////////////////////////////////////////////////////////////
- //
- void CMqttMessage::Release(bool bFreePool)
- {
- if(topic && !m_bFreeTopic)
- topic = NULL;
- if(payload && !m_bFreePayload)
- payload = NULL;
- ::mosquitto_message_free_contents(static_cast<mosquitto_message*>(this));
- memset(static_cast<mosquitto_message*>(this), 0, sizeof(struct mosquitto_message));
- if(m_bIsPool && !bFreePool)
- g_msgPool.ReturnMsgToPool(this);
- else
- delete this;
- }
- /////////////////////////////////////////////////////////////////////////////
- //
- CMqttMessage* CMqttMessage::CreateMessage(void)
- {
- CMqttMessage *pMsg = new CMqttMessage;
- return pMsg;
- }
- CMqttMessage* CMqttMessage::CreateMessage(const struct mosquitto_message *pSrc)
- {
- if(!pSrc)
- return NULL;
- return CreateMessage(pSrc->topic, pSrc->payload, pSrc->payloadlen, pSrc->qos, pSrc->retain, pSrc->mid);
- }
- CMqttMessage* CMqttMessage::CreateMessage(const char *topic, const void *payload, int payloadlen, int qos, bool retain, int mid)
- {
- CMqttMessage *pMsg = g_msgPool.GetMsgFromPool();
- if(!pMsg)
- pMsg = new CMqttMessage;
- pMsg->mid = mid;
- if(topic && *topic)
- {
- size_t nLen = strlen(topic);
- if(nLen >= sizeof(m_szTopic))
- {
- pMsg->topic = strdup(topic);
- pMsg->m_bFreeTopic = true;
- TRACE("Dynamic memory allocation of %zu Bytes for topic required!\n", nLen);
- }
- else
- {
- strcpy(pMsg->m_szTopic, topic);
- pMsg->topic = pMsg->m_szTopic;
- pMsg->m_bFreeTopic = false;
- }
- }
- if(payload && payloadlen)
- {
- if(payloadlen > (int)sizeof(m_bufPayload))
- {
- pMsg->payload = _memdup(payload, payloadlen);
- pMsg->m_bFreePayload = true;
- TRACE("Dynamic memory allocation of %d Bytes for payload required!\n", payloadlen);
- }
- else
- {
- memcpy(pMsg->m_bufPayload, payload, payloadlen);
- pMsg->payload = pMsg->m_bufPayload;
- pMsg->m_bFreePayload = false;
- }
- }
- pMsg->payloadlen = payloadlen;
- pMsg->qos = qos;
- pMsg->retain = retain;
- return pMsg;
- }
- CMqttMessage* CMqttMessage::CreateRemoveRetainedMessage(const char *topic, int qos, int mid)
- {
- return CMqttMessage::CreateMessage(topic, NULL, 0, qos, true, mid);
- }
- /////////////////////////////////////////////////////////////////////////////
- //
- bool CMqttMessage::TopicMatchesSub(const char *pszSub)
- {
- bool bRes;
- if(::mosquitto_topic_matches_sub(pszSub, topic, &bRes) == MOSQ_ERR_SUCCESS)
- return bRes;
- return false;
- }
- bool CMqttMessage::TopicTokenize(std::vector<std::string> &tokArr, size_t nStart)
- {
- int nRet;
- if(!topic || !*topic)
- return false;
- size_t nLenTok = strlen(topic);
- if(nLenTok < nStart)
- return false;
- int nCount;
- char **ppszTopics;
- if((nRet = ::mosquitto_sub_topic_tokenise(topic + nStart, &ppszTopics, &nCount)) == MOSQ_ERR_SUCCESS)
- {
- for(int i = 0; i < nCount; i++)
- {
- tokArr.push_back(ppszTopics[i]);
- }
- ::mosquitto_sub_topic_tokens_free(&ppszTopics, nCount);
- }
- return (nRet == MOSQ_ERR_SUCCESS);
- }
- std::string CMqttMessage::GetTopic(size_t nStart)
- {
- if(!topic || !*topic)
- return "";
- size_t nLenTok = strlen(topic);
- if(nLenTok < nStart)
- return "";
- return (topic + nStart);
- }
- bool CMqttMessage::GetPayloadAsJSON(CJson_t &json, std::string &sErr)
- {
- if(payload && payloadlen)
- {
- json_t *pjtRet;
- json_error_t err;
- std::string s((const char*)payload, (size_t)payloadlen);
- if((pjtRet = ::json_loads(s.c_str(), JSON_REJECT_DUPLICATES, &err)))
- {
- sErr.clear();
- return json.Attach(pjtRet, true);
- }
- else
- {
- sErr = formatString("%s", err.text);
- }
- }
- else
- {
- sErr = formatString("empty payload");
- }
- return false;
- }
|