// mqttmsg.h : // #if !defined(AGD_MQTTMSG_H__A96D51EB_6025_49B5_AE1E_C483E489A73B__INCLUDED_) #define AGD_MQTTMSG_H__A96D51EB_6025_49B5_AE1E_C483E489A73B__INCLUDED_ #include #include #include #include #include #ifndef _LIBBUILD #include #include #include #else // _LIBBUILD #include "mqttjson.h" #include "common/debug.h" #include "common/logfile.h" #endif // _LIBBUILD #ifdef __cplusplus #define _TOPIC_BUFFER_LENGTH 192 #define _PAYLOAD_BUFFER_LENGTH 320 #define _MSG_POOL_SIZE 256 #ifdef _TARGET_BUILD extern "C" void mosquitto_message_free_contents(struct mosquitto_message *message); #endif // _TARGET_BUILD ///////////////////////////////////////////////////////////////////////////// // mqttmsg.h - Declarations: class CMqttMessage : public mosquitto_message { private: CMqttMessage(void); virtual ~CMqttMessage(void); public: static CMqttMessage* CreateMessage(void); static CMqttMessage* CreateMessage(const struct mosquitto_message *pMsg); static CMqttMessage* CreateMessage(const char *topic, const void *payload, int payloadlen, int qos, bool retain, int mid = 0); static CMqttMessage* CreateRemoveRetainedMessage(const char *topic, int qos, int mid = 0); void Release(bool bFreePool = false); public: bool TopicMatchesSub(const char *pszSub); bool TopicTokenize(std::vector &tokArr, size_t nStart = 0); std::string GetTopic(size_t nStart = 0); bool GetPayloadAsJSON(CJson_t &json, std::string &sErr); inline void SetAvailable(bool bAvail) { m_bIsAvail = bAvail;} inline void SetPoolMsg(bool bIsPool) { m_bIsPool = bIsPool;} inline bool IsAvailable(void) const { return m_bIsAvail;} inline bool IsPoolMsg(bool bIsPool) const { return m_bIsPool;} private: char m_szTopic[_TOPIC_BUFFER_LENGTH]; unsigned char m_bufPayload[_PAYLOAD_BUFFER_LENGTH]; bool m_bFreeTopic; bool m_bFreePayload; bool m_bIsAvail; bool m_bIsPool; }; ///////////////////////////////////////////////////////////////////////////// class CMqttMessagePool { public: CMqttMessagePool(void); virtual ~CMqttMessagePool(void); CMqttMessage* GetMsgFromPool(void); void ReturnMsgToPool(CMqttMessage *pMsg); private: std::array m_pool; }; ///////////////////////////////////////////////////////////////////////////// class CMqttMessageQueue { public: CMqttMessageQueue(void); virtual ~CMqttMessageQueue(void); void Push(CMqttMessage *pMsg); CMqttMessage* Pop(void); size_t Size(void) const { return m_queue.size(); } private: bool Lock(void){ return !::pthread_mutex_lock(&m_mtx);} bool TryLock(void){ return !::pthread_mutex_trylock(&m_mtx);} bool Unlock(void){ return !::pthread_mutex_unlock(&m_mtx);} private: std::queue m_queue; pthread_mutex_t m_mtx; pthread_mutexattr_t m_mtxAtt; }; ///////////////////////////////////////////////////////////////////////////// #endif // __cplusplus #endif // !defined(AGD_MQTTMSG_H__A96D51EB_6025_49B5_AE1E_C483E489A73B__INCLUDED_)