|
- // mqttvar.h :
- //
- #if !defined(AGD_MQTTVAR_H__CA5925ED_7757_407C_86F2_E7687DFFCCFA__INCLUDED_)
- #define AGD_MQTTVAR_H__CA5925ED_7757_407C_86F2_E7687DFFCCFA__INCLUDED_
#include <stdint.h>
#include <algorithm>
#include <cstdio>
#include <cstdlib>
#include <cstring>
#include <string>
#include <vector>
#include <map>
#include <forward_list>
#include <utility>
#include <type_traits>
#include <endian.h>
#include <gfa/gfaipc.h>
#include <typeinfo>
#include <typeindex>
- #ifndef _LIBBUILD
- #include <gfa/svc/common/strutil.h>
- #include <gfa/svc/common/conv.h>
- #include <gfa/svc/common/debug.h>
- #include <gfa/svc/mqttcl/mqttmsg.h>
- #include <gfa/svc/mqttcl/mqttjson.h>
- #include <gfa/svc/mqttcl/mqttdbg.h>
- #else // _LIBBUILD
- #include "common/strutil.h"
- #include "common/conv.h"
- #include "common/debug.h"
- #include "mqttmsg.h"
- #include "mqttjson.h"
- #include "mqttdbg.h"
- #endif // _LIBBUILD
- /////////////////////////////////////////////////////////////////////////////
- #define MQTT_VALUE_BINLE 0x00000001
- #define MQTT_VALUE_BINBE 0x00000002
- #define MQTT_VALUE_JSON 0x00000004
- #define MQTT_VALUE_PBUF 0x00000008
- #define MQTT_VALUE_ALL_FORMATS (MQTT_VALUE_BINLE | MQTT_VALUE_BINBE | MQTT_VALUE_JSON | MQTT_VALUE_PBUF)
- #define MQTT_TOPIC_CMD_VALUE "VALUE"
- #define MQTT_TOPIC_VALUE_BINLE "BINLE"
- #define MQTT_TOPIC_VALUE_BINBE "BINBE"
- #define MQTT_TOPIC_VALUE_JSON "JSON"
- #define MQTT_TOPIC_VALUE_PBUF "PBUF"
- #ifdef _DEBUG
- #define MQTT_JSON_OUTPUT_FLAGS JSON_MAX_INDENT
- #else // _DEBUG
- #define MQTT_JSON_OUTPUT_FLAGS JSON_COMPACT
- #endif // _DEBUG
- #if __BYTE_ORDER__ == __ORDER_LITTLE_ENDIAN__
- #define _UTF_16 "UTF-16LE//"
- #define _UTF_32 "UTF-32LE//"
- #define _UNICODE "WCHAR_T//"
- #elif __BYTE_ORDER__ == __ORDER_BIG_ENDIAN__
- #define _UTF_16 "UTF-16BE//"
- #define _UTF_32 "UTF-32BE//"
- #define _UNICODE "WCHAR_T//"
- #else // __BYTE_ORDER__
- #error Invalid or unsupported byte order!
- #endif // __BYTE_ORDER__
- #define MQTTCL_MIN_QOS 0
- #define MQTTCL_MAX_QOS 2
- #ifdef __cplusplus
- /////////////////////////////////////////////////////////////////////////////
/// Reverses the byte order of @p v in place.
///
/// Only trivially copyable types may be byte-swapped; this is enforced at
/// compile time. Single-byte types are left untouched.
///
/// @tparam T  trivially copyable value type
/// @param  v  value whose object representation is reversed
template <typename T>
static void _swap_val(T &v)
{
    static_assert(std::is_trivially_copyable<T>::value,
                  "_swap_val: T must be trivially copyable");

    if(sizeof(T) > 1)
    {
        // unsigned char may alias any object representation
        unsigned char *pRaw = &reinterpret_cast<unsigned char&>(v);
        std::reverse(pRaw, pRaw + sizeof(T));
    }
}

/// Copies up to @p cch characters from @p s to @p v.
///
/// For multi-byte character types every copied character is byte-swapped and
/// copying stops right after the first zero character has been copied (the
/// terminator itself is written to @p v, the remainder of @p v is left
/// untouched). For single-byte character types there is nothing to swap, so
/// exactly @p cch characters are copied verbatim.
///
/// @param s    source characters
/// @param v    destination buffer, must hold at least @p cch characters
/// @param cch  number of characters (not bytes) to process
/// @return     @p v
template <typename T>
static const T* _copy_swap_string_chars(const T *s, T *v, size_t cch)
{
    if(!s || !v || cch == 0)
        return v;   // nothing to copy

    if(sizeof(T) > 1)
    {
        for(T *pDst = v; cch > 0; --cch, ++pDst, ++s)
        {
            *pDst = *s;
            if(!*pDst)
                break;  // terminator copied, done
            _swap_val(*pDst);
        }
    }
    else
    {
        // count is in characters; convert to bytes explicitly
        std::memcpy(v, s, cch * sizeof(T));
    }

    return v;
}
- /////////////////////////////////////////////////////////////////////////////
- // mqttvar.h - Declarations:
- class CMqttVar;
- class CMqttVarTable
- {
- public:
- typedef bool (*_PFNCMP)(const char*, const char*);
- public:
- CMqttVarTable(void);
- virtual ~CMqttVarTable(void);
- void AddVar(CMqttVar *pv);
- CMqttVar* Find(const char *key) const;
- inline size_t size(void) const {
- return m_map.size();}
- bool AddToPubTable(CMqttVar *pv);
- bool RemoveFromPubTable(CMqttVar *pv);
- void CheckShmAndPublish(const char *pszTopicDevID, const char *pszTopicShmID, CMqttMessageQueue &rmq, int &nLocks);
- void DumpPubEnabled(uint32_t nPubMask = MQTT_VALUE_ALL_FORMATS);
- void TracePubVar(const char *pszPath, uint32_t nMask, int nQos, bool bRetained);
- private:
- std::map<const char*, CMqttVar*, _PFNCMP> m_map;
- std::map<const char*, CMqttVar*, _PFNCMP> m_pub;
- };
- /////////////////////////////////////////////////////////////////////////////
- class CMqttVar
- {
- public:
- CMqttVar(CMqttVar &&m) noexcept :
- m_hShm(std::move(m.m_hShm)),
- m_pParent(std::move(m.m_pParent)),
- m_nPubMask(std::move(m.m_nPubMask)),
- m_nPubMaskForcedOnce(m_nPubMask),
- m_path(std::move(m.m_path)),
- m_pszPath(m_path.c_str()),
- m_nCbVarpath(std::move(m.m_nCbVarpath)),
- m_name(std::move(m.m_name)),
- m_nIndex(std::move(m.m_nIndex)),
- m_nQos(std::move(m.m_nQos)),
- m_bRetain(std::move(m.m_bRetain)),
- m_nRetainedPubMask(std::move(m.m_nRetainedPubMask))
- {
- }
- CMqttVar(HSHM hShm, const char *pszName, int nIndex, uint32_t nPubMask, int nQos, bool bRetain, CMqttVar *pParent) :
- m_hShm(hShm),
- m_pParent(pParent),
- m_nPubMask(nPubMask),
- m_nPubMaskForcedOnce(m_nPubMask),
- m_pszPath(NULL),
- m_nCbVarpath(0),
- m_name(pszName ? pszName : ""),
- m_nIndex(nIndex),
- m_nQos(nQos),
- m_bRetain(bRetain),
- m_nRetainedPubMask(0)
- {
- }
- virtual ~CMqttVar(void)
- {
- }
- public:
- virtual void CheckShmAndPublish(const char *pszTopicDevID, const char *pszTopicShmID, CMqttMessageQueue &rmq, int &nLocks) = 0;
- virtual bool SetShmValue(uint32_t nValFormat, CMqttMessage *pMsg, int &nLocks) {
- return false;
- }
- virtual void CreateMembersTable(CMqttVarTable &vt) {
- vt.AddVar(this);}
- virtual bool EnablePublish(uint32_t nMask, CMqttVarTable *pvt) {
- if((m_nPubMask & nMask) != nMask)
- {
- #if _DUMP_CONTROL_CHANGED
- uint32_t nEnable = ((nMask ^ m_nPubMask) & nMask);
- #endif // _DUMP_CONTROL_CHANGED
- m_nPubMaskForcedOnce |= ((nMask ^ m_nPubMask) & nMask);
- m_nPubMask |= nMask;
- if(pvt)
- {
- pvt->AddToPubTable(this);
- #if _TRACK_TIMES
- g_nDbgCounter1++;
- #endif // _TRACK_TIMES
- #if _DUMP_CONTROL_CHANGED
- TraceFormatChange(nEnable, true);
- #endif // _DUMP_CONTROL_CHANGED
- return true;
- }
- }
- return false;
- }
- virtual bool DisablePublish(uint32_t nMask, CMqttVarTable *pvt) {
- if(m_nPubMask & nMask)
- {
- #if _DUMP_CONTROL_CHANGED
- uint32_t nDisable = (((~nMask) ^ m_nPubMask) & nMask);
- #endif // _DUMP_CONTROL_CHANGED
- m_nPubMask &= ~nMask;
- m_nPubMaskForcedOnce &= ~nMask;
- if(pvt)
- {
- pvt->RemoveFromPubTable(this);
- #if _DUMP_CONTROL_CHANGED
- TraceFormatChange(nDisable, false);
- #endif // _DUMP_CONTROL_CHANGED
- return true;
- }
- }
- return false;
- }
- bool PublishEnabled(uint32_t nMask = MQTT_VALUE_ALL_FORMATS) const {
- return !!(m_nPubMask & nMask);}
- uint32_t GetPublishMask(void) const {
- return m_nPubMask;}
- uint32_t GetForcedPublishMask(void) const {
- return m_nPubMaskForcedOnce;}
- void ClearForcedPublishMask(void) {
- m_nPubMaskForcedOnce = 0;}
- virtual void InitPath(CMqttVar *pParent, const char *pszMemberName, int nIndex = -1) {
- CreatePath(pParent, pszMemberName, nIndex, m_path);
- m_pszPath = m_path.c_str();
- m_nCbVarpath = m_path.length();
- }
- virtual const char* GetPath(void) const {
- return m_pszPath;}
-
- int GetQoS(void) const {
- return m_nQos;}
- virtual bool SetQoS(int nQos) {
- if(nQos < MQTTCL_MIN_QOS)
- nQos = MQTTCL_MIN_QOS;
- else if(nQos > MQTTCL_MAX_QOS)
- nQos = MQTTCL_MAX_QOS;
- if(m_nQos != nQos) {
- TraceQOSChange(m_nQos, nQos);
- m_nQos = nQos;
- return true;
- }
- return false;
- }
-
- bool GetRetained(void) const {
- return m_bRetain;}
-
- virtual bool SetRetained(bool bRetain) {
- if(m_bRetain != bRetain) {
- m_bRetain = bRetain;
- TraceRetainChange(bRetain);
- return true;
- }
- else
- return false;
- }
- void SetRetainedPubMask(uint32_t nMask) {
- m_nRetainedPubMask |= nMask;
- }
- virtual void RemoveRetained(const char *pszTopicDevID, const char *pszTopicShmID, CMqttMessageQueue &rmq, bool bForce)
- {
- if(!m_bRetain)
- {
- std::string s;
- CMqttMessage *pMsg;
- if((m_nRetainedPubMask & MQTT_VALUE_BINLE) || bForce)
- {
- s = CreateTopic(pszTopicDevID, pszTopicShmID, MQTT_TOPIC_VALUE_BINLE, MQTT_TOPIC_CMD_VALUE);
- pMsg = CMqttMessage::CreateRemoveRetainedMessage(s.c_str(), GetQoS());
- rmq.Push(pMsg);
- m_nRetainedPubMask &= ~MQTT_VALUE_BINLE;
- }
- if((m_nRetainedPubMask & MQTT_VALUE_BINBE) || bForce)
- {
- s = CreateTopic(pszTopicDevID, pszTopicShmID, MQTT_TOPIC_VALUE_BINBE, MQTT_TOPIC_CMD_VALUE);
- pMsg = CMqttMessage::CreateRemoveRetainedMessage(s.c_str(), GetQoS());
- rmq.Push(pMsg);
- m_nRetainedPubMask &= ~MQTT_VALUE_BINBE;
- }
- if((m_nRetainedPubMask & MQTT_VALUE_JSON) || bForce)
- {
- s = CreateTopic(pszTopicDevID, pszTopicShmID, MQTT_TOPIC_VALUE_JSON, MQTT_TOPIC_CMD_VALUE);
- pMsg = CMqttMessage::CreateRemoveRetainedMessage(s.c_str(), GetQoS());
- rmq.Push(pMsg);
- m_nRetainedPubMask &= ~MQTT_VALUE_JSON;
- }
- if((m_nRetainedPubMask & MQTT_VALUE_PBUF) || bForce)
- {
- s = CreateTopic(pszTopicDevID, pszTopicShmID, MQTT_TOPIC_VALUE_PBUF, MQTT_TOPIC_CMD_VALUE);
- pMsg = CMqttMessage::CreateRemoveRetainedMessage(s.c_str(), GetQoS());
- rmq.Push(pMsg);
- m_nRetainedPubMask &= ~MQTT_VALUE_PBUF;
- }
- }
- }
- protected:
- static void CreatePath(CMqttVar *pParent, const char *pszMemberName, int nIndex, std::string &rPath)
- {
- char szIndex[32] = {0};
- if(!pszMemberName || !*pszMemberName)
- pszMemberName = "/";
- if(nIndex >= 0)
- {
- sprintf(szIndex, "/%d", nIndex);
- }
- if(pParent)
- {
- rPath = pParent->GetPath();
- auto len = rPath.size();
- if(len > 0)
- {
- auto rend = rPath.rbegin();
- if(*rend != '/')
- rPath += "/";
- }
- rPath += pszMemberName;
- rPath += szIndex;
- }
- else
- {
- rPath = pszMemberName;
- rPath += szIndex;
- }
- }
- std::string CreateTopic(const char *pszTopicDevID, const char *pszTopicShmID, const char *pszValueFormat, const char *pszTopicCmd)
- {
- return ::formatString("%s/%s/%s/%s%s", pszTopicDevID, pszTopicShmID, pszValueFormat, pszTopicCmd, m_pszPath);
- }
- void Lock(int &nLocks)
- {
- if(!nLocks)
- {
- ::GfaIpcLockSHM(m_hShm);
- ++nLocks;
- }
- }
- void Unlock(int &nLocks)
- {
- if(nLocks)
- {
- --nLocks;
- ::GfaIpcUnlockSHM(m_hShm);
- }
- }
- const char* GetName(void) const {
- return m_name.c_str();}
- int GetIndex(void) const {
- return m_nIndex;}
- private:
- void TraceFormatChange(uint32_t nMask, bool bOn)
- {
- #ifdef _DUMP_CONTROL_CHANGED
- if(m_pszPath && nMask)
- {
- int nCount = 0;
- TRACE("%s ==> %s ", m_pszPath, bOn ? "ON" : "OFF");
- if(nMask & MQTT_VALUE_BINLE)
- {
- TRACE("%s", MQTT_TOPIC_VALUE_BINLE);
- ++nCount;
- }
- if(nMask & MQTT_VALUE_BINBE)
- {
- if(nCount)
- TRACE(", ");
- TRACE("%s", MQTT_TOPIC_VALUE_BINBE);
- ++nCount;
- }
- if(nMask & MQTT_VALUE_JSON)
- {
- if(nCount)
- TRACE(", ");
- TRACE("%s", MQTT_TOPIC_VALUE_JSON);
- ++nCount;
- }
- if(nMask & MQTT_VALUE_PBUF)
- {
- if(nCount)
- TRACE(", ");
- TRACE("%s", MQTT_TOPIC_VALUE_PBUF);
- ++nCount;
- }
- TRACE("\n");
- }
- #endif // _DUMP_CONTROL_CHANGED
- }
- void TraceQOSChange(int nQosOld, int nQosNew)
- {
- #ifdef _DUMP_CONTROL_CHANGED
- if(m_pszPath)
- {
- TRACE("%s ==> QOS: %d -> %d\n", m_pszPath, nQosOld, nQosNew);
- }
- #endif // _DUMP_CONTROL_CHANGED
- }
- void TraceRetainChange(bool bRetained)
- {
- #ifdef _DUMP_CONTROL_CHANGED
- if(m_pszPath)
- {
- TRACE("%s ==> Retain: %s\n", m_pszPath, bRetained ? "true" : "false");
- }
- #endif // _DUMP_CONTROL_CHANGED
- }
- private:
- HSHM m_hShm;
- CMqttVar *m_pParent;
- uint32_t m_nPubMask;
- uint32_t m_nPubMaskForcedOnce;
- std::string m_path;
- const char *m_pszPath;
- size_t m_nCbVarpath;
- std::string m_name;
- int m_nIndex;
- int m_nQos;
- bool m_bRetain;
- uint32_t m_nRetainedPubMask;
- };
- /////////////////////////////////////////////////////////////////////////////
- template <typename T>
- class CMqttVariable : public CMqttVar
- {
- public:
- CMqttVariable(CMqttVariable &&m) noexcept :
- CMqttVar(std::move(m)),
- m_pData(std::move(m.m_pData)),
- m_pShadow(std::move(m.m_pShadow)),
- m_bIsBool(std::move(m.m_bIsBool))
- {
- }
- CMqttVariable(void *pData, void *pShadow, HSHM hShm, const char *pszName, int nIndex, uint32_t nPubMask, int nQos, bool bRetain, CMqttVar *pParent) :
- CMqttVar(hShm, pszName, nIndex, nPubMask, nQos, bRetain, pParent),
- m_pData((volatile T*)pData),
- m_pShadow((T*)pShadow),
- m_bIsBool(std::type_index(typeid(T)) == std::type_index(typeid(bool)))
- {
- ASSERT(m_pData);
- ASSERT(m_pShadow);
- ASSERT(hShm);
- int nLocks = 0;
- Lock(nLocks);
- *m_pShadow = *m_pData;
- Unlock(nLocks);
- }
- /////////////////////////////////////////////////////////////////////////
- public:
- virtual void CheckShmAndPublish(const char *pszTopicDevID, const char *pszTopicShmID, CMqttMessageQueue &rmq, int &nLocks)
- {
- uint32_t nPubMaskForcedOnce = GetForcedPublishMask();
- ClearForcedPublishMask();
- if(!PublishEnabled())
- return;
- if(!UpdateShadowBuffer(nLocks))
- {
- if(nPubMaskForcedOnce & MQTT_VALUE_BINLE)
- PublishBinLE(pszTopicDevID, pszTopicShmID, rmq, nLocks);
- if(nPubMaskForcedOnce & MQTT_VALUE_BINBE)
- PublishBinBE(pszTopicDevID, pszTopicShmID, rmq, nLocks);
- if(nPubMaskForcedOnce & MQTT_VALUE_JSON)
- PublishJson(pszTopicDevID, pszTopicShmID, rmq, nLocks);
- if(nPubMaskForcedOnce & MQTT_VALUE_PBUF)
- PublishPBuf(pszTopicDevID, pszTopicShmID, rmq, nLocks);
- }
- else
- {
- if(PublishEnabled(MQTT_VALUE_BINLE))
- PublishBinLE(pszTopicDevID, pszTopicShmID, rmq, nLocks);
- if(PublishEnabled(MQTT_VALUE_BINBE))
- PublishBinBE(pszTopicDevID, pszTopicShmID, rmq, nLocks);
- if(PublishEnabled(MQTT_VALUE_JSON))
- PublishJson(pszTopicDevID, pszTopicShmID, rmq, nLocks);
- if(PublishEnabled(MQTT_VALUE_PBUF))
- PublishPBuf(pszTopicDevID, pszTopicShmID, rmq, nLocks);
- }
- }
- virtual bool SetShmValue(uint32_t nValFormat, CMqttMessage *pMsg, int &nLocks) {
- switch(nValFormat)
- {
- case MQTT_VALUE_BINLE:
- return SetShmValueLE(pMsg, nLocks);
- case MQTT_VALUE_BINBE:
- return SetShmValueBE(pMsg, nLocks);
- case MQTT_VALUE_JSON:
- return SetShmValueJson(pMsg, nLocks);
- case MQTT_VALUE_PBUF:
- return SetShmValuePBuf(pMsg, nLocks);
- default:
- break;
- }
- return false;
- }
- /////////////////////////////////////////////////////////////////////////
- private:
- bool SetShmValueLE(CMqttMessage *pMsg, int &nLocks)
- {
- if(pMsg->payload && ((size_t)pMsg->payloadlen == sizeof(T)))
- {
- T val = *(T*)pMsg->payload;
- #if __BYTE_ORDER__ == __ORDER_BIG_ENDIAN__
- _swap_val(val);
- #endif // __BYTE_ORDER__
- Lock(nLocks);
- *m_pData = val;
- Unlock(nLocks);
- return true;
- }
- return false;
- }
- bool SetShmValueBE(CMqttMessage *pMsg, int &nLocks)
- {
- if(pMsg->payload && ((size_t)pMsg->payloadlen == sizeof(T)))
- {
- T val = *(T*)pMsg->payload;
- #if __BYTE_ORDER__ == __ORDER_LITTLE_ENDIAN__
- _swap_val(val);
- #endif // __BYTE_ORDER__
- Lock(nLocks);
- *m_pData = val;
- Unlock(nLocks);
- return true;
- }
- return false;
- }
- bool SetShmValueJson(CMqttMessage *pMsg, int &nLocks)
- {
- bool bRet = false;
- if(pMsg)
- {
- T val;
- CJson_t jtRoot, jtVal;
- std::string sErr;
- if(pMsg->GetPayloadAsJSON(jtRoot, sErr))
- {
- if(jtRoot.GetValue("value", jtVal))
- {
- const json_t *pjt = jtVal;
- if(m_bIsBool)
- {
- if(json_is_boolean(pjt))
- {
- val = (T)json_boolean_value(pjt);
- bRet = true;
- }
- else if(json_is_integer(pjt))
- {
- val = !!::json_integer_value(pjt);
- bRet = true;
- }
- }
- else if(std::is_integral<T>::value)
- {
- if(json_is_integer(pjt))
- {
- val = (T)::json_integer_value(pjt);
- bRet = true;
- }
- else if(json_is_boolean(pjt))
- {
- val = (T)(json_boolean_value(pjt) ? 1 : 0);
- bRet = true;
- }
- }
- else if(std::is_floating_point<T>::value)
- {
- if(json_is_real(pjt))
- {
- val = (T)::json_real_value(pjt);
- bRet = true;
- }
- else if(json_is_integer(pjt))
- {
- val = (T)::json_integer_value(pjt);
- bRet = true;
- }
- else if(json_is_boolean(pjt))
- {
- val = (T)(json_boolean_value(pjt) ? 1 : 0);
- bRet = true;
- }
- }
-
- if(bRet)
- {
- Lock(nLocks);
- *m_pData = val;
- Unlock(nLocks);
- }
- }
- }
- }
- return bRet;
- }
- bool SetShmValuePBuf(CMqttMessage *pMsg, int &nLocks)
- {
- TRACE("%s:\n\tProtocol buffers not implemented!\n", GetPath());
- return false;
- }
- /////////////////////////////////////////////////////////////////////////
- void PublishBinLE(const char *pszTopicDevID, const char *pszTopicShmID, CMqttMessageQueue &rmq, int &nLocks)
- {
- std::string s = CreateTopic(pszTopicDevID, pszTopicShmID, MQTT_TOPIC_VALUE_BINLE, MQTT_TOPIC_CMD_VALUE);
- #if _RETURN_BIN_AS_STRING
- std::string v = toString(*m_pShadow);
- CMqttMessage *pMsg = CMqttMessage::CreateMessage(s.c_str(), v.c_str(), v.length(), GetQoS(), GetRetained());
- #else // _RETURN_BIN_AS_STRING
- T val = *m_pShadow;
- #if __BYTE_ORDER__ == __ORDER_BIG_ENDIAN__
- _swap_val(val);
- #endif // __BYTE_ORDER__
- CMqttMessage *pMsg = CMqttMessage::CreateMessage(s.c_str(), &val, sizeof(val), GetQoS(), GetRetained());
- #endif // _RETURN_BIN_AS_STRING
- rmq.Push(pMsg);
- if(GetRetained())
- SetRetainedPubMask(MQTT_VALUE_BINLE);
- }
- void PublishBinBE(const char *pszTopicDevID, const char *pszTopicShmID, CMqttMessageQueue &rmq, int &nLocks)
- {
- std::string s = CreateTopic(pszTopicDevID, pszTopicShmID, MQTT_TOPIC_VALUE_BINBE, MQTT_TOPIC_CMD_VALUE);
- #if _RETURN_BIN_AS_STRING
- std::string v = toString(*m_pShadow);
- CMqttMessage *pMsg = CMqttMessage::CreateMessage(s.c_str(), v.c_str(), v.length(), GetQoS(), GetRetained());
- #else // _RETURN_BIN_AS_STRING
- T val = *m_pShadow;
- #if __BYTE_ORDER__ == __ORDER_LITTLE_ENDIAN__
- _swap_val(val);
- #endif // __BYTE_ORDER__
- CMqttMessage *pMsg = CMqttMessage::CreateMessage(s.c_str(), &val, sizeof(val), GetQoS(), GetRetained());
- #endif // _RETURN_BIN_AS_STRING
- rmq.Push(pMsg);
- if(GetRetained())
- SetRetainedPubMask(MQTT_VALUE_BINBE);
- }
- void PublishJson(const char *pszTopicDevID, const char *pszTopicShmID, CMqttMessageQueue &rmq, int &nLocks)
- {
- std::string s = CreateTopic(pszTopicDevID, pszTopicShmID, MQTT_TOPIC_VALUE_JSON, MQTT_TOPIC_CMD_VALUE);
- CJson_t jtValue(json_object());
- CJson_t jtPath(json_string(GetPath()));
- CJson_t jtIndex(json_integer(GetIndex()));
- CJson_t jtName(json_string(GetName()));
- CJson_t jtVal(GetJsonValue());
- json_object_set(jtValue, "path", jtPath);
- json_object_set(jtValue, "index", jtIndex);
- json_object_set(jtValue, "name", jtName);
- json_object_set(jtValue, "value", jtVal);
- char *pszJson = json_dumps(jtValue, MQTT_JSON_OUTPUT_FLAGS);
- std::string v = pszJson;
- free(pszJson);
- CMqttMessage *pMsg = CMqttMessage::CreateMessage(s.c_str(), v.c_str(), v.length(), GetQoS(), GetRetained());
- rmq.Push(pMsg);
- if(GetRetained())
- SetRetainedPubMask(MQTT_VALUE_JSON);
- }
- void PublishPBuf(const char *pszTopicDevID, const char *pszTopicShmID, CMqttMessageQueue &rmq, int &nLocks)
- {
- std::string s = CreateTopic(pszTopicDevID, pszTopicShmID, MQTT_TOPIC_VALUE_PBUF, MQTT_TOPIC_CMD_VALUE);
- TRACE("%s:\n\tProtocol buffers not implemented!\n", s.c_str());
- }
- std::string toString(T v)
- {
- if(std::is_integral<T>::value)
- {
- if(std::is_signed<T>::value)
- {
- long long r = (long long)v;
- return formatString("%lld", r);
- }
- else if(std::is_unsigned<T>::value)
- {
- unsigned long long r = (unsigned long long)v;
- return formatString("%llu", r);
- }
- }
- else if(std::is_floating_point<T>::value)
- {
- double r = (double)v;
- return formatString("%.20g", r);
- }
- return "";
- }
- json_t* GetJsonValue(void) const
- {
- if(std::is_integral<T>::value)
- return m_bIsBool ? json_boolean(!!*m_pShadow) : ::json_integer((json_int_t)*m_pShadow);
- else if(std::is_floating_point<T>::value)
- return ::json_real((double)*m_pShadow);
- ASSERT(false);
- return NULL;
- }
- bool UpdateShadowBuffer(int &nLocks)
- {
- Lock(nLocks);
- bool bChanged = *m_pShadow != *m_pData;
- if(bChanged)
- {
- *m_pShadow = *m_pData;
- TRACE("Changed: %s\n", GetPath());
- }
- Unlock(nLocks);
- return bChanged;
- }
- private:
- volatile T *m_pData;
- T *m_pShadow;
- bool m_bIsBool;
- };
- /////////////////////////////////////////////////////////////////////////////
- template <typename T, int ST>
- class CMqttStringVariable : public CMqttVar
- {
- public:
- typedef enum
- {
- VT_Invalid, // 0
- VT_Latin1, // 1
- VT_UTF_8, // 2
- VT_UTF_16, // 3
- VT_UTF_32, // 4
- VT_Unicode, // 5
- VT_Last
- }VT;
- public:
- CMqttStringVariable(CMqttStringVariable &&m) noexcept :
- CMqttVar(std::move(m)),
- m_nCChBuffer(std::move(m.m_nCChBuffer)),
- m_nCbBuffer(std::move(m.m_nCbBuffer)),
- m_nCbString(std::move(m.m_nCbString)),
- m_pszData(std::move(m.m_pszData)),
- m_pszShadow(std::move(m.m_pszShadow))
- {
- }
- CMqttStringVariable(void *pData, void *pShadow, HSHM hShm, const char *pszName, size_t nCChBuffer, int nIndex, uint32_t nPubMask, int nQos, bool bRetain, CMqttVar *pParent) :
- CMqttVar(hShm, pszName, nIndex, nPubMask, nQos, bRetain, pParent),
- m_nCChBuffer(nCChBuffer),
- m_nCbBuffer(nCChBuffer * sizeof(T)),
- m_nCbString(0),
- m_pszData((T*)pData),
- m_pszShadow((T*)pShadow)
- {
- ASSERT(m_pszData);
- ASSERT(m_pszShadow);
- ASSERT(m_nCChBuffer > 0);
- int nLocks = 0;
- Lock(nLocks);
- zeroTerm(m_nCChBuffer - 1);
- m_nCbString = slen(m_pszData) * sizeof(T);
- memcpy(m_pszShadow, (const void*)m_pszData, m_nCbBuffer);
- Unlock(nLocks);
- }
- public:
- virtual void CheckShmAndPublish(const char *pszTopicDevID, const char *pszTopicShmID, CMqttMessageQueue &rmq, int &nLocks)
- {
- uint32_t nPubMaskForcedOnce = GetForcedPublishMask();
- ClearForcedPublishMask();
- if(!PublishEnabled())
- return;
- if(!UpdateShadowBuffer(nLocks))
- {
- if(nPubMaskForcedOnce & MQTT_VALUE_BINLE)
- PublishBinLE(pszTopicDevID, pszTopicShmID, rmq, nLocks);
- if(nPubMaskForcedOnce & MQTT_VALUE_BINBE)
- PublishBinBE(pszTopicDevID, pszTopicShmID, rmq, nLocks);
- if(nPubMaskForcedOnce & MQTT_VALUE_JSON)
- PublishJson(pszTopicDevID, pszTopicShmID, rmq, nLocks);
- if(nPubMaskForcedOnce & MQTT_VALUE_PBUF)
- PublishPBuf(pszTopicDevID, pszTopicShmID, rmq, nLocks);
- }
- else
- {
- if(PublishEnabled(MQTT_VALUE_BINLE))
- PublishBinLE(pszTopicDevID, pszTopicShmID, rmq, nLocks);
- if(PublishEnabled(MQTT_VALUE_BINBE))
- PublishBinBE(pszTopicDevID, pszTopicShmID, rmq, nLocks);
- if(PublishEnabled(MQTT_VALUE_JSON))
- PublishJson(pszTopicDevID, pszTopicShmID, rmq, nLocks);
- if(PublishEnabled(MQTT_VALUE_PBUF))
- PublishPBuf(pszTopicDevID, pszTopicShmID, rmq, nLocks);
- }
- }
- virtual bool SetShmValue(uint32_t nValFormat, CMqttMessage *pMsg, int &nLocks) {
- switch(nValFormat)
- {
- case MQTT_VALUE_BINLE:
- return SetShmValueLE(pMsg, nLocks);
- case MQTT_VALUE_BINBE:
- return SetShmValueBE(pMsg, nLocks);
- case MQTT_VALUE_JSON:
- return SetShmValueJson(pMsg, nLocks);
- case MQTT_VALUE_PBUF:
- return SetShmValuePBuf(pMsg, nLocks);
- default:
- break;
- }
- return false;
- }
- private:
- bool SetShmValueLE(CMqttMessage *pMsg, int &nLocks)
- {
- if(pMsg->payload && pMsg->payloadlen > 0)
- {
- if((size_t)pMsg->payloadlen / sizeof(T) < m_nCChBuffer)
- {
- Lock(nLocks);
- m_nCbString = (size_t)pMsg->payloadlen;
- #if __BYTE_ORDER__ == __ORDER_BIG_ENDIAN__
- _copy_swap_string_chars((const T*)pMsg->payload, (T*)m_pszData, m_nCbString / sizeof(T));
- #else // __BYTE_ORDER__
- memcpy((void*)m_pszData, pMsg->payload, m_nCbString);
- #endif // __BYTE_ORDER__
- zeroTerm(m_nCbString / sizeof(T));
- Unlock(nLocks);
- return true;
- }
- }
- else
- {
- Lock(nLocks);
- *m_pszData = '\0';
- m_nCbString = 0;
- Unlock(nLocks);
- return true;
- }
- return false;
- }
- bool SetShmValueBE(CMqttMessage *pMsg, int &nLocks)
- {
- if(pMsg->payload && pMsg->payloadlen > 0)
- {
- if((size_t)pMsg->payloadlen / sizeof(T) < m_nCChBuffer)
- {
- Lock(nLocks);
- m_nCbString = (size_t)pMsg->payloadlen;
- #if __BYTE_ORDER__ == __ORDER_LITTLE_ENDIAN__
- _copy_swap_string_chars((const T*)pMsg->payload, (T*)m_pszData, m_nCbString / sizeof(T));
- #else // __BYTE_ORDER__
- memcpy((void*)m_pszData, pMsg->payload, m_nCbString);
- #endif // __BYTE_ORDER__
- zeroTerm(m_nCbString / sizeof(T));
- Unlock(nLocks);
- return true;
- }
- }
- else
- {
- Lock(nLocks);
- *m_pszData = '\0';
- m_nCbString = 0;
- Unlock(nLocks);
- return true;
- }
- return false;
- }
- bool SetShmValueJson(CMqttMessage *pMsg, int &nLocks)
- {
- if(pMsg)
- {
- CJson_t jtRoot, jtVal;
- std::string sErr;
- if(pMsg->GetPayloadAsJSON(jtRoot, sErr))
- {
- if(jtRoot.GetValue("value", jtVal))
- {
- const json_t *pjt = jtVal;
- if(json_is_string(pjt))
- {
- std::string s(::json_string_value(jtVal));
- if(s.length() > 0)
- {
- Lock(nLocks);
- if(fromUTF8(s.c_str(), s.length(), (T*)m_pszData, m_nCbBuffer))
- m_nCbString = slen(m_pszData) * sizeof(T);
- Unlock(nLocks);
- }
- else
- {
- Lock(nLocks);
- *m_pszData = '\0';
- m_nCbString = 0;
- Unlock(nLocks);
- }
- return true;
- }
- }
- }
- }
- return false;
- }
- bool SetShmValuePBuf(CMqttMessage *pMsg, int &nLocks)
- {
- TRACE("%s:\n\tProtocol buffers not implemented!\n", GetPath());
- return false;
- }
- void PublishBinLE(const char *pszTopicDevID, const char *pszTopicShmID, CMqttMessageQueue &rmq, int &nLocks)
- {
- std::string s = CreateTopic(pszTopicDevID, pszTopicShmID, MQTT_TOPIC_VALUE_BINLE, MQTT_TOPIC_CMD_VALUE);
- #if __BYTE_ORDER__ == __ORDER_BIG_ENDIAN__
- T szBuf[m_nCChBuffer];
- CMqttMessage *pMsg = CMqttMessage::CreateMessage(s.c_str(), _copy_swap_string_chars(m_pszShadow, szBuf, m_nCChBuffer), m_nCbString, GetQoS(), GetRetained());
- #else // __BYTE_ORDER__
- CMqttMessage *pMsg = CMqttMessage::CreateMessage(s.c_str(), m_pszShadow, m_nCbString, GetQoS(), GetRetained());
- #endif // __BYTE_ORDER__
- rmq.Push(pMsg);
- }
- void PublishBinBE(const char *pszTopicDevID, const char *pszTopicShmID, CMqttMessageQueue &rmq, int &nLocks)
- {
- std::string s = CreateTopic(pszTopicDevID, pszTopicShmID, MQTT_TOPIC_VALUE_BINBE, MQTT_TOPIC_CMD_VALUE);
- #if __BYTE_ORDER__ == __ORDER_LITTLE_ENDIAN__
- T szBuf[m_nCChBuffer];
- CMqttMessage *pMsg = CMqttMessage::CreateMessage(s.c_str(), _copy_swap_string_chars(m_pszShadow, szBuf, m_nCChBuffer), m_nCbString, GetQoS(), GetRetained());
- #else // __BYTE_ORDER__
- CMqttMessage *pMsg = CMqttMessage::CreateMessage(s.c_str(), m_pszShadow, m_nCbString, GetQoS(), GetRetained());
- #endif // __BYTE_ORDER__
- rmq.Push(pMsg);
- }
- void PublishJson(const char *pszTopicDevID, const char *pszTopicShmID, CMqttMessageQueue &rmq, int &nLocks)
- {
- std::string s = CreateTopic(pszTopicDevID, pszTopicShmID, MQTT_TOPIC_VALUE_JSON, MQTT_TOPIC_CMD_VALUE);
- CJson_t jtValue(json_object());
- CJson_t jtPath(json_string(GetPath()));
- CJson_t jtIndex(json_integer(GetIndex()));
- CJson_t jtName(json_string(GetName()));
- CJson_t jtVal(GetJsonValue());
- json_object_set(jtValue, "path", jtPath);
- json_object_set(jtValue, "index", jtIndex);
- json_object_set(jtValue, "name", jtName);
- json_object_set(jtValue, "value", jtVal);
- char *pszJson = json_dumps(jtValue, MQTT_JSON_OUTPUT_FLAGS);
- std::string v = pszJson;
- free(pszJson);
- CMqttMessage *pMsg = CMqttMessage::CreateMessage(s.c_str(), v.c_str(), v.length(), GetQoS(), GetRetained());
- rmq.Push(pMsg);
- }
- void PublishPBuf(const char *pszTopicDevID, const char *pszTopicShmID, CMqttMessageQueue &rmq, int &nLocks)
- {
- std::string s = CreateTopic(pszTopicDevID, pszTopicShmID, MQTT_TOPIC_VALUE_PBUF, MQTT_TOPIC_CMD_VALUE);
- TRACE("%s:\n\tProtocol buffers not implemented!\n", s.c_str());
- }
- json_t* GetJsonValue(void) const
- {
- char szBuf[m_nCbBuffer * 4];
- const char *pszUtf8 = toUTF8(m_pszShadow, m_nCbString, szBuf, sizeof(szBuf));
- if(pszUtf8)
- return json_string(pszUtf8);
- return NULL;
- }
- const char* toUTF8(const T *pszIn, size_t nCbIn, char *pszOut, size_t nCbOut) const
- {
- switch(ST)
- {
- case VT_Latin1:
- Latin1ToUtf8(pszIn, nCbIn, pszOut, nCbOut);
- break;
- case VT_UTF_8:
- strncpy(pszOut, (const char*)pszIn, nCbOut - 1);
- pszOut[nCbOut - 1] = '\0';
- break;
- case VT_UTF_16:
- EncToUtf8(_UTF_16, pszIn, nCbIn, pszOut, nCbOut);
- break;
- case VT_UTF_32:
- EncToUtf8(_UTF_32, pszIn, nCbIn, pszOut, nCbOut);
- break;
- case VT_Unicode:
- EncToUtf8(_UNICODE, pszIn, nCbIn, pszOut, nCbOut);
- break;
- default:
- ASSERT(false);
- return NULL;
- }
-
- return pszOut;
- }
- const T* fromUTF8(const char *pszIn, size_t nCbIn, T *pszOut, size_t nCbOut) const
- {
- size_t nRet;
- switch(ST)
- {
- case VT_Latin1:
- nRet = Utf8ToLatin1(pszIn, nCbIn, pszOut, nCbOut);
- pszOut[nRet / sizeof(T)] = T('\0');
- break;
- case VT_UTF_8:
- strncpy((char*)pszOut, pszIn, nCbOut - 1);
- pszOut[nCbOut - 1] = T('\0');
- break;
- case VT_UTF_16:
- nRet = Utf8ToUtf16(pszIn, nCbIn, (char16_t*)pszOut, nCbOut);
- pszOut[nRet / sizeof(T)] = T('\0');
- break;
- case VT_UTF_32:
- nRet = Utf8ToUtf32(pszIn, nCbIn, (char32_t*)pszOut, nCbOut);
- pszOut[nRet / sizeof(T)] = T('\0');
- break;
- case VT_Unicode:
- nRet = Utf8ToWcs(pszIn, nCbIn, (wchar_t*)pszOut, nCbOut);
- pszOut[nRet / sizeof(T)] = T('\0');
- break;
- default:
- ASSERT(false);
- return NULL;
- }
-
- return pszOut;
- }
- size_t slen(volatile const T *s)
- {
- volatile const T *p = s;
- while(*p) ++p;
- return p - s;
- }
- void zeroTerm(size_t at)
- {
- m_pszData[at] = (T)'\0';
- }
- bool UpdateShadowBuffer(int &nLocks)
- {
- Lock(nLocks);
- bool bChanged = !!memcmp(m_pszShadow, (const void*)m_pszData, m_nCbString + sizeof(T));
- if(bChanged)
- {
- zeroTerm(m_nCChBuffer - 1);
- m_nCbString = slen(m_pszData) * sizeof(T);
- memcpy(m_pszShadow, (const void*)m_pszData, m_nCbString + sizeof(T));
- TRACE("Changed: %s\n", GetPath());
- }
- Unlock(nLocks);
- return bChanged;
- }
- private:
- const size_t m_nCChBuffer;
- const size_t m_nCbBuffer;
- size_t m_nCbString;
- volatile T *m_pszData;
- T *m_pszShadow;
- };
- /////////////////////////////////////////////////////////////////////////////
// Bit helpers operating on the object pointed to by p with bit mask m.
// All arguments and the complete expansions are parenthesised so that
// compound arguments such as (a | b) or (ptr + 1) expand as intended and the
// macros can be used inside larger expressions.
#define GET_BOOL_VAL(p, m)		(!!(*(p) & (m)))								// true if any bit of m is set in *p
#define SET_BIT(p, m)			(*(p) |= (m))									// set all bits of m in *p
#define CLR_BIT(p, m)			(*(p) &= ~(m))									// clear all bits of m in *p
#define STORE_BIT(p, m, b)		((b) ? SET_BIT(p, m) : CLR_BIT(p, m))			// set (b true) or clear (b false) the bits of m in *p
- class CMqttBitVariable : public CMqttVar
- {
- public:
- CMqttBitVariable(CMqttBitVariable &&m) noexcept :
- CMqttVar(std::move(m)),
- m_mask(std::move(m.m_mask)),
- m_pDataByte(std::move(m.m_pDataByte)),
- m_pShadowByte(std::move(m.m_pShadowByte))
- {
- }
- CMqttBitVariable(void *pData, void *pShadow, HSHM hShm, const char *pszName, size_t nOffset, int nBitNr, uint32_t nPubMask, int nQos, bool bRetain, CMqttVar *pParent) :
- CMqttVar(hShm, pszName, -1, nPubMask, nQos, bRetain, pParent),
- m_mask(1 << nBitNr),
- m_pDataByte((uint8_t*)pData + nOffset),
- m_pShadowByte((uint8_t*)pShadow + nOffset)
- {
- ASSERT(m_pDataByte);
- ASSERT(m_pShadowByte);
- int nLocks = 0;
- Lock(nLocks);
- STORE_BIT(m_pShadowByte, m_mask, GET_BOOL_VAL(m_pDataByte, m_mask));
- Unlock(nLocks);
- }
- public:
- virtual void CheckShmAndPublish(const char *pszTopicDevID, const char *pszTopicShmID, CMqttMessageQueue &rmq, int &nLocks)
- {
- uint32_t nPubMaskForcedOnce = GetForcedPublishMask();
- ClearForcedPublishMask();
- if(!PublishEnabled())
- return;
- if(!UpdateShadowBuffer(nLocks))
- {
- if(nPubMaskForcedOnce & MQTT_VALUE_BINLE)
- PublishBinLE(pszTopicDevID, pszTopicShmID, rmq, nLocks);
- if(nPubMaskForcedOnce & MQTT_VALUE_BINBE)
- PublishBinBE(pszTopicDevID, pszTopicShmID, rmq, nLocks);
- if(nPubMaskForcedOnce & MQTT_VALUE_JSON)
- PublishJson(pszTopicDevID, pszTopicShmID, rmq, nLocks);
- if(nPubMaskForcedOnce & MQTT_VALUE_PBUF)
- PublishPBuf(pszTopicDevID, pszTopicShmID, rmq, nLocks);
- }
- else
- {
- if(PublishEnabled(MQTT_VALUE_BINLE))
- PublishBinLE(pszTopicDevID, pszTopicShmID, rmq, nLocks);
- if(PublishEnabled(MQTT_VALUE_BINBE))
- PublishBinBE(pszTopicDevID, pszTopicShmID, rmq, nLocks);
- if(PublishEnabled(MQTT_VALUE_JSON))
- PublishJson(pszTopicDevID, pszTopicShmID, rmq, nLocks);
- if(PublishEnabled(MQTT_VALUE_PBUF))
- PublishPBuf(pszTopicDevID, pszTopicShmID, rmq, nLocks);
- }
- }
- virtual bool SetShmValue(uint32_t nValFormat, CMqttMessage *pMsg, int &nLocks) {
- switch(nValFormat)
- {
- case MQTT_VALUE_BINLE:
- case MQTT_VALUE_BINBE:
- return SetShmValueBin(pMsg, nLocks);
- case MQTT_VALUE_JSON:
- return SetShmValueJson(pMsg, nLocks);
- case MQTT_VALUE_PBUF:
- return SetShmValuePBuf(pMsg, nLocks);
- default:
- break;
- }
- return false;
- }
- private:
- bool SetShmValueBin(CMqttMessage *pMsg, int &nLocks)
- {
- if(pMsg->payload && ((size_t)pMsg->payloadlen == sizeof(bool)))
- {
- bool bVal = !!(*(bool*)pMsg->payload);
- Lock(nLocks);
- STORE_BIT(m_pDataByte, m_mask, bVal);
- Unlock(nLocks);
- return true;
- }
- return false;
- }
- // Applies a JSON payload to the bit selected by m_mask in the data byte.
- // GetPayloadAsJSON() must succeed and the document must contain a "value"
- // member that is either a JSON boolean or a JSON integer (non-zero = true).
- // Returns true if the bit was stored, false otherwise.
- bool SetShmValueJson(CMqttMessage *pMsg, int &nLocks)
- {
-     if(!pMsg)
-         return false;
-     CJson_t jtDoc, jtMember;
-     std::string sParseErr;
-     if(!pMsg->GetPayloadAsJSON(jtDoc, sParseErr))
-         return false;
-     if(!jtDoc.GetValue("value", jtMember))
-         return false;
-     const json_t *pjtMember = jtMember;
-     bool bNewState;
-     if(json_is_boolean(pjtMember))
-         bNewState = json_boolean_value(pjtMember);
-     else if(json_is_integer(pjtMember))
-         bNewState = !!::json_integer_value(pjtMember);
-     else
-         return false; // "value" is neither a boolean nor an integer
-     Lock(nLocks);
-     STORE_BIT(m_pDataByte, m_mask, bNewState);
-     Unlock(nLocks);
-     return true;
- }
- // Stub for the protocol-buffers setter: traces that the format is not
- // implemented and reports failure. Neither pMsg nor nLocks is used.
- bool SetShmValuePBuf(CMqttMessage *pMsg, int &nLocks)
- {
-     (void)pMsg;
-     (void)nLocks;
-     TRACE("%s:\n\tProtocol buffers not implemented!\n", GetPath());
-     return false;
- }
- // Queues the shadow bit as a binary value message on the BINLE topic.
- // The payload is built exactly as for BINBE; only the topic segment differs.
- // nLocks is not used.
- void PublishBinLE(const char *pszTopicDevID, const char *pszTopicShmID, CMqttMessageQueue &rmq, int &nLocks)
- {
-     PublishBoolBin(MQTT_TOPIC_VALUE_BINLE, pszTopicDevID, pszTopicShmID, rmq);
- }
- // Queues the shadow bit as a binary value message on the BINBE topic.
- // The payload is built exactly as for BINLE; only the topic segment differs.
- // nLocks is not used.
- void PublishBinBE(const char *pszTopicDevID, const char *pszTopicShmID, CMqttMessageQueue &rmq, int &nLocks)
- {
-     PublishBoolBin(MQTT_TOPIC_VALUE_BINBE, pszTopicDevID, pszTopicShmID, rmq);
- }
- // Shared body of PublishBinLE/PublishBinBE: creates the topic for the given
- // format segment and pushes a message carrying the shadow bit, either as the
- // text "true"/"false" (_RETURN_BIN_AS_STRING) or as the raw bytes of a bool.
- void PublishBoolBin(const char *pszTopicFormat, const char *pszTopicDevID, const char *pszTopicShmID, CMqttMessageQueue &rmq)
- {
-     std::string s = CreateTopic(pszTopicDevID, pszTopicShmID, pszTopicFormat, MQTT_TOPIC_CMD_VALUE);
-     bool val = GET_BOOL_VAL(m_pShadowByte, m_mask);
- #if _RETURN_BIN_AS_STRING
-     std::string v = val ? "true" : "false";
-     CMqttMessage *pMsg = CMqttMessage::CreateMessage(s.c_str(), v.c_str(), v.length(), GetQoS(), GetRetained());
- #else // _RETURN_BIN_AS_STRING
-     CMqttMessage *pMsg = CMqttMessage::CreateMessage(s.c_str(), &val, sizeof(val), GetQoS(), GetRetained());
- #endif // _RETURN_BIN_AS_STRING
-     rmq.Push(pMsg);
- }
- // Queues the shadow bit as a JSON value message.
- // The payload is a JSON object with the members "path", "index", "name" and
- // "value", serialized with MQTT_JSON_OUTPUT_FLAGS. If serialization fails,
- // the failure is traced and nothing is queued. nLocks is not used.
- void PublishJson(const char *pszTopicDevID, const char *pszTopicShmID, CMqttMessageQueue &rmq, int &nLocks)
- {
-     std::string s = CreateTopic(pszTopicDevID, pszTopicShmID, MQTT_TOPIC_VALUE_JSON, MQTT_TOPIC_CMD_VALUE);
-     CJson_t jtValue(json_object());
-     CJson_t jtPath(json_string(GetPath()));
-     CJson_t jtIndex(json_integer(GetIndex()));
-     CJson_t jtName(json_string(GetName()));
-     CJson_t jtVal(GetJsonValue());
-     json_object_set(jtValue, "path", jtPath);
-     json_object_set(jtValue, "index", jtIndex);
-     json_object_set(jtValue, "name", jtName);
-     json_object_set(jtValue, "value", jtVal);
-     char *pszJson = json_dumps(jtValue, MQTT_JSON_OUTPUT_FLAGS);
-     if(!pszJson)
-     {
-         // json_dumps() returns NULL on error; building a std::string from a
-         // null pointer would be undefined behavior.
-         TRACE("%s:\n\tJSON serialization failed!\n", s.c_str());
-         return;
-     }
-     std::string v = pszJson;
-     free(pszJson); // the buffer returned by json_dumps() is owned by the caller
-     CMqttMessage *pMsg = CMqttMessage::CreateMessage(s.c_str(), v.c_str(), v.length(), GetQoS(), GetRetained());
-     rmq.Push(pMsg);
- }
- // Stub for protocol-buffers publishing: builds the PBUF value topic and only
- // traces that the format is not implemented. Nothing is pushed to rmq.
- void PublishPBuf(const char *pszTopicDevID, const char *pszTopicShmID, CMqttMessageQueue &rmq, int &nLocks)
- {
-     const std::string sTopic = CreateTopic(pszTopicDevID, pszTopicShmID, MQTT_TOPIC_VALUE_PBUF, MQTT_TOPIC_CMD_VALUE);
-     TRACE("%s:\n\tProtocol buffers not implemented!\n", sTopic.c_str());
- }
- // Returns the result of json_boolean() for the current state of the shadow bit.
- json_t* GetJsonValue(void) const
- {
-     const bool bShadowBit = GET_BOOL_VAL(m_pShadowByte, m_mask);
-     return json_boolean(bShadowBit);
- }
- // Compares the shadow bit with the live data bit between Lock() and Unlock()
- // and, if they differ, copies the live state into the shadow and traces the change.
- // Returns true if the shadow was updated, false if both bits already matched.
- bool UpdateShadowBuffer(int &nLocks)
- {
-     Lock(nLocks);
-     bool bLive = GET_BOOL_VAL(m_pDataByte, m_mask);
-     const bool bDiffers = (GET_BOOL_VAL(m_pShadowByte, m_mask) != bLive);
-     if(bDiffers)
-     {
-         STORE_BIT(m_pShadowByte, m_mask, bLive);
-         TRACE("Changed: %s\n", GetPath());
-     }
-     Unlock(nLocks);
-     return bDiffers;
- }
- private:
- uint8_t m_mask; // mask passed to GET_BOOL_VAL/STORE_BIT to select this variable's bit within a byte
- volatile uint8_t *m_pDataByte; // live byte: written by the SetShmValue* setters, read by UpdateShadowBuffer
- uint8_t *m_pShadowByte; // shadow byte: refreshed by UpdateShadowBuffer, read when publishing
- };
- /////////////////////////////////////////////////////////////////////////////
- // Array of MQTT variables: a CMqttVar node that is at the same time the
- // std::vector<T> holding its elements. T is the element variable type (each
- // element is used through a CMqttVar reference), V is the type used to index
- // the data and shadow buffers. Every virtual operation below is forwarded to
- // all elements.
- template <typename T, typename V>
- class CMqttVarArray : public CMqttVar,
-                       public std::vector<T>
- {
- public:
-     // Creates nElemCount elements; element i is built over the i-th V slot of
-     // pData and pShadow and gets this array as its parent.
-     CMqttVarArray(void *pData, void *pShadow, HSHM hShm, const char *pszName, size_t nElemCount, uint32_t nPubMask, int nQos, bool bRetain, CMqttVar *pParent)
-         : CMqttVar(hShm, pszName, -1, 0, nQos, bRetain, pParent)
-     {
-         // Allocate once: growing element by element would repeatedly
-         // reallocate and relocate elements that are already constructed.
-         this->reserve(nElemCount);
-         for(size_t i = 0; i < nElemCount; i++)
-         {
-             this->emplace_back(&((V*)pData)[i], &((V*)pShadow)[i], hShm, pszName, i, nPubMask, nQos, bRetain, static_cast<CMqttVar*>(this));
-         }
-     }
-     // Same as above for elements that each span nCChData units of V: element i
-     // starts at offset i * nCChData in pData and pShadow, and nCChData is passed
-     // on to the element constructor.
-     CMqttVarArray(void *pData, void *pShadow, HSHM hShm, const char *pszName, size_t nElemCount, size_t nCChData, uint32_t nPubMask, int nQos, bool bRetain, CMqttVar *pParent)
-         : CMqttVar(hShm, pszName, -1, 0, nQos, bRetain, pParent)
-     {
-         // Allocate once, see the constructor above.
-         this->reserve(nElemCount);
-         for(size_t i = 0; i < nElemCount; i++)
-         {
-             this->emplace_back(&((V*)pData)[i * nCChData], &((V*)pShadow)[i * nCChData], hShm, pszName, nCChData, i, nPubMask, nQos, bRetain, static_cast<CMqttVar*>(this));
-         }
-     }
- public:
-     // Registers the array itself via the base class, then every element.
-     virtual void CreateMembersTable(CMqttVarTable &vt)
-     {
-         CMqttVar::CreateMembersTable(vt);
-         for(T &rElem : *this)
-         {
-             static_cast<CMqttVar&>(rElem).CreateMembersTable(vt);
-         }
-     }
-     // Initializes the array's own path, then each element's path with the same
-     // parent and member name and the element's position as index.
-     virtual void InitPath(CMqttVar *pParent, const char *pszMemberName, int nIndex = -1)
-     {
-         CMqttVar::InitPath(pParent, pszMemberName, nIndex);
-         int nElem = 0;
-         for(T &rElem : *this)
-         {
-             static_cast<CMqttVar&>(rElem).InitPath(pParent, pszMemberName, nElem);
-             ++nElem;
-         }
-     }
-     // Runs CheckShmAndPublish on every element, bracketed by Lock()/Unlock().
-     virtual void CheckShmAndPublish(const char *pszTopicDevID, const char *pszTopicShmID, CMqttMessageQueue &rmq, int &nLocks)
-     {
-         Lock(nLocks);
-         for(T &rElem : *this)
-         {
-             static_cast<CMqttVar&>(rElem).CheckShmAndPublish(pszTopicDevID, pszTopicShmID, rmq, nLocks);
-         }
-         Unlock(nLocks);
-     }
-     // Calls EnablePublish on every element; true if at least one returned true.
-     virtual bool EnablePublish(uint32_t nMask, CMqttVarTable *pvt)
-     {
-         bool bRet = false;
-         for(T &rElem : *this)
-         {
-             // Call first, then combine, so that no element is skipped.
-             bRet = static_cast<CMqttVar&>(rElem).EnablePublish(nMask, pvt) || bRet;
-         }
-         return bRet;
-     }
-     // Calls DisablePublish on every element; true if at least one returned true.
-     virtual bool DisablePublish(uint32_t nMask, CMqttVarTable *pvt)
-     {
-         bool bRet = false;
-         for(T &rElem : *this)
-         {
-             bRet = static_cast<CMqttVar&>(rElem).DisablePublish(nMask, pvt) || bRet;
-         }
-         return bRet;
-     }
-     // Calls SetQoS on every element; true if at least one returned true.
-     virtual bool SetQoS(int nQos)
-     {
-         bool bRet = false;
-         for(T &rElem : *this)
-         {
-             bRet = static_cast<CMqttVar&>(rElem).SetQoS(nQos) || bRet;
-         }
-         return bRet;
-     }
-     // Calls SetRetained on every element; true if at least one returned true.
-     virtual bool SetRetained(bool bRetain)
-     {
-         bool bRet = false;
-         for(T &rElem : *this)
-         {
-             bRet = static_cast<CMqttVar&>(rElem).SetRetained(bRetain) || bRet;
-         }
-         return bRet;
-     }
-     // Calls RemoveRetained on every element with the same arguments.
-     virtual void RemoveRetained(const char *pszTopicDevID, const char *pszTopicShmID, CMqttMessageQueue &rmq, bool bForce)
-     {
-         for(T &rElem : *this)
-         {
-             static_cast<CMqttVar&>(rElem).RemoveRetained(pszTopicDevID, pszTopicShmID, rmq, bForce);
-         }
-     }
- };
- /////////////////////////////////////////////////////////////////////////////
- #endif // __cplusplus
- #endif // !defined(AGD_MQTTVAR_H__CA5925ED_7757_407C_86F2_E7687DFFCCFA__INCLUDED_)
|