123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124 |
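// mqttmsg.h : Declarations of the MQTT message class (CMqttMessage), the
//             fixed-size message pool (CMqttMessagePool) and the message
//             queue with its own pthread mutex (CMqttMessageQueue).
//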
- #if !defined(AGD_MQTTMSG_H__A96D51EB_6025_49B5_AE1E_C483E489A73B__INCLUDED_)
- #define AGD_MQTTMSG_H__A96D51EB_6025_49B5_AE1E_C483E489A73B__INCLUDED_
- #include <queue>
- #include <vector>
- #include <array>
- #include <pthread.h>
- #include <mosquittopp.h>
- #ifndef _LIBBUILD
- #include <gfa/svc/mqttcl/mqttjson.h>
- #include <gfa/svc/common/debug.h>
- #include <gfa/svc/common/logfile.h>
- #else // _LIBBUILD
- #include "mqttjson.h"
- #include "common/debug.h"
- #include "common/logfile.h"
- #endif // _LIBBUILD
- #ifdef __cplusplus
// Size of the char array CMqttMessage reserves for a topic string.
#define _TOPIC_BUFFER_LENGTH	192

// Size of the unsigned char array CMqttMessage reserves for a payload.
#define _PAYLOAD_BUFFER_LENGTH	320

// Number of CMqttMessage pointers held by a CMqttMessagePool.
#define _MSG_POOL_SIZE			256

#ifdef _TARGET_BUILD
// Prototype supplied by hand for target builds.
// NOTE(review): presumably the mosquitto headers used on the target do not
// declare this function - confirm against the target toolchain.
extern "C"
{
void mosquitto_message_free_contents(struct mosquitto_message *message);
}
#endif	//	_TARGET_BUILD
- /////////////////////////////////////////////////////////////////////////////
- // mqttmsg.h - Declarations:
- class CMqttMessage : public mosquitto_message
- {
- private:
- CMqttMessage(void);
- virtual ~CMqttMessage(void);
-
- public:
- static CMqttMessage* CreateMessage(void);
- static CMqttMessage* CreateMessage(const struct mosquitto_message *pMsg);
- static CMqttMessage* CreateMessage(const char *topic, const void *payload, int payloadlen, int qos, bool retain, int mid = 0);
- static CMqttMessage* CreateRemoveRetainedMessage(const char *topic, int qos, int mid = 0);
- void Release(bool bFreePool = false);
-
- public:
- bool TopicMatchesSub(const char *pszSub);
- bool TopicTokenize(std::vector<std::string> &tokArr, size_t nStart = 0);
- std::string GetTopic(size_t nStart = 0);
- bool GetPayloadAsJSON(CJson_t &json, std::string &sErr);
- inline void SetAvailable(bool bAvail) {
- m_bIsAvail = bAvail;}
- inline void SetPoolMsg(bool bIsPool) {
- m_bIsPool = bIsPool;}
- inline bool IsAvailable(void) const {
- return m_bIsAvail;}
- inline bool IsPoolMsg(bool bIsPool) const {
- return m_bIsPool;}
- private:
- char m_szTopic[_TOPIC_BUFFER_LENGTH];
- unsigned char m_bufPayload[_PAYLOAD_BUFFER_LENGTH];
- bool m_bFreeTopic;
- bool m_bFreePayload;
- bool m_bIsAvail;
- bool m_bIsPool;
- };
- /////////////////////////////////////////////////////////////////////////////
- class CMqttMessagePool
- {
- public:
- CMqttMessagePool(void);
- virtual ~CMqttMessagePool(void);
-
- CMqttMessage* GetMsgFromPool(void);
- void ReturnMsgToPool(CMqttMessage *pMsg);
- private:
- std::array<CMqttMessage*, _MSG_POOL_SIZE> m_pool;
- };
- /////////////////////////////////////////////////////////////////////////////
- class CMqttMessageQueue
- {
- public:
- CMqttMessageQueue(void);
- virtual ~CMqttMessageQueue(void);
-
- void Push(CMqttMessage *pMsg);
- CMqttMessage* Pop(void);
- size_t Size(void) const {
- return m_queue.size();
- }
- private:
- bool Lock(void){
- return !::pthread_mutex_lock(&m_mtx);}
- bool TryLock(void){
- return !::pthread_mutex_trylock(&m_mtx);}
- bool Unlock(void){
- return !::pthread_mutex_unlock(&m_mtx);}
- private:
- std::queue<CMqttMessage*> m_queue;
- pthread_mutex_t m_mtx;
- pthread_mutexattr_t m_mtxAtt;
- };
- /////////////////////////////////////////////////////////////////////////////
- #endif // __cplusplus
- #endif // !defined(AGD_MQTTMSG_H__A96D51EB_6025_49B5_AE1E_C483E489A73B__INCLUDED_)
|