// mqttvar.h : // #if !defined(AGD_MQTTVAR_H__CA5925ED_7757_407C_86F2_E7687DFFCCFA__INCLUDED_) #define AGD_MQTTVAR_H__CA5925ED_7757_407C_86F2_E7687DFFCCFA__INCLUDED_ #include #include #include #include #include #include #include #include #include #include #ifndef _LIBBUILD #include #include #include #include #include #include #else // _LIBBUILD #include "common/strutil.h" #include "common/conv.h" #include "common/debug.h" #include "mqttmsg.h" #include "mqttjson.h" #include "mqttdbg.h" #endif // _LIBBUILD ///////////////////////////////////////////////////////////////////////////// #define MQTT_VALUE_BINLE 0x00000001 #define MQTT_VALUE_BINBE 0x00000002 #define MQTT_VALUE_JSON 0x00000004 #define MQTT_VALUE_PBUF 0x00000008 #define MQTT_VALUE_ALL_FORMATS (MQTT_VALUE_BINLE | MQTT_VALUE_BINBE | MQTT_VALUE_JSON | MQTT_VALUE_PBUF) #define MQTT_TOPIC_CMD_VALUE "VALUE" #define MQTT_TOPIC_VALUE_BINLE "BINLE" #define MQTT_TOPIC_VALUE_BINBE "BINBE" #define MQTT_TOPIC_VALUE_JSON "JSON" #define MQTT_TOPIC_VALUE_PBUF "PBUF" #ifdef _DEBUG #define MQTT_JSON_OUTPUT_FLAGS JSON_MAX_INDENT #else // _DEBUG #define MQTT_JSON_OUTPUT_FLAGS JSON_COMPACT #endif // _DEBUG #if __BYTE_ORDER__ == __ORDER_LITTLE_ENDIAN__ #define _UTF_16 "UTF-16LE//" #define _UTF_32 "UTF-32LE//" #define _UNICODE "WCHAR_T//" #elif __BYTE_ORDER__ == __ORDER_BIG_ENDIAN__ #define _UTF_16 "UTF-16BE//" #define _UTF_32 "UTF-32BE//" #define _UNICODE "WCHAR_T//" #else // __BYTE_ORDER__ #error Invalid or unsupported byte order! #endif // __BYTE_ORDER__ #define MQTTCL_MIN_QOS 0 #define MQTTCL_MAX_QOS 2 #ifdef __cplusplus ///////////////////////////////////////////////////////////////////////////// template static void _swap_val(T &v) { // should static assert that T is a POD if(sizeof(T) > 1) { char &raw = reinterpret_cast(v); std::reverse(&raw, &raw + sizeof(T)); } } template static const T* _copy_swap_string_chars(const T *s, T *v, size_t cch) { if(sizeof(T) > 1 && cch > 0) { for(T *pv = v; cch > 0; --cch, ++pv, ++s) { *pv = *s; if(*pv) _swap_val(*pv); else break; } return v; } else { memcpy(v, s, cch); return v; } } ///////////////////////////////////////////////////////////////////////////// // mqttvar.h - Declarations: class CMqttVar; class CMqttVarTable { public: typedef bool (*_PFNCMP)(const char*, const char*); public: CMqttVarTable(void); virtual ~CMqttVarTable(void); void AddVar(CMqttVar *pv); CMqttVar* Find(const char *key) const; inline size_t size(void) const { return m_map.size();} bool AddToPubTable(CMqttVar *pv); bool RemoveFromPubTable(CMqttVar *pv); void CheckShmAndPublish(const char *pszTopicDevID, const char *pszTopicShmID, CMqttMessageQueue &rmq, int &nLocks); void DumpPubEnabled(uint32_t nPubMask = MQTT_VALUE_ALL_FORMATS); void TracePubVar(const char *pszPath, uint32_t nMask, int nQos, bool bRetained); private: std::map m_map; std::map m_pub; }; ///////////////////////////////////////////////////////////////////////////// class CMqttVar { public: CMqttVar(CMqttVar &&m) noexcept : m_hShm(std::move(m.m_hShm)), m_pParent(std::move(m.m_pParent)), m_nPubMask(std::move(m.m_nPubMask)), m_nPubMaskForcedOnce(m_nPubMask), m_path(std::move(m.m_path)), m_pszPath(m_path.c_str()), m_nCbVarpath(std::move(m.m_nCbVarpath)), m_name(std::move(m.m_name)), m_nIndex(std::move(m.m_nIndex)), m_nQos(std::move(m.m_nQos)), m_bRetain(std::move(m.m_bRetain)), m_nRetainedPubMask(std::move(m.m_nRetainedPubMask)) { } CMqttVar(HSHM hShm, const char *pszName, int nIndex, uint32_t nPubMask, int nQos, bool bRetain, CMqttVar *pParent) : m_hShm(hShm), m_pParent(pParent), m_nPubMask(nPubMask), m_nPubMaskForcedOnce(m_nPubMask), m_pszPath(NULL), m_nCbVarpath(0), m_name(pszName ? pszName : ""), m_nIndex(nIndex), m_nQos(nQos), m_bRetain(bRetain), m_nRetainedPubMask(0) { } virtual ~CMqttVar(void) { } public: virtual void CheckShmAndPublish(const char *pszTopicDevID, const char *pszTopicShmID, CMqttMessageQueue &rmq, int &nLocks) = 0; virtual bool SetShmValue(uint32_t nValFormat, CMqttMessage *pMsg, int &nLocks) { return false; } virtual void CreateMembersTable(CMqttVarTable &vt) { vt.AddVar(this);} virtual bool EnablePublish(uint32_t nMask, CMqttVarTable *pvt) { if((m_nPubMask & nMask) != nMask) { #if _DUMP_CONTROL_CHANGED uint32_t nEnable = ((nMask ^ m_nPubMask) & nMask); #endif // _DUMP_CONTROL_CHANGED m_nPubMaskForcedOnce |= ((nMask ^ m_nPubMask) & nMask); m_nPubMask |= nMask; if(pvt) { pvt->AddToPubTable(this); #if _TRACK_TIMES g_nDbgCounter1++; #endif // _TRACK_TIMES #if _DUMP_CONTROL_CHANGED TraceFormatChange(nEnable, true); #endif // _DUMP_CONTROL_CHANGED return true; } } return false; } virtual bool DisablePublish(uint32_t nMask, CMqttVarTable *pvt) { if(m_nPubMask & nMask) { #if _DUMP_CONTROL_CHANGED uint32_t nDisable = (((~nMask) ^ m_nPubMask) & nMask); #endif // _DUMP_CONTROL_CHANGED m_nPubMask &= ~nMask; m_nPubMaskForcedOnce &= ~nMask; if(pvt) { pvt->RemoveFromPubTable(this); #if _DUMP_CONTROL_CHANGED TraceFormatChange(nDisable, false); #endif // _DUMP_CONTROL_CHANGED return true; } } return false; } bool PublishEnabled(uint32_t nMask = MQTT_VALUE_ALL_FORMATS) const { return !!(m_nPubMask & nMask);} uint32_t GetPublishMask(void) const { return m_nPubMask;} uint32_t GetForcedPublishMask(void) const { return m_nPubMaskForcedOnce;} void ClearForcedPublishMask(void) { m_nPubMaskForcedOnce = 0;} virtual void InitPath(CMqttVar *pParent, const char *pszMemberName, int nIndex = -1) { CreatePath(pParent, pszMemberName, nIndex, m_path); m_pszPath = m_path.c_str(); m_nCbVarpath = m_path.length(); } virtual const char* GetPath(void) const { return m_pszPath;} int GetQoS(void) const { return m_nQos;} virtual bool SetQoS(int nQos) { if(nQos < MQTTCL_MIN_QOS) nQos = MQTTCL_MIN_QOS; else if(nQos > MQTTCL_MAX_QOS) nQos = MQTTCL_MAX_QOS; if(m_nQos != nQos) { TraceQOSChange(m_nQos, nQos); m_nQos = nQos; return true; } return false; } bool GetRetained(void) const { return m_bRetain;} virtual bool SetRetained(bool bRetain) { if(m_bRetain != bRetain) { m_bRetain = bRetain; TraceRetainChange(bRetain); return true; } else return false; } void SetRetainedPubMask(uint32_t nMask) { m_nRetainedPubMask |= nMask; } virtual void RemoveRetained(const char *pszTopicDevID, const char *pszTopicShmID, CMqttMessageQueue &rmq, bool bForce) { if(!m_bRetain) { std::string s; CMqttMessage *pMsg; if((m_nRetainedPubMask & MQTT_VALUE_BINLE) || bForce) { s = CreateTopic(pszTopicDevID, pszTopicShmID, MQTT_TOPIC_VALUE_BINLE, MQTT_TOPIC_CMD_VALUE); pMsg = CMqttMessage::CreateRemoveRetainedMessage(s.c_str(), GetQoS()); rmq.Push(pMsg); m_nRetainedPubMask &= ~MQTT_VALUE_BINLE; } if((m_nRetainedPubMask & MQTT_VALUE_BINBE) || bForce) { s = CreateTopic(pszTopicDevID, pszTopicShmID, MQTT_TOPIC_VALUE_BINBE, MQTT_TOPIC_CMD_VALUE); pMsg = CMqttMessage::CreateRemoveRetainedMessage(s.c_str(), GetQoS()); rmq.Push(pMsg); m_nRetainedPubMask &= ~MQTT_VALUE_BINBE; } if((m_nRetainedPubMask & MQTT_VALUE_JSON) || bForce) { s = CreateTopic(pszTopicDevID, pszTopicShmID, MQTT_TOPIC_VALUE_JSON, MQTT_TOPIC_CMD_VALUE); pMsg = CMqttMessage::CreateRemoveRetainedMessage(s.c_str(), GetQoS()); rmq.Push(pMsg); m_nRetainedPubMask &= ~MQTT_VALUE_JSON; } if((m_nRetainedPubMask & MQTT_VALUE_PBUF) || bForce) { s = CreateTopic(pszTopicDevID, pszTopicShmID, MQTT_TOPIC_VALUE_PBUF, MQTT_TOPIC_CMD_VALUE); pMsg = CMqttMessage::CreateRemoveRetainedMessage(s.c_str(), GetQoS()); rmq.Push(pMsg); m_nRetainedPubMask &= ~MQTT_VALUE_PBUF; } } } protected: static void CreatePath(CMqttVar *pParent, const char *pszMemberName, int nIndex, std::string &rPath) { char szIndex[32] = {0}; if(!pszMemberName || !*pszMemberName) pszMemberName = "/"; if(nIndex >= 0) { sprintf(szIndex, "/%d", nIndex); } if(pParent) { rPath = pParent->GetPath(); auto len = rPath.size(); if(len > 0) { auto rend = rPath.rbegin(); if(*rend != '/') rPath += "/"; } rPath += pszMemberName; rPath += szIndex; } else { rPath = pszMemberName; rPath += szIndex; } } std::string CreateTopic(const char *pszTopicDevID, const char *pszTopicShmID, const char *pszValueFormat, const char *pszTopicCmd) { if((!pszTopicDevID || !*pszTopicDevID) && (!pszTopicShmID || !*pszTopicShmID)) return ::formatString("%s/%s%s", pszValueFormat, pszTopicCmd, m_pszPath); else if(pszTopicDevID && *pszTopicDevID && pszTopicShmID && *pszTopicShmID) return ::formatString("%s/%s/%s/%s%s", pszTopicDevID, pszTopicShmID, pszValueFormat, pszTopicCmd, m_pszPath); else { const char *pszTopic = (pszTopicDevID && *pszTopicDevID) ? pszTopicDevID : pszTopicShmID; return ::formatString("%s/%s/%s%s", pszTopic, pszValueFormat, pszTopicCmd, m_pszPath); } } void Lock(int &nLocks) { if(!nLocks) { ::GfaIpcLockSHM(m_hShm); ++nLocks; } } void Unlock(int &nLocks) { if(nLocks) { --nLocks; ::GfaIpcUnlockSHM(m_hShm); } } const char* GetName(void) const { return m_name.c_str();} int GetIndex(void) const { return m_nIndex;} private: void TraceFormatChange(uint32_t nMask, bool bOn) { #ifdef _DUMP_CONTROL_CHANGED if(m_pszPath && nMask) { int nCount = 0; TRACE("%s ==> %s ", m_pszPath, bOn ? "ON" : "OFF"); if(nMask & MQTT_VALUE_BINLE) { TRACE("%s", MQTT_TOPIC_VALUE_BINLE); ++nCount; } if(nMask & MQTT_VALUE_BINBE) { if(nCount) TRACE(", "); TRACE("%s", MQTT_TOPIC_VALUE_BINBE); ++nCount; } if(nMask & MQTT_VALUE_JSON) { if(nCount) TRACE(", "); TRACE("%s", MQTT_TOPIC_VALUE_JSON); ++nCount; } if(nMask & MQTT_VALUE_PBUF) { if(nCount) TRACE(", "); TRACE("%s", MQTT_TOPIC_VALUE_PBUF); ++nCount; } TRACE("\n"); } #endif // _DUMP_CONTROL_CHANGED } void TraceQOSChange(int nQosOld, int nQosNew) { #ifdef _DUMP_CONTROL_CHANGED if(m_pszPath) { TRACE("%s ==> QOS: %d -> %d\n", m_pszPath, nQosOld, nQosNew); } #endif // _DUMP_CONTROL_CHANGED } void TraceRetainChange(bool bRetained) { #ifdef _DUMP_CONTROL_CHANGED if(m_pszPath) { TRACE("%s ==> Retain: %s\n", m_pszPath, bRetained ? "true" : "false"); } #endif // _DUMP_CONTROL_CHANGED } private: HSHM m_hShm; CMqttVar *m_pParent; uint32_t m_nPubMask; uint32_t m_nPubMaskForcedOnce; std::string m_path; const char *m_pszPath; size_t m_nCbVarpath; std::string m_name; int m_nIndex; int m_nQos; bool m_bRetain; uint32_t m_nRetainedPubMask; }; ///////////////////////////////////////////////////////////////////////////// template class CMqttVariable : public CMqttVar { public: CMqttVariable(CMqttVariable &&m) noexcept : CMqttVar(std::move(m)), m_pData(std::move(m.m_pData)), m_pShadow(std::move(m.m_pShadow)), m_bIsBool(std::move(m.m_bIsBool)) { } CMqttVariable(void *pData, void *pShadow, HSHM hShm, const char *pszName, int nIndex, uint32_t nPubMask, int nQos, bool bRetain, CMqttVar *pParent) : CMqttVar(hShm, pszName, nIndex, nPubMask, nQos, bRetain, pParent), m_pData((volatile T*)pData), m_pShadow((T*)pShadow), m_bIsBool(std::type_index(typeid(T)) == std::type_index(typeid(bool))) { ASSERT(m_pData); ASSERT(m_pShadow); ASSERT(hShm); int nLocks = 0; Lock(nLocks); *m_pShadow = *m_pData; Unlock(nLocks); } ///////////////////////////////////////////////////////////////////////// public: virtual void CheckShmAndPublish(const char *pszTopicDevID, const char *pszTopicShmID, CMqttMessageQueue &rmq, int &nLocks) { uint32_t nPubMaskForcedOnce = GetForcedPublishMask(); ClearForcedPublishMask(); if(!PublishEnabled()) return; if(!UpdateShadowBuffer(nLocks)) { if(nPubMaskForcedOnce & MQTT_VALUE_BINLE) PublishBinLE(pszTopicDevID, pszTopicShmID, rmq, nLocks); if(nPubMaskForcedOnce & MQTT_VALUE_BINBE) PublishBinBE(pszTopicDevID, pszTopicShmID, rmq, nLocks); if(nPubMaskForcedOnce & MQTT_VALUE_JSON) PublishJson(pszTopicDevID, pszTopicShmID, rmq, nLocks); if(nPubMaskForcedOnce & MQTT_VALUE_PBUF) PublishPBuf(pszTopicDevID, pszTopicShmID, rmq, nLocks); } else { if(PublishEnabled(MQTT_VALUE_BINLE)) PublishBinLE(pszTopicDevID, pszTopicShmID, rmq, nLocks); if(PublishEnabled(MQTT_VALUE_BINBE)) PublishBinBE(pszTopicDevID, pszTopicShmID, rmq, nLocks); if(PublishEnabled(MQTT_VALUE_JSON)) PublishJson(pszTopicDevID, pszTopicShmID, rmq, nLocks); if(PublishEnabled(MQTT_VALUE_PBUF)) PublishPBuf(pszTopicDevID, pszTopicShmID, rmq, nLocks); } } virtual bool SetShmValue(uint32_t nValFormat, CMqttMessage *pMsg, int &nLocks) { switch(nValFormat) { case MQTT_VALUE_BINLE: return SetShmValueLE(pMsg, nLocks); case MQTT_VALUE_BINBE: return SetShmValueBE(pMsg, nLocks); case MQTT_VALUE_JSON: return SetShmValueJson(pMsg, nLocks); case MQTT_VALUE_PBUF: return SetShmValuePBuf(pMsg, nLocks); default: break; } return false; } ///////////////////////////////////////////////////////////////////////// private: bool SetShmValueLE(CMqttMessage *pMsg, int &nLocks) { if(pMsg->payload && ((size_t)pMsg->payloadlen == sizeof(T))) { T val = *(T*)pMsg->payload; #if __BYTE_ORDER__ == __ORDER_BIG_ENDIAN__ _swap_val(val); #endif // __BYTE_ORDER__ Lock(nLocks); *m_pData = val; Unlock(nLocks); return true; } return false; } bool SetShmValueBE(CMqttMessage *pMsg, int &nLocks) { if(pMsg->payload && ((size_t)pMsg->payloadlen == sizeof(T))) { T val = *(T*)pMsg->payload; #if __BYTE_ORDER__ == __ORDER_LITTLE_ENDIAN__ _swap_val(val); #endif // __BYTE_ORDER__ Lock(nLocks); *m_pData = val; Unlock(nLocks); return true; } return false; } bool SetShmValueJson(CMqttMessage *pMsg, int &nLocks) { bool bRet = false; if(pMsg) { T val; CJson_t jtRoot, jtVal; std::string sErr; if(pMsg->GetPayloadAsJSON(jtRoot, sErr)) { if(jtRoot.GetValue("value", jtVal)) { const json_t *pjt = jtVal; if(m_bIsBool) { if(json_is_boolean(pjt)) { val = (T)json_boolean_value(pjt); bRet = true; } else if(json_is_integer(pjt)) { val = !!::json_integer_value(pjt); bRet = true; } } else if(std::is_integral::value) { if(json_is_integer(pjt)) { val = (T)::json_integer_value(pjt); bRet = true; } else if(json_is_boolean(pjt)) { val = (T)(json_boolean_value(pjt) ? 1 : 0); bRet = true; } } else if(std::is_floating_point::value) { if(json_is_real(pjt)) { val = (T)::json_real_value(pjt); bRet = true; } else if(json_is_integer(pjt)) { val = (T)::json_integer_value(pjt); bRet = true; } else if(json_is_boolean(pjt)) { val = (T)(json_boolean_value(pjt) ? 1 : 0); bRet = true; } } if(bRet) { Lock(nLocks); *m_pData = val; Unlock(nLocks); } } } } return bRet; } bool SetShmValuePBuf(CMqttMessage *pMsg, int &nLocks) { TRACE("%s:\n\tProtocol buffers not implemented!\n", GetPath()); return false; } ///////////////////////////////////////////////////////////////////////// void PublishBinLE(const char *pszTopicDevID, const char *pszTopicShmID, CMqttMessageQueue &rmq, int &nLocks) { std::string s = CreateTopic(pszTopicDevID, pszTopicShmID, MQTT_TOPIC_VALUE_BINLE, MQTT_TOPIC_CMD_VALUE); #if _RETURN_BIN_AS_STRING std::string v = toString(*m_pShadow); CMqttMessage *pMsg = CMqttMessage::CreateMessage(s.c_str(), v.c_str(), v.length(), GetQoS(), GetRetained()); #else // _RETURN_BIN_AS_STRING T val = *m_pShadow; #if __BYTE_ORDER__ == __ORDER_BIG_ENDIAN__ _swap_val(val); #endif // __BYTE_ORDER__ CMqttMessage *pMsg = CMqttMessage::CreateMessage(s.c_str(), &val, sizeof(val), GetQoS(), GetRetained()); #endif // _RETURN_BIN_AS_STRING rmq.Push(pMsg); if(GetRetained()) SetRetainedPubMask(MQTT_VALUE_BINLE); } void PublishBinBE(const char *pszTopicDevID, const char *pszTopicShmID, CMqttMessageQueue &rmq, int &nLocks) { std::string s = CreateTopic(pszTopicDevID, pszTopicShmID, MQTT_TOPIC_VALUE_BINBE, MQTT_TOPIC_CMD_VALUE); #if _RETURN_BIN_AS_STRING std::string v = toString(*m_pShadow); CMqttMessage *pMsg = CMqttMessage::CreateMessage(s.c_str(), v.c_str(), v.length(), GetQoS(), GetRetained()); #else // _RETURN_BIN_AS_STRING T val = *m_pShadow; #if __BYTE_ORDER__ == __ORDER_LITTLE_ENDIAN__ _swap_val(val); #endif // __BYTE_ORDER__ CMqttMessage *pMsg = CMqttMessage::CreateMessage(s.c_str(), &val, sizeof(val), GetQoS(), GetRetained()); #endif // _RETURN_BIN_AS_STRING rmq.Push(pMsg); if(GetRetained()) SetRetainedPubMask(MQTT_VALUE_BINBE); } void PublishJson(const char *pszTopicDevID, const char *pszTopicShmID, CMqttMessageQueue &rmq, int &nLocks) { std::string s = CreateTopic(pszTopicDevID, pszTopicShmID, MQTT_TOPIC_VALUE_JSON, MQTT_TOPIC_CMD_VALUE); CJson_t jtValue(json_object()); CJson_t jtPath(json_string(GetPath())); CJson_t jtIndex(json_integer(GetIndex())); CJson_t jtName(json_string(GetName())); CJson_t jtVal(GetJsonValue()); json_object_set(jtValue, "path", jtPath); json_object_set(jtValue, "index", jtIndex); json_object_set(jtValue, "name", jtName); json_object_set(jtValue, "value", jtVal); char *pszJson = json_dumps(jtValue, MQTT_JSON_OUTPUT_FLAGS); std::string v = pszJson; free(pszJson); CMqttMessage *pMsg = CMqttMessage::CreateMessage(s.c_str(), v.c_str(), v.length(), GetQoS(), GetRetained()); rmq.Push(pMsg); if(GetRetained()) SetRetainedPubMask(MQTT_VALUE_JSON); } void PublishPBuf(const char *pszTopicDevID, const char *pszTopicShmID, CMqttMessageQueue &rmq, int &nLocks) { std::string s = CreateTopic(pszTopicDevID, pszTopicShmID, MQTT_TOPIC_VALUE_PBUF, MQTT_TOPIC_CMD_VALUE); TRACE("%s:\n\tProtocol buffers not implemented!\n", s.c_str()); } std::string toString(T v) { if(std::is_integral::value) { if(std::is_signed::value) { long long r = (long long)v; return formatString("%lld", r); } else if(std::is_unsigned::value) { unsigned long long r = (unsigned long long)v; return formatString("%llu", r); } } else if(std::is_floating_point::value) { double r = (double)v; return formatString("%.20g", r); } return ""; } json_t* GetJsonValue(void) const { if(std::is_integral::value) return m_bIsBool ? json_boolean(!!*m_pShadow) : ::json_integer((json_int_t)*m_pShadow); else if(std::is_floating_point::value) return ::json_real((double)*m_pShadow); ASSERT(false); return NULL; } bool UpdateShadowBuffer(int &nLocks) { Lock(nLocks); bool bChanged = *m_pShadow != *m_pData; if(bChanged) { *m_pShadow = *m_pData; TRACE("Changed: %s\n", GetPath()); } Unlock(nLocks); return bChanged; } private: volatile T *m_pData; T *m_pShadow; bool m_bIsBool; }; ///////////////////////////////////////////////////////////////////////////// template class CMqttStringVariable : public CMqttVar { public: typedef enum { VT_Invalid, // 0 VT_Latin1, // 1 VT_UTF_8, // 2 VT_UTF_16, // 3 VT_UTF_32, // 4 VT_Unicode, // 5 VT_Last }VT; public: CMqttStringVariable(CMqttStringVariable &&m) noexcept : CMqttVar(std::move(m)), m_nCChBuffer(std::move(m.m_nCChBuffer)), m_nCbBuffer(std::move(m.m_nCbBuffer)), m_nCbString(std::move(m.m_nCbString)), m_pszData(std::move(m.m_pszData)), m_pszShadow(std::move(m.m_pszShadow)) { } CMqttStringVariable(void *pData, void *pShadow, HSHM hShm, const char *pszName, size_t nCChBuffer, int nIndex, uint32_t nPubMask, int nQos, bool bRetain, CMqttVar *pParent) : CMqttVar(hShm, pszName, nIndex, nPubMask, nQos, bRetain, pParent), m_nCChBuffer(nCChBuffer), m_nCbBuffer(nCChBuffer * sizeof(T)), m_nCbString(0), m_pszData((T*)pData), m_pszShadow((T*)pShadow) { ASSERT(m_pszData); ASSERT(m_pszShadow); ASSERT(m_nCChBuffer > 0); int nLocks = 0; Lock(nLocks); zeroTerm(m_nCChBuffer - 1); m_nCbString = slen(m_pszData) * sizeof(T); memcpy(m_pszShadow, (const void*)m_pszData, m_nCbBuffer); Unlock(nLocks); } public: virtual void CheckShmAndPublish(const char *pszTopicDevID, const char *pszTopicShmID, CMqttMessageQueue &rmq, int &nLocks) { uint32_t nPubMaskForcedOnce = GetForcedPublishMask(); ClearForcedPublishMask(); if(!PublishEnabled()) return; if(!UpdateShadowBuffer(nLocks)) { if(nPubMaskForcedOnce & MQTT_VALUE_BINLE) PublishBinLE(pszTopicDevID, pszTopicShmID, rmq, nLocks); if(nPubMaskForcedOnce & MQTT_VALUE_BINBE) PublishBinBE(pszTopicDevID, pszTopicShmID, rmq, nLocks); if(nPubMaskForcedOnce & MQTT_VALUE_JSON) PublishJson(pszTopicDevID, pszTopicShmID, rmq, nLocks); if(nPubMaskForcedOnce & MQTT_VALUE_PBUF) PublishPBuf(pszTopicDevID, pszTopicShmID, rmq, nLocks); } else { if(PublishEnabled(MQTT_VALUE_BINLE)) PublishBinLE(pszTopicDevID, pszTopicShmID, rmq, nLocks); if(PublishEnabled(MQTT_VALUE_BINBE)) PublishBinBE(pszTopicDevID, pszTopicShmID, rmq, nLocks); if(PublishEnabled(MQTT_VALUE_JSON)) PublishJson(pszTopicDevID, pszTopicShmID, rmq, nLocks); if(PublishEnabled(MQTT_VALUE_PBUF)) PublishPBuf(pszTopicDevID, pszTopicShmID, rmq, nLocks); } } virtual bool SetShmValue(uint32_t nValFormat, CMqttMessage *pMsg, int &nLocks) { switch(nValFormat) { case MQTT_VALUE_BINLE: return SetShmValueLE(pMsg, nLocks); case MQTT_VALUE_BINBE: return SetShmValueBE(pMsg, nLocks); case MQTT_VALUE_JSON: return SetShmValueJson(pMsg, nLocks); case MQTT_VALUE_PBUF: return SetShmValuePBuf(pMsg, nLocks); default: break; } return false; } private: bool SetShmValueLE(CMqttMessage *pMsg, int &nLocks) { if(pMsg->payload && pMsg->payloadlen > 0) { if((size_t)pMsg->payloadlen / sizeof(T) < m_nCChBuffer) { Lock(nLocks); m_nCbString = (size_t)pMsg->payloadlen; #if __BYTE_ORDER__ == __ORDER_BIG_ENDIAN__ _copy_swap_string_chars((const T*)pMsg->payload, (T*)m_pszData, m_nCbString / sizeof(T)); #else // __BYTE_ORDER__ memcpy((void*)m_pszData, pMsg->payload, m_nCbString); #endif // __BYTE_ORDER__ zeroTerm(m_nCbString / sizeof(T)); Unlock(nLocks); return true; } } else { Lock(nLocks); *m_pszData = '\0'; m_nCbString = 0; Unlock(nLocks); return true; } return false; } bool SetShmValueBE(CMqttMessage *pMsg, int &nLocks) { if(pMsg->payload && pMsg->payloadlen > 0) { if((size_t)pMsg->payloadlen / sizeof(T) < m_nCChBuffer) { Lock(nLocks); m_nCbString = (size_t)pMsg->payloadlen; #if __BYTE_ORDER__ == __ORDER_LITTLE_ENDIAN__ _copy_swap_string_chars((const T*)pMsg->payload, (T*)m_pszData, m_nCbString / sizeof(T)); #else // __BYTE_ORDER__ memcpy((void*)m_pszData, pMsg->payload, m_nCbString); #endif // __BYTE_ORDER__ zeroTerm(m_nCbString / sizeof(T)); Unlock(nLocks); return true; } } else { Lock(nLocks); *m_pszData = '\0'; m_nCbString = 0; Unlock(nLocks); return true; } return false; } bool SetShmValueJson(CMqttMessage *pMsg, int &nLocks) { if(pMsg) { CJson_t jtRoot, jtVal; std::string sErr; if(pMsg->GetPayloadAsJSON(jtRoot, sErr)) { if(jtRoot.GetValue("value", jtVal)) { const json_t *pjt = jtVal; if(json_is_string(pjt)) { std::string s(::json_string_value(jtVal)); if(s.length() > 0) { Lock(nLocks); if(fromUTF8(s.c_str(), s.length(), (T*)m_pszData, m_nCbBuffer)) m_nCbString = slen(m_pszData) * sizeof(T); Unlock(nLocks); } else { Lock(nLocks); *m_pszData = '\0'; m_nCbString = 0; Unlock(nLocks); } return true; } } } } return false; } bool SetShmValuePBuf(CMqttMessage *pMsg, int &nLocks) { TRACE("%s:\n\tProtocol buffers not implemented!\n", GetPath()); return false; } void PublishBinLE(const char *pszTopicDevID, const char *pszTopicShmID, CMqttMessageQueue &rmq, int &nLocks) { std::string s = CreateTopic(pszTopicDevID, pszTopicShmID, MQTT_TOPIC_VALUE_BINLE, MQTT_TOPIC_CMD_VALUE); #if __BYTE_ORDER__ == __ORDER_BIG_ENDIAN__ T szBuf[m_nCChBuffer]; CMqttMessage *pMsg = CMqttMessage::CreateMessage(s.c_str(), _copy_swap_string_chars(m_pszShadow, szBuf, m_nCChBuffer), m_nCbString, GetQoS(), GetRetained()); #else // __BYTE_ORDER__ CMqttMessage *pMsg = CMqttMessage::CreateMessage(s.c_str(), m_pszShadow, m_nCbString, GetQoS(), GetRetained()); #endif // __BYTE_ORDER__ rmq.Push(pMsg); } void PublishBinBE(const char *pszTopicDevID, const char *pszTopicShmID, CMqttMessageQueue &rmq, int &nLocks) { std::string s = CreateTopic(pszTopicDevID, pszTopicShmID, MQTT_TOPIC_VALUE_BINBE, MQTT_TOPIC_CMD_VALUE); #if __BYTE_ORDER__ == __ORDER_LITTLE_ENDIAN__ T szBuf[m_nCChBuffer]; CMqttMessage *pMsg = CMqttMessage::CreateMessage(s.c_str(), _copy_swap_string_chars(m_pszShadow, szBuf, m_nCChBuffer), m_nCbString, GetQoS(), GetRetained()); #else // __BYTE_ORDER__ CMqttMessage *pMsg = CMqttMessage::CreateMessage(s.c_str(), m_pszShadow, m_nCbString, GetQoS(), GetRetained()); #endif // __BYTE_ORDER__ rmq.Push(pMsg); } void PublishJson(const char *pszTopicDevID, const char *pszTopicShmID, CMqttMessageQueue &rmq, int &nLocks) { std::string s = CreateTopic(pszTopicDevID, pszTopicShmID, MQTT_TOPIC_VALUE_JSON, MQTT_TOPIC_CMD_VALUE); CJson_t jtValue(json_object()); CJson_t jtPath(json_string(GetPath())); CJson_t jtIndex(json_integer(GetIndex())); CJson_t jtName(json_string(GetName())); CJson_t jtVal(GetJsonValue()); json_object_set(jtValue, "path", jtPath); json_object_set(jtValue, "index", jtIndex); json_object_set(jtValue, "name", jtName); json_object_set(jtValue, "value", jtVal); char *pszJson = json_dumps(jtValue, MQTT_JSON_OUTPUT_FLAGS); std::string v = pszJson; free(pszJson); CMqttMessage *pMsg = CMqttMessage::CreateMessage(s.c_str(), v.c_str(), v.length(), GetQoS(), GetRetained()); rmq.Push(pMsg); } void PublishPBuf(const char *pszTopicDevID, const char *pszTopicShmID, CMqttMessageQueue &rmq, int &nLocks) { std::string s = CreateTopic(pszTopicDevID, pszTopicShmID, MQTT_TOPIC_VALUE_PBUF, MQTT_TOPIC_CMD_VALUE); TRACE("%s:\n\tProtocol buffers not implemented!\n", s.c_str()); } json_t* GetJsonValue(void) const { char szBuf[m_nCbBuffer * 4]; const char *pszUtf8 = toUTF8(m_pszShadow, m_nCbString, szBuf, sizeof(szBuf)); if(pszUtf8) return json_string(pszUtf8); return NULL; } const char* toUTF8(const T *pszIn, size_t nCbIn, char *pszOut, size_t nCbOut) const { switch(ST) { case VT_Latin1: Latin1ToUtf8(pszIn, nCbIn, pszOut, nCbOut); break; case VT_UTF_8: strncpy(pszOut, (const char*)pszIn, nCbOut - 1); pszOut[nCbOut - 1] = '\0'; break; case VT_UTF_16: EncToUtf8(_UTF_16, pszIn, nCbIn, pszOut, nCbOut); break; case VT_UTF_32: EncToUtf8(_UTF_32, pszIn, nCbIn, pszOut, nCbOut); break; case VT_Unicode: EncToUtf8(_UNICODE, pszIn, nCbIn, pszOut, nCbOut); break; default: ASSERT(false); return NULL; } return pszOut; } const T* fromUTF8(const char *pszIn, size_t nCbIn, T *pszOut, size_t nCbOut) const { size_t nRet; switch(ST) { case VT_Latin1: nRet = Utf8ToLatin1(pszIn, nCbIn, pszOut, nCbOut); pszOut[nRet / sizeof(T)] = T('\0'); break; case VT_UTF_8: strncpy((char*)pszOut, pszIn, nCbOut - 1); pszOut[nCbOut - 1] = T('\0'); break; case VT_UTF_16: nRet = Utf8ToUtf16(pszIn, nCbIn, (char16_t*)pszOut, nCbOut); pszOut[nRet / sizeof(T)] = T('\0'); break; case VT_UTF_32: nRet = Utf8ToUtf32(pszIn, nCbIn, (char32_t*)pszOut, nCbOut); pszOut[nRet / sizeof(T)] = T('\0'); break; case VT_Unicode: nRet = Utf8ToWcs(pszIn, nCbIn, (wchar_t*)pszOut, nCbOut); pszOut[nRet / sizeof(T)] = T('\0'); break; default: ASSERT(false); return NULL; } return pszOut; } size_t slen(volatile const T *s) { volatile const T *p = s; while(*p) ++p; return p - s; } void zeroTerm(size_t at) { m_pszData[at] = (T)'\0'; } bool UpdateShadowBuffer(int &nLocks) { Lock(nLocks); bool bChanged = !!memcmp(m_pszShadow, (const void*)m_pszData, m_nCbString + sizeof(T)); if(bChanged) { zeroTerm(m_nCChBuffer - 1); m_nCbString = slen(m_pszData) * sizeof(T); memcpy(m_pszShadow, (const void*)m_pszData, m_nCbString + sizeof(T)); TRACE("Changed: %s\n", GetPath()); } Unlock(nLocks); return bChanged; } private: const size_t m_nCChBuffer; const size_t m_nCbBuffer; size_t m_nCbString; volatile T *m_pszData; T *m_pszShadow; }; ///////////////////////////////////////////////////////////////////////////// #define GET_BOOL_VAL(p, m) (!!(*p & m)) #define SET_BIT(p, m) (*p |= m) #define CLR_BIT(p, m) (*p &= ~m) #define STORE_BIT(p, m, b) (b) ? SET_BIT(p, m) : CLR_BIT(p, m) class CMqttBitVariable : public CMqttVar { public: CMqttBitVariable(CMqttBitVariable &&m) noexcept : CMqttVar(std::move(m)), m_mask(std::move(m.m_mask)), m_pDataByte(std::move(m.m_pDataByte)), m_pShadowByte(std::move(m.m_pShadowByte)) { } CMqttBitVariable(void *pData, void *pShadow, HSHM hShm, const char *pszName, size_t nOffset, int nBitNr, uint32_t nPubMask, int nQos, bool bRetain, CMqttVar *pParent) : CMqttVar(hShm, pszName, -1, nPubMask, nQos, bRetain, pParent), m_mask(1 << nBitNr), m_pDataByte((uint8_t*)pData + nOffset), m_pShadowByte((uint8_t*)pShadow + nOffset) { ASSERT(m_pDataByte); ASSERT(m_pShadowByte); int nLocks = 0; Lock(nLocks); STORE_BIT(m_pShadowByte, m_mask, GET_BOOL_VAL(m_pDataByte, m_mask)); Unlock(nLocks); } public: virtual void CheckShmAndPublish(const char *pszTopicDevID, const char *pszTopicShmID, CMqttMessageQueue &rmq, int &nLocks) { uint32_t nPubMaskForcedOnce = GetForcedPublishMask(); ClearForcedPublishMask(); if(!PublishEnabled()) return; if(!UpdateShadowBuffer(nLocks)) { if(nPubMaskForcedOnce & MQTT_VALUE_BINLE) PublishBinLE(pszTopicDevID, pszTopicShmID, rmq, nLocks); if(nPubMaskForcedOnce & MQTT_VALUE_BINBE) PublishBinBE(pszTopicDevID, pszTopicShmID, rmq, nLocks); if(nPubMaskForcedOnce & MQTT_VALUE_JSON) PublishJson(pszTopicDevID, pszTopicShmID, rmq, nLocks); if(nPubMaskForcedOnce & MQTT_VALUE_PBUF) PublishPBuf(pszTopicDevID, pszTopicShmID, rmq, nLocks); } else { if(PublishEnabled(MQTT_VALUE_BINLE)) PublishBinLE(pszTopicDevID, pszTopicShmID, rmq, nLocks); if(PublishEnabled(MQTT_VALUE_BINBE)) PublishBinBE(pszTopicDevID, pszTopicShmID, rmq, nLocks); if(PublishEnabled(MQTT_VALUE_JSON)) PublishJson(pszTopicDevID, pszTopicShmID, rmq, nLocks); if(PublishEnabled(MQTT_VALUE_PBUF)) PublishPBuf(pszTopicDevID, pszTopicShmID, rmq, nLocks); } } virtual bool SetShmValue(uint32_t nValFormat, CMqttMessage *pMsg, int &nLocks) { switch(nValFormat) { case MQTT_VALUE_BINLE: case MQTT_VALUE_BINBE: return SetShmValueBin(pMsg, nLocks); case MQTT_VALUE_JSON: return SetShmValueJson(pMsg, nLocks); case MQTT_VALUE_PBUF: return SetShmValuePBuf(pMsg, nLocks); default: break; } return false; } private: bool SetShmValueBin(CMqttMessage *pMsg, int &nLocks) { if(pMsg->payload && ((size_t)pMsg->payloadlen == sizeof(bool))) { bool bVal = !!(*(bool*)pMsg->payload); Lock(nLocks); STORE_BIT(m_pDataByte, m_mask, bVal); Unlock(nLocks); return true; } return false; } bool SetShmValueJson(CMqttMessage *pMsg, int &nLocks) { bool bRet = false; if(pMsg) { bool bVal; CJson_t jtRoot, jtVal; std::string sErr; if(pMsg->GetPayloadAsJSON(jtRoot, sErr)) { if(jtRoot.GetValue("value", jtVal)) { const json_t *pjt = jtVal; if(json_is_boolean(pjt)) { bVal = json_boolean_value(pjt); bRet = true; } else if(json_is_integer(pjt)) { bVal = !!::json_integer_value(pjt); bRet = true; } if(bRet) { Lock(nLocks); STORE_BIT(m_pDataByte, m_mask, bVal); Unlock(nLocks); } } } } return bRet; } bool SetShmValuePBuf(CMqttMessage *pMsg, int &nLocks) { TRACE("%s:\n\tProtocol buffers not implemented!\n", GetPath()); return false; } void PublishBinLE(const char *pszTopicDevID, const char *pszTopicShmID, CMqttMessageQueue &rmq, int &nLocks) { std::string s = CreateTopic(pszTopicDevID, pszTopicShmID, MQTT_TOPIC_VALUE_BINLE, MQTT_TOPIC_CMD_VALUE); bool val = GET_BOOL_VAL(m_pShadowByte, m_mask); #if _RETURN_BIN_AS_STRING std::string v = val ? "true" : "false"; CMqttMessage *pMsg = CMqttMessage::CreateMessage(s.c_str(), v.c_str(), v.length(), GetQoS(), GetRetained()); #else // _RETURN_BIN_AS_STRING CMqttMessage *pMsg = CMqttMessage::CreateMessage(s.c_str(), &val, sizeof(val), GetQoS(), GetRetained()); #endif // _RETURN_BIN_AS_STRING rmq.Push(pMsg); } void PublishBinBE(const char *pszTopicDevID, const char *pszTopicShmID, CMqttMessageQueue &rmq, int &nLocks) { std::string s = CreateTopic(pszTopicDevID, pszTopicShmID, MQTT_TOPIC_VALUE_BINBE, MQTT_TOPIC_CMD_VALUE); bool val = GET_BOOL_VAL(m_pShadowByte, m_mask); #if _RETURN_BIN_AS_STRING std::string v = val ? "true" : "false"; CMqttMessage *pMsg = CMqttMessage::CreateMessage(s.c_str(), v.c_str(), v.length(), GetQoS(), GetRetained()); #else // _RETURN_BIN_AS_STRING CMqttMessage *pMsg = CMqttMessage::CreateMessage(s.c_str(), &val, sizeof(val), GetQoS(), GetRetained()); #endif // _RETURN_BIN_AS_STRING rmq.Push(pMsg); } void PublishJson(const char *pszTopicDevID, const char *pszTopicShmID, CMqttMessageQueue &rmq, int &nLocks) { std::string s = CreateTopic(pszTopicDevID, pszTopicShmID, MQTT_TOPIC_VALUE_JSON, MQTT_TOPIC_CMD_VALUE); CJson_t jtValue(json_object()); CJson_t jtPath(json_string(GetPath())); CJson_t jtIndex(json_integer(GetIndex())); CJson_t jtName(json_string(GetName())); CJson_t jtVal(GetJsonValue()); json_object_set(jtValue, "path", jtPath); json_object_set(jtValue, "index", jtIndex); json_object_set(jtValue, "name", jtName); json_object_set(jtValue, "value", jtVal); char *pszJson = json_dumps(jtValue, MQTT_JSON_OUTPUT_FLAGS); std::string v = pszJson; free(pszJson); CMqttMessage *pMsg = CMqttMessage::CreateMessage(s.c_str(), v.c_str(), v.length(), GetQoS(), GetRetained()); rmq.Push(pMsg); } void PublishPBuf(const char *pszTopicDevID, const char *pszTopicShmID, CMqttMessageQueue &rmq, int &nLocks) { std::string s = CreateTopic(pszTopicDevID, pszTopicShmID, MQTT_TOPIC_VALUE_PBUF, MQTT_TOPIC_CMD_VALUE); TRACE("%s:\n\tProtocol buffers not implemented!\n", s.c_str()); } json_t* GetJsonValue(void) const { return json_boolean(GET_BOOL_VAL(m_pShadowByte, m_mask)); } bool UpdateShadowBuffer(int &nLocks) { bool bVal; Lock(nLocks); bool bChanged = GET_BOOL_VAL(m_pShadowByte, m_mask) != (bVal = GET_BOOL_VAL(m_pDataByte, m_mask)); if(bChanged) { STORE_BIT(m_pShadowByte, m_mask, bVal); TRACE("Changed: %s\n", GetPath()); } Unlock(nLocks); return bChanged; } private: uint8_t m_mask; volatile uint8_t *m_pDataByte; uint8_t *m_pShadowByte; }; ///////////////////////////////////////////////////////////////////////////// template class CMqttVarArray : public CMqttVar, public std::vector { public: CMqttVarArray(void *pData, void *pShadow, HSHM hShm, const char *pszName, size_t nElemCount, uint32_t nPubMask, int nQos, bool bRetain, CMqttVar *pParent) : CMqttVar(hShm, pszName, -1, 0, nQos, bRetain, pParent) { for(size_t i = 0; i < nElemCount; i++) { this->emplace_back(&((V*)pData)[i], &((V*)pShadow)[i], hShm, pszName, i, nPubMask, nQos, bRetain, static_cast(this)); } } CMqttVarArray(void *pData, void *pShadow, HSHM hShm, const char *pszName, size_t nElemCount, size_t nCChData, uint32_t nPubMask, int nQos, bool bRetain, CMqttVar *pParent) : CMqttVar(hShm, pszName, -1, 0, nQos, bRetain, pParent) { for(size_t i = 0; i < nElemCount; i++) { this->emplace_back(&((V*)pData)[i * nCChData], &((V*)pShadow)[i * nCChData], hShm, pszName, nCChData, i, nPubMask, nQos, bRetain, static_cast(this)); } } public: virtual void CreateMembersTable(CMqttVarTable &vt) { CMqttVar::CreateMembersTable(vt); for(auto i = this->begin(); i != this->end(); ++i) { CMqttVar &rVar = static_cast(*i); rVar.CreateMembersTable(vt); } } virtual void InitPath(CMqttVar *pParent, const char *pszMemberName, int nIndex = -1) { CMqttVar::InitPath(pParent, pszMemberName, nIndex); int j = 0; for(auto i = this->begin(); i != this->end(); ++i, ++j) { CMqttVar &rVar = static_cast(*i); rVar.InitPath(pParent, pszMemberName, j); } } virtual void CheckShmAndPublish(const char *pszTopicDevID, const char *pszTopicShmID, CMqttMessageQueue &rmq, int &nLocks) { Lock(nLocks); for(auto i = this->begin(); i != this->end(); ++i) { CMqttVar &rVar = static_cast(*i); rVar.CheckShmAndPublish(pszTopicDevID, pszTopicShmID, rmq, nLocks); } Unlock(nLocks); } virtual bool EnablePublish(uint32_t nMask, CMqttVarTable *pvt) { bool bRet = false; for(auto i = this->begin(); i != this->end(); ++i) { CMqttVar &rVar = static_cast(*i); bRet = rVar.EnablePublish(nMask, pvt) || bRet; } return bRet; } virtual bool DisablePublish(uint32_t nMask, CMqttVarTable *pvt) { bool bRet = false; for(auto i = this->begin(); i != this->end(); ++i) { CMqttVar &rVar = static_cast(*i); bRet = rVar.DisablePublish(nMask, pvt) || bRet; } return bRet; } virtual bool SetQoS(int nQos) { bool bRet = false; for(auto i = this->begin(); i != this->end(); ++i) { CMqttVar &rVar = static_cast(*i); bRet = rVar.SetQoS(nQos) || bRet; } return bRet; } virtual bool SetRetained(bool bRetain) { bool bRet = false; for(auto i = this->begin(); i != this->end(); ++i) { CMqttVar &rVar = static_cast(*i); bRet = rVar.SetRetained(bRetain) || bRet; } return bRet; } virtual void RemoveRetained(const char *pszTopicDevID, const char *pszTopicShmID, CMqttMessageQueue &rmq, bool bForce) { for(auto i = this->begin(); i != this->end(); ++i) { CMqttVar &rVar = static_cast(*i); rVar.RemoveRetained(pszTopicDevID, pszTopicShmID, rmq, bForce); } } }; ///////////////////////////////////////////////////////////////////////////// #endif // __cplusplus #endif // !defined(AGD_MQTTVAR_H__CA5925ED_7757_407C_86F2_E7687DFFCCFA__INCLUDED_)