mqttmsg.h 3.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124
  1. // mqttmsg.h :
  2. //
  3. #if !defined(AGD_MQTTMSG_H__A96D51EB_6025_49B5_AE1E_C483E489A73B__INCLUDED_)
  4. #define AGD_MQTTMSG_H__A96D51EB_6025_49B5_AE1E_C483E489A73B__INCLUDED_
  5. #include <queue>
  6. #include <vector>
  7. #include <array>
  8. #include <pthread.h>
  9. #include <mosquittopp.h>
  10. #ifndef _LIBBUILD
  11. #include <gfa/svc/mqttcl/mqttjson.h>
  12. #include <gfa/svc/common/debug.h>
  13. #include <gfa/svc/common/logfile.h>
  14. #else // _LIBBUILD
  15. #include "mqttjson.h"
  16. #include "common/debug.h"
  17. #include "common/logfile.h"
  18. #endif // _LIBBUILD
  19. #ifdef __cplusplus
  // Compile-time sizing constants used by the classes declared in this header.
  // _TOPIC_BUFFER_LENGTH: element count of CMqttMessage::m_szTopic (char array).
  20. #define _TOPIC_BUFFER_LENGTH 192
  // _PAYLOAD_BUFFER_LENGTH: element count of CMqttMessage::m_bufPayload (unsigned char array).
  21. #define _PAYLOAD_BUFFER_LENGTH 320
  // _MSG_POOL_SIZE: number of CMqttMessage pointers held in CMqttMessagePool::m_pool.
  22. #define _MSG_POOL_SIZE 256
  // On builds that define _TARGET_BUILD, mosquitto_message_free_contents() is
  // declared here with C linkage.
  // NOTE(review): presumably the target's mosquitto headers do not expose this
  // function - confirm against the target toolchain.
  23. #ifdef _TARGET_BUILD
  24. extern "C" void mosquitto_message_free_contents(struct mosquitto_message *message);
  25. #endif // _TARGET_BUILD
  26. /////////////////////////////////////////////////////////////////////////////
  27. // mqttmsg.h - Declarations:
  28. class CMqttMessage : public mosquitto_message
  29. {
  30. private:
  31. CMqttMessage(void);
  32. virtual ~CMqttMessage(void);
  33. public:
  34. static CMqttMessage* CreateMessage(void);
  35. static CMqttMessage* CreateMessage(const struct mosquitto_message *pMsg);
  36. static CMqttMessage* CreateMessage(const char *topic, const void *payload, int payloadlen, int qos, bool retain, int mid = 0);
  37. static CMqttMessage* CreateRemoveRetainedMessage(const char *topic, int qos, int mid = 0);
  38. void Release(bool bFreePool = false);
  39. public:
  40. bool TopicMatchesSub(const char *pszSub);
  41. bool TopicTokenize(std::vector<std::string> &tokArr, size_t nStart = 0);
  42. std::string GetTopic(size_t nStart = 0);
  43. bool GetPayloadAsJSON(CJson_t &json, std::string &sErr);
  44. inline void SetAvailable(bool bAvail) {
  45. m_bIsAvail = bAvail;}
  46. inline void SetPoolMsg(bool bIsPool) {
  47. m_bIsPool = bIsPool;}
  48. inline bool IsAvailable(void) const {
  49. return m_bIsAvail;}
  50. inline bool IsPoolMsg(bool bIsPool) const {
  51. return m_bIsPool;}
  52. private:
  53. char m_szTopic[_TOPIC_BUFFER_LENGTH];
  54. unsigned char m_bufPayload[_PAYLOAD_BUFFER_LENGTH];
  55. bool m_bFreeTopic;
  56. bool m_bFreePayload;
  57. bool m_bIsAvail;
  58. bool m_bIsPool;
  59. };
  60. /////////////////////////////////////////////////////////////////////////////
  61. class CMqttMessagePool
  62. {
  63. public:
  64. CMqttMessagePool(void);
  65. virtual ~CMqttMessagePool(void);
  66. CMqttMessage* GetMsgFromPool(void);
  67. void ReturnMsgToPool(CMqttMessage *pMsg);
  68. private:
  69. std::array<CMqttMessage*, _MSG_POOL_SIZE> m_pool;
  70. };
  71. /////////////////////////////////////////////////////////////////////////////
  72. class CMqttMessageQueue
  73. {
  74. public:
  75. CMqttMessageQueue(void);
  76. virtual ~CMqttMessageQueue(void);
  77. void Push(CMqttMessage *pMsg);
  78. CMqttMessage* Pop(void);
  79. size_t Size(void) const {
  80. return m_queue.size();
  81. }
  82. private:
  83. bool Lock(void){
  84. return !::pthread_mutex_lock(&m_mtx);}
  85. bool TryLock(void){
  86. return !::pthread_mutex_trylock(&m_mtx);}
  87. bool Unlock(void){
  88. return !::pthread_mutex_unlock(&m_mtx);}
  89. private:
  90. std::queue<CMqttMessage*> m_queue;
  91. pthread_mutex_t m_mtx;
  92. pthread_mutexattr_t m_mtxAtt;
  93. };
  94. /////////////////////////////////////////////////////////////////////////////
  95. #endif // __cplusplus
  96. #endif // !defined(AGD_MQTTMSG_H__A96D51EB_6025_49B5_AE1E_C483E489A73B__INCLUDED_)