mqttvar.h 39 KB


  1. // mqttvar.h :
  2. //
  3. #if !defined(AGD_MQTTVAR_H__CA5925ED_7757_407C_86F2_E7687DFFCCFA__INCLUDED_)
  4. #define AGD_MQTTVAR_H__CA5925ED_7757_407C_86F2_E7687DFFCCFA__INCLUDED_
#include <endian.h>
#include <stdint.h>
#include <stdio.h>
#include <string.h>
#include <algorithm>
#include <forward_list>
#include <map>
#include <string>
#include <type_traits>
#include <typeindex>
#include <typeinfo>
#include <utility>
#include <vector>
#include <gfa/gfaipc.h>
  15. #ifndef _LIBBUILD
  16. #include <gfa/svc/common/strutil.h>
  17. #include <gfa/svc/common/conv.h>
  18. #include <gfa/svc/common/debug.h>
  19. #include <gfa/svc/mqttcl/mqttmsg.h>
  20. #include <gfa/svc/mqttcl/mqttjson.h>
  21. #include <gfa/svc/mqttcl/mqttdbg.h>
  22. #else // _LIBBUILD
  23. #include "common/strutil.h"
  24. #include "common/conv.h"
  25. #include "common/debug.h"
  26. #include "mqttmsg.h"
  27. #include "mqttjson.h"
  28. #include "mqttdbg.h"
  29. #endif // _LIBBUILD
  30. /////////////////////////////////////////////////////////////////////////////
  31. #define MQTT_VALUE_BINLE 0x00000001
  32. #define MQTT_VALUE_BINBE 0x00000002
  33. #define MQTT_VALUE_JSON 0x00000004
  34. #define MQTT_VALUE_PBUF 0x00000008
  35. #define MQTT_VALUE_ALL_FORMATS (MQTT_VALUE_BINLE | MQTT_VALUE_BINBE | MQTT_VALUE_JSON | MQTT_VALUE_PBUF)
  36. #define MQTT_TOPIC_CMD_VALUE "VALUE"
  37. #define MQTT_TOPIC_VALUE_BINLE "BINLE"
  38. #define MQTT_TOPIC_VALUE_BINBE "BINBE"
  39. #define MQTT_TOPIC_VALUE_JSON "JSON"
  40. #define MQTT_TOPIC_VALUE_PBUF "PBUF"
  41. #ifdef _DEBUG
  42. #define MQTT_JSON_OUTPUT_FLAGS JSON_MAX_INDENT
  43. #else // _DEBUG
  44. #define MQTT_JSON_OUTPUT_FLAGS JSON_COMPACT
  45. #endif // _DEBUG
  46. #if __BYTE_ORDER__ == __ORDER_LITTLE_ENDIAN__
  47. #define _UTF_16 "UTF-16LE//"
  48. #define _UTF_32 "UTF-32LE//"
  49. #define _UNICODE "WCHAR_T//"
  50. #elif __BYTE_ORDER__ == __ORDER_BIG_ENDIAN__
  51. #define _UTF_16 "UTF-16BE//"
  52. #define _UTF_32 "UTF-32BE//"
  53. #define _UNICODE "WCHAR_T//"
  54. #else // __BYTE_ORDER__
  55. #error Invalid or unsupported byte order!
  56. #endif // __BYTE_ORDER__
  57. #define MQTTCL_MIN_QOS 0
  58. #define MQTTCL_MAX_QOS 2
  59. #ifdef __cplusplus
  60. /////////////////////////////////////////////////////////////////////////////
  61. template <typename T>
  62. static void _swap_val(T &v)
  63. {
  64. // should static assert that T is a POD
  65. if(sizeof(T) > 1)
  66. {
  67. char &raw = reinterpret_cast<char&>(v);
  68. std::reverse(&raw, &raw + sizeof(T));
  69. }
  70. }
  71. template <typename T>
  72. static const T* _copy_swap_string_chars(const T *s, T *v, size_t cch)
  73. {
  74. if(sizeof(T) > 1 && cch > 0)
  75. {
  76. for(T *pv = v; cch > 0; --cch, ++pv, ++s)
  77. {
  78. *pv = *s;
  79. if(*pv)
  80. _swap_val(*pv);
  81. else
  82. break;
  83. }
  84. return v;
  85. }
  86. else
  87. {
  88. memcpy(v, s, cch);
  89. return v;
  90. }
  91. }
  92. /////////////////////////////////////////////////////////////////////////////
  93. // mqttvar.h - Declarations:
  94. class CMqttVar;
  95. class CMqttVarTable
  96. {
  97. public:
  98. typedef bool (*_PFNCMP)(const char*, const char*);
  99. public:
  100. CMqttVarTable(void);
  101. virtual ~CMqttVarTable(void);
  102. void AddVar(CMqttVar *pv);
  103. CMqttVar* Find(const char *key) const;
  104. inline size_t size(void) const {
  105. return m_map.size();}
  106. bool AddToPubTable(CMqttVar *pv);
  107. bool RemoveFromPubTable(CMqttVar *pv);
  108. void CheckShmAndPublish(const char *pszTopicDevID, const char *pszTopicShmID, CMqttMessageQueue &rmq, int &nLocks);
  109. void DumpPubEnabled(uint32_t nPubMask = MQTT_VALUE_ALL_FORMATS);
  110. void TracePubVar(const char *pszPath, uint32_t nMask, int nQos, bool bRetained);
  111. private:
  112. std::map<const char*, CMqttVar*, _PFNCMP> m_map;
  113. std::map<const char*, CMqttVar*, _PFNCMP> m_pub;
  114. };
  115. /////////////////////////////////////////////////////////////////////////////
  116. class CMqttVar
  117. {
  118. public:
  119. CMqttVar(CMqttVar &&m) noexcept :
  120. m_hShm(std::move(m.m_hShm)),
  121. m_pParent(std::move(m.m_pParent)),
  122. m_nPubMask(std::move(m.m_nPubMask)),
  123. m_nPubMaskForcedOnce(m_nPubMask),
  124. m_path(std::move(m.m_path)),
  125. m_pszPath(m_path.c_str()),
  126. m_nCbVarpath(std::move(m.m_nCbVarpath)),
  127. m_name(std::move(m.m_name)),
  128. m_nIndex(std::move(m.m_nIndex)),
  129. m_nQos(std::move(m.m_nQos)),
  130. m_bRetain(std::move(m.m_bRetain)),
  131. m_nRetainedPubMask(std::move(m.m_nRetainedPubMask))
  132. {
  133. }
  134. CMqttVar(HSHM hShm, const char *pszName, int nIndex, uint32_t nPubMask, int nQos, bool bRetain, CMqttVar *pParent) :
  135. m_hShm(hShm),
  136. m_pParent(pParent),
  137. m_nPubMask(nPubMask),
  138. m_nPubMaskForcedOnce(m_nPubMask),
  139. m_pszPath(NULL),
  140. m_nCbVarpath(0),
  141. m_name(pszName ? pszName : ""),
  142. m_nIndex(nIndex),
  143. m_nQos(nQos),
  144. m_bRetain(bRetain),
  145. m_nRetainedPubMask(0)
  146. {
  147. }
  148. virtual ~CMqttVar(void)
  149. {
  150. }
  151. public:
  152. virtual void CheckShmAndPublish(const char *pszTopicDevID, const char *pszTopicShmID, CMqttMessageQueue &rmq, int &nLocks) = 0;
  153. virtual bool SetShmValue(uint32_t nValFormat, CMqttMessage *pMsg, int &nLocks) {
  154. return false;
  155. }
  156. virtual void CreateMembersTable(CMqttVarTable &vt) {
  157. vt.AddVar(this);}
  158. virtual bool EnablePublish(uint32_t nMask, CMqttVarTable *pvt) {
  159. if((m_nPubMask & nMask) != nMask)
  160. {
  161. #if _DUMP_CONTROL_CHANGED
  162. uint32_t nEnable = ((nMask ^ m_nPubMask) & nMask);
  163. #endif // _DUMP_CONTROL_CHANGED
  164. m_nPubMaskForcedOnce |= ((nMask ^ m_nPubMask) & nMask);
  165. m_nPubMask |= nMask;
  166. if(pvt)
  167. {
  168. pvt->AddToPubTable(this);
  169. #if _TRACK_TIMES
  170. g_nDbgCounter1++;
  171. #endif // _TRACK_TIMES
  172. #if _DUMP_CONTROL_CHANGED
  173. TraceFormatChange(nEnable, true);
  174. #endif // _DUMP_CONTROL_CHANGED
  175. return true;
  176. }
  177. }
  178. return false;
  179. }
  180. virtual bool DisablePublish(uint32_t nMask, CMqttVarTable *pvt) {
  181. if(m_nPubMask & nMask)
  182. {
  183. #if _DUMP_CONTROL_CHANGED
  184. uint32_t nDisable = (((~nMask) ^ m_nPubMask) & nMask);
  185. #endif // _DUMP_CONTROL_CHANGED
  186. m_nPubMask &= ~nMask;
  187. m_nPubMaskForcedOnce &= ~nMask;
  188. if(pvt)
  189. {
  190. pvt->RemoveFromPubTable(this);
  191. #if _DUMP_CONTROL_CHANGED
  192. TraceFormatChange(nDisable, false);
  193. #endif // _DUMP_CONTROL_CHANGED
  194. return true;
  195. }
  196. }
  197. return false;
  198. }
  199. bool PublishEnabled(uint32_t nMask = MQTT_VALUE_ALL_FORMATS) const {
  200. return !!(m_nPubMask & nMask);}
  201. uint32_t GetPublishMask(void) const {
  202. return m_nPubMask;}
  203. uint32_t GetForcedPublishMask(void) const {
  204. return m_nPubMaskForcedOnce;}
  205. void ClearForcedPublishMask(void) {
  206. m_nPubMaskForcedOnce = 0;}
  207. virtual void InitPath(CMqttVar *pParent, const char *pszMemberName, int nIndex = -1) {
  208. CreatePath(pParent, pszMemberName, nIndex, m_path);
  209. m_pszPath = m_path.c_str();
  210. m_nCbVarpath = m_path.length();
  211. }
  212. virtual const char* GetPath(void) const {
  213. return m_pszPath;}
  214. int GetQoS(void) const {
  215. return m_nQos;}
  216. virtual bool SetQoS(int nQos) {
  217. if(nQos < MQTTCL_MIN_QOS)
  218. nQos = MQTTCL_MIN_QOS;
  219. else if(nQos > MQTTCL_MAX_QOS)
  220. nQos = MQTTCL_MAX_QOS;
  221. if(m_nQos != nQos) {
  222. TraceQOSChange(m_nQos, nQos);
  223. m_nQos = nQos;
  224. return true;
  225. }
  226. return false;
  227. }
  228. bool GetRetained(void) const {
  229. return m_bRetain;}
  230. virtual bool SetRetained(bool bRetain) {
  231. if(m_bRetain != bRetain) {
  232. m_bRetain = bRetain;
  233. TraceRetainChange(bRetain);
  234. return true;
  235. }
  236. else
  237. return false;
  238. }
  239. void SetRetainedPubMask(uint32_t nMask) {
  240. m_nRetainedPubMask |= nMask;
  241. }
  242. virtual void RemoveRetained(const char *pszTopicDevID, const char *pszTopicShmID, CMqttMessageQueue &rmq, bool bForce)
  243. {
  244. if(!m_bRetain)
  245. {
  246. std::string s;
  247. CMqttMessage *pMsg;
  248. if((m_nRetainedPubMask & MQTT_VALUE_BINLE) || bForce)
  249. {
  250. s = CreateTopic(pszTopicDevID, pszTopicShmID, MQTT_TOPIC_VALUE_BINLE, MQTT_TOPIC_CMD_VALUE);
  251. pMsg = CMqttMessage::CreateRemoveRetainedMessage(s.c_str(), GetQoS());
  252. rmq.Push(pMsg);
  253. m_nRetainedPubMask &= ~MQTT_VALUE_BINLE;
  254. }
  255. if((m_nRetainedPubMask & MQTT_VALUE_BINBE) || bForce)
  256. {
  257. s = CreateTopic(pszTopicDevID, pszTopicShmID, MQTT_TOPIC_VALUE_BINBE, MQTT_TOPIC_CMD_VALUE);
  258. pMsg = CMqttMessage::CreateRemoveRetainedMessage(s.c_str(), GetQoS());
  259. rmq.Push(pMsg);
  260. m_nRetainedPubMask &= ~MQTT_VALUE_BINBE;
  261. }
  262. if((m_nRetainedPubMask & MQTT_VALUE_JSON) || bForce)
  263. {
  264. s = CreateTopic(pszTopicDevID, pszTopicShmID, MQTT_TOPIC_VALUE_JSON, MQTT_TOPIC_CMD_VALUE);
  265. pMsg = CMqttMessage::CreateRemoveRetainedMessage(s.c_str(), GetQoS());
  266. rmq.Push(pMsg);
  267. m_nRetainedPubMask &= ~MQTT_VALUE_JSON;
  268. }
  269. if((m_nRetainedPubMask & MQTT_VALUE_PBUF) || bForce)
  270. {
  271. s = CreateTopic(pszTopicDevID, pszTopicShmID, MQTT_TOPIC_VALUE_PBUF, MQTT_TOPIC_CMD_VALUE);
  272. pMsg = CMqttMessage::CreateRemoveRetainedMessage(s.c_str(), GetQoS());
  273. rmq.Push(pMsg);
  274. m_nRetainedPubMask &= ~MQTT_VALUE_PBUF;
  275. }
  276. }
  277. }
  278. protected:
  279. static void CreatePath(CMqttVar *pParent, const char *pszMemberName, int nIndex, std::string &rPath)
  280. {
  281. char szIndex[32] = {0};
  282. if(!pszMemberName || !*pszMemberName)
  283. pszMemberName = "/";
  284. if(nIndex >= 0)
  285. {
  286. sprintf(szIndex, "/%d", nIndex);
  287. }
  288. if(pParent)
  289. {
  290. rPath = pParent->GetPath();
  291. auto len = rPath.size();
  292. if(len > 0)
  293. {
  294. auto rend = rPath.rbegin();
  295. if(*rend != '/')
  296. rPath += "/";
  297. }
  298. rPath += pszMemberName;
  299. rPath += szIndex;
  300. }
  301. else
  302. {
  303. rPath = pszMemberName;
  304. rPath += szIndex;
  305. }
  306. }
  307. std::string CreateTopic(const char *pszTopicDevID, const char *pszTopicShmID, const char *pszValueFormat, const char *pszTopicCmd)
  308. {
  309. if((!pszTopicDevID || !*pszTopicDevID) && (!pszTopicShmID && !*pszTopicShmID))
  310. return "";
  311. else if(pszTopicDevID && *pszTopicDevID && pszTopicShmID && *pszTopicShmID)
  312. return ::formatString("%s/%s/%s/%s%s", pszTopicDevID, pszTopicShmID, pszValueFormat, pszTopicCmd, m_pszPath);
  313. else
  314. {
  315. const char *pszTopic = (pszTopicDevID && *pszTopicDevID) ? pszTopicDevID : pszTopicShmID;
  316. return ::formatString("%s/%s/%s%s", pszTopic, pszValueFormat, pszTopicCmd, m_pszPath);
  317. }
  318. }
  319. void Lock(int &nLocks)
  320. {
  321. if(!nLocks)
  322. {
  323. ::GfaIpcLockSHM(m_hShm);
  324. ++nLocks;
  325. }
  326. }
  327. void Unlock(int &nLocks)
  328. {
  329. if(nLocks)
  330. {
  331. --nLocks;
  332. ::GfaIpcUnlockSHM(m_hShm);
  333. }
  334. }
  335. const char* GetName(void) const {
  336. return m_name.c_str();}
  337. int GetIndex(void) const {
  338. return m_nIndex;}
  339. private:
  340. void TraceFormatChange(uint32_t nMask, bool bOn)
  341. {
  342. #ifdef _DUMP_CONTROL_CHANGED
  343. if(m_pszPath && nMask)
  344. {
  345. int nCount = 0;
  346. TRACE("%s ==> %s ", m_pszPath, bOn ? "ON" : "OFF");
  347. if(nMask & MQTT_VALUE_BINLE)
  348. {
  349. TRACE("%s", MQTT_TOPIC_VALUE_BINLE);
  350. ++nCount;
  351. }
  352. if(nMask & MQTT_VALUE_BINBE)
  353. {
  354. if(nCount)
  355. TRACE(", ");
  356. TRACE("%s", MQTT_TOPIC_VALUE_BINBE);
  357. ++nCount;
  358. }
  359. if(nMask & MQTT_VALUE_JSON)
  360. {
  361. if(nCount)
  362. TRACE(", ");
  363. TRACE("%s", MQTT_TOPIC_VALUE_JSON);
  364. ++nCount;
  365. }
  366. if(nMask & MQTT_VALUE_PBUF)
  367. {
  368. if(nCount)
  369. TRACE(", ");
  370. TRACE("%s", MQTT_TOPIC_VALUE_PBUF);
  371. ++nCount;
  372. }
  373. TRACE("\n");
  374. }
  375. #endif // _DUMP_CONTROL_CHANGED
  376. }
  377. void TraceQOSChange(int nQosOld, int nQosNew)
  378. {
  379. #ifdef _DUMP_CONTROL_CHANGED
  380. if(m_pszPath)
  381. {
  382. TRACE("%s ==> QOS: %d -> %d\n", m_pszPath, nQosOld, nQosNew);
  383. }
  384. #endif // _DUMP_CONTROL_CHANGED
  385. }
  386. void TraceRetainChange(bool bRetained)
  387. {
  388. #ifdef _DUMP_CONTROL_CHANGED
  389. if(m_pszPath)
  390. {
  391. TRACE("%s ==> Retain: %s\n", m_pszPath, bRetained ? "true" : "false");
  392. }
  393. #endif // _DUMP_CONTROL_CHANGED
  394. }
  395. private:
  396. HSHM m_hShm;
  397. CMqttVar *m_pParent;
  398. uint32_t m_nPubMask;
  399. uint32_t m_nPubMaskForcedOnce;
  400. std::string m_path;
  401. const char *m_pszPath;
  402. size_t m_nCbVarpath;
  403. std::string m_name;
  404. int m_nIndex;
  405. int m_nQos;
  406. bool m_bRetain;
  407. uint32_t m_nRetainedPubMask;
  408. };
  409. /////////////////////////////////////////////////////////////////////////////
  410. template <typename T>
  411. class CMqttVariable : public CMqttVar
  412. {
  413. public:
  414. CMqttVariable(CMqttVariable &&m) noexcept :
  415. CMqttVar(std::move(m)),
  416. m_pData(std::move(m.m_pData)),
  417. m_pShadow(std::move(m.m_pShadow)),
  418. m_bIsBool(std::move(m.m_bIsBool))
  419. {
  420. }
  421. CMqttVariable(void *pData, void *pShadow, HSHM hShm, const char *pszName, int nIndex, uint32_t nPubMask, int nQos, bool bRetain, CMqttVar *pParent) :
  422. CMqttVar(hShm, pszName, nIndex, nPubMask, nQos, bRetain, pParent),
  423. m_pData((volatile T*)pData),
  424. m_pShadow((T*)pShadow),
  425. m_bIsBool(std::type_index(typeid(T)) == std::type_index(typeid(bool)))
  426. {
  427. ASSERT(m_pData);
  428. ASSERT(m_pShadow);
  429. ASSERT(hShm);
  430. int nLocks = 0;
  431. Lock(nLocks);
  432. *m_pShadow = *m_pData;
  433. Unlock(nLocks);
  434. }
  435. /////////////////////////////////////////////////////////////////////////
  436. public:
  437. virtual void CheckShmAndPublish(const char *pszTopicDevID, const char *pszTopicShmID, CMqttMessageQueue &rmq, int &nLocks)
  438. {
  439. uint32_t nPubMaskForcedOnce = GetForcedPublishMask();
  440. ClearForcedPublishMask();
  441. if(!PublishEnabled())
  442. return;
  443. if(!UpdateShadowBuffer(nLocks))
  444. {
  445. if(nPubMaskForcedOnce & MQTT_VALUE_BINLE)
  446. PublishBinLE(pszTopicDevID, pszTopicShmID, rmq, nLocks);
  447. if(nPubMaskForcedOnce & MQTT_VALUE_BINBE)
  448. PublishBinBE(pszTopicDevID, pszTopicShmID, rmq, nLocks);
  449. if(nPubMaskForcedOnce & MQTT_VALUE_JSON)
  450. PublishJson(pszTopicDevID, pszTopicShmID, rmq, nLocks);
  451. if(nPubMaskForcedOnce & MQTT_VALUE_PBUF)
  452. PublishPBuf(pszTopicDevID, pszTopicShmID, rmq, nLocks);
  453. }
  454. else
  455. {
  456. if(PublishEnabled(MQTT_VALUE_BINLE))
  457. PublishBinLE(pszTopicDevID, pszTopicShmID, rmq, nLocks);
  458. if(PublishEnabled(MQTT_VALUE_BINBE))
  459. PublishBinBE(pszTopicDevID, pszTopicShmID, rmq, nLocks);
  460. if(PublishEnabled(MQTT_VALUE_JSON))
  461. PublishJson(pszTopicDevID, pszTopicShmID, rmq, nLocks);
  462. if(PublishEnabled(MQTT_VALUE_PBUF))
  463. PublishPBuf(pszTopicDevID, pszTopicShmID, rmq, nLocks);
  464. }
  465. }
  466. virtual bool SetShmValue(uint32_t nValFormat, CMqttMessage *pMsg, int &nLocks) {
  467. switch(nValFormat)
  468. {
  469. case MQTT_VALUE_BINLE:
  470. return SetShmValueLE(pMsg, nLocks);
  471. case MQTT_VALUE_BINBE:
  472. return SetShmValueBE(pMsg, nLocks);
  473. case MQTT_VALUE_JSON:
  474. return SetShmValueJson(pMsg, nLocks);
  475. case MQTT_VALUE_PBUF:
  476. return SetShmValuePBuf(pMsg, nLocks);
  477. default:
  478. break;
  479. }
  480. return false;
  481. }
  482. /////////////////////////////////////////////////////////////////////////
  483. private:
  484. bool SetShmValueLE(CMqttMessage *pMsg, int &nLocks)
  485. {
  486. if(pMsg->payload && ((size_t)pMsg->payloadlen == sizeof(T)))
  487. {
  488. T val = *(T*)pMsg->payload;
  489. #if __BYTE_ORDER__ == __ORDER_BIG_ENDIAN__
  490. _swap_val(val);
  491. #endif // __BYTE_ORDER__
  492. Lock(nLocks);
  493. *m_pData = val;
  494. Unlock(nLocks);
  495. return true;
  496. }
  497. return false;
  498. }
  499. bool SetShmValueBE(CMqttMessage *pMsg, int &nLocks)
  500. {
  501. if(pMsg->payload && ((size_t)pMsg->payloadlen == sizeof(T)))
  502. {
  503. T val = *(T*)pMsg->payload;
  504. #if __BYTE_ORDER__ == __ORDER_LITTLE_ENDIAN__
  505. _swap_val(val);
  506. #endif // __BYTE_ORDER__
  507. Lock(nLocks);
  508. *m_pData = val;
  509. Unlock(nLocks);
  510. return true;
  511. }
  512. return false;
  513. }
  514. bool SetShmValueJson(CMqttMessage *pMsg, int &nLocks)
  515. {
  516. bool bRet = false;
  517. if(pMsg)
  518. {
  519. T val;
  520. CJson_t jtRoot, jtVal;
  521. std::string sErr;
  522. if(pMsg->GetPayloadAsJSON(jtRoot, sErr))
  523. {
  524. if(jtRoot.GetValue("value", jtVal))
  525. {
  526. const json_t *pjt = jtVal;
  527. if(m_bIsBool)
  528. {
  529. if(json_is_boolean(pjt))
  530. {
  531. val = (T)json_boolean_value(pjt);
  532. bRet = true;
  533. }
  534. else if(json_is_integer(pjt))
  535. {
  536. val = !!::json_integer_value(pjt);
  537. bRet = true;
  538. }
  539. }
  540. else if(std::is_integral<T>::value)
  541. {
  542. if(json_is_integer(pjt))
  543. {
  544. val = (T)::json_integer_value(pjt);
  545. bRet = true;
  546. }
  547. else if(json_is_boolean(pjt))
  548. {
  549. val = (T)(json_boolean_value(pjt) ? 1 : 0);
  550. bRet = true;
  551. }
  552. }
  553. else if(std::is_floating_point<T>::value)
  554. {
  555. if(json_is_real(pjt))
  556. {
  557. val = (T)::json_real_value(pjt);
  558. bRet = true;
  559. }
  560. else if(json_is_integer(pjt))
  561. {
  562. val = (T)::json_integer_value(pjt);
  563. bRet = true;
  564. }
  565. else if(json_is_boolean(pjt))
  566. {
  567. val = (T)(json_boolean_value(pjt) ? 1 : 0);
  568. bRet = true;
  569. }
  570. }
  571. if(bRet)
  572. {
  573. Lock(nLocks);
  574. *m_pData = val;
  575. Unlock(nLocks);
  576. }
  577. }
  578. }
  579. }
  580. return bRet;
  581. }
  582. bool SetShmValuePBuf(CMqttMessage *pMsg, int &nLocks)
  583. {
  584. TRACE("%s:\n\tProtocol buffers not implemented!\n", GetPath());
  585. return false;
  586. }
  587. /////////////////////////////////////////////////////////////////////////
  588. void PublishBinLE(const char *pszTopicDevID, const char *pszTopicShmID, CMqttMessageQueue &rmq, int &nLocks)
  589. {
  590. std::string s = CreateTopic(pszTopicDevID, pszTopicShmID, MQTT_TOPIC_VALUE_BINLE, MQTT_TOPIC_CMD_VALUE);
  591. #if _RETURN_BIN_AS_STRING
  592. std::string v = toString(*m_pShadow);
  593. CMqttMessage *pMsg = CMqttMessage::CreateMessage(s.c_str(), v.c_str(), v.length(), GetQoS(), GetRetained());
  594. #else // _RETURN_BIN_AS_STRING
  595. T val = *m_pShadow;
  596. #if __BYTE_ORDER__ == __ORDER_BIG_ENDIAN__
  597. _swap_val(val);
  598. #endif // __BYTE_ORDER__
  599. CMqttMessage *pMsg = CMqttMessage::CreateMessage(s.c_str(), &val, sizeof(val), GetQoS(), GetRetained());
  600. #endif // _RETURN_BIN_AS_STRING
  601. rmq.Push(pMsg);
  602. if(GetRetained())
  603. SetRetainedPubMask(MQTT_VALUE_BINLE);
  604. }
  605. void PublishBinBE(const char *pszTopicDevID, const char *pszTopicShmID, CMqttMessageQueue &rmq, int &nLocks)
  606. {
  607. std::string s = CreateTopic(pszTopicDevID, pszTopicShmID, MQTT_TOPIC_VALUE_BINBE, MQTT_TOPIC_CMD_VALUE);
  608. #if _RETURN_BIN_AS_STRING
  609. std::string v = toString(*m_pShadow);
  610. CMqttMessage *pMsg = CMqttMessage::CreateMessage(s.c_str(), v.c_str(), v.length(), GetQoS(), GetRetained());
  611. #else // _RETURN_BIN_AS_STRING
  612. T val = *m_pShadow;
  613. #if __BYTE_ORDER__ == __ORDER_LITTLE_ENDIAN__
  614. _swap_val(val);
  615. #endif // __BYTE_ORDER__
  616. CMqttMessage *pMsg = CMqttMessage::CreateMessage(s.c_str(), &val, sizeof(val), GetQoS(), GetRetained());
  617. #endif // _RETURN_BIN_AS_STRING
  618. rmq.Push(pMsg);
  619. if(GetRetained())
  620. SetRetainedPubMask(MQTT_VALUE_BINBE);
  621. }
  622. void PublishJson(const char *pszTopicDevID, const char *pszTopicShmID, CMqttMessageQueue &rmq, int &nLocks)
  623. {
  624. std::string s = CreateTopic(pszTopicDevID, pszTopicShmID, MQTT_TOPIC_VALUE_JSON, MQTT_TOPIC_CMD_VALUE);
  625. CJson_t jtValue(json_object());
  626. CJson_t jtPath(json_string(GetPath()));
  627. CJson_t jtIndex(json_integer(GetIndex()));
  628. CJson_t jtName(json_string(GetName()));
  629. CJson_t jtVal(GetJsonValue());
  630. json_object_set(jtValue, "path", jtPath);
  631. json_object_set(jtValue, "index", jtIndex);
  632. json_object_set(jtValue, "name", jtName);
  633. json_object_set(jtValue, "value", jtVal);
  634. char *pszJson = json_dumps(jtValue, MQTT_JSON_OUTPUT_FLAGS);
  635. std::string v = pszJson;
  636. free(pszJson);
  637. CMqttMessage *pMsg = CMqttMessage::CreateMessage(s.c_str(), v.c_str(), v.length(), GetQoS(), GetRetained());
  638. rmq.Push(pMsg);
  639. if(GetRetained())
  640. SetRetainedPubMask(MQTT_VALUE_JSON);
  641. }
  642. void PublishPBuf(const char *pszTopicDevID, const char *pszTopicShmID, CMqttMessageQueue &rmq, int &nLocks)
  643. {
  644. std::string s = CreateTopic(pszTopicDevID, pszTopicShmID, MQTT_TOPIC_VALUE_PBUF, MQTT_TOPIC_CMD_VALUE);
  645. TRACE("%s:\n\tProtocol buffers not implemented!\n", s.c_str());
  646. }
  647. std::string toString(T v)
  648. {
  649. if(std::is_integral<T>::value)
  650. {
  651. if(std::is_signed<T>::value)
  652. {
  653. long long r = (long long)v;
  654. return formatString("%lld", r);
  655. }
  656. else if(std::is_unsigned<T>::value)
  657. {
  658. unsigned long long r = (unsigned long long)v;
  659. return formatString("%llu", r);
  660. }
  661. }
  662. else if(std::is_floating_point<T>::value)
  663. {
  664. double r = (double)v;
  665. return formatString("%.20g", r);
  666. }
  667. return "";
  668. }
  669. json_t* GetJsonValue(void) const
  670. {
  671. if(std::is_integral<T>::value)
  672. return m_bIsBool ? json_boolean(!!*m_pShadow) : ::json_integer((json_int_t)*m_pShadow);
  673. else if(std::is_floating_point<T>::value)
  674. return ::json_real((double)*m_pShadow);
  675. ASSERT(false);
  676. return NULL;
  677. }
  678. bool UpdateShadowBuffer(int &nLocks)
  679. {
  680. Lock(nLocks);
  681. bool bChanged = *m_pShadow != *m_pData;
  682. if(bChanged)
  683. {
  684. *m_pShadow = *m_pData;
  685. TRACE("Changed: %s\n", GetPath());
  686. }
  687. Unlock(nLocks);
  688. return bChanged;
  689. }
  690. private:
  691. volatile T *m_pData;
  692. T *m_pShadow;
  693. bool m_bIsBool;
  694. };
  695. /////////////////////////////////////////////////////////////////////////////
  696. template <typename T, int ST>
  697. class CMqttStringVariable : public CMqttVar
  698. {
  699. public:
  700. typedef enum
  701. {
  702. VT_Invalid, // 0
  703. VT_Latin1, // 1
  704. VT_UTF_8, // 2
  705. VT_UTF_16, // 3
  706. VT_UTF_32, // 4
  707. VT_Unicode, // 5
  708. VT_Last
  709. }VT;
  710. public:
  711. CMqttStringVariable(CMqttStringVariable &&m) noexcept :
  712. CMqttVar(std::move(m)),
  713. m_nCChBuffer(std::move(m.m_nCChBuffer)),
  714. m_nCbBuffer(std::move(m.m_nCbBuffer)),
  715. m_nCbString(std::move(m.m_nCbString)),
  716. m_pszData(std::move(m.m_pszData)),
  717. m_pszShadow(std::move(m.m_pszShadow))
  718. {
  719. }
  720. CMqttStringVariable(void *pData, void *pShadow, HSHM hShm, const char *pszName, size_t nCChBuffer, int nIndex, uint32_t nPubMask, int nQos, bool bRetain, CMqttVar *pParent) :
  721. CMqttVar(hShm, pszName, nIndex, nPubMask, nQos, bRetain, pParent),
  722. m_nCChBuffer(nCChBuffer),
  723. m_nCbBuffer(nCChBuffer * sizeof(T)),
  724. m_nCbString(0),
  725. m_pszData((T*)pData),
  726. m_pszShadow((T*)pShadow)
  727. {
  728. ASSERT(m_pszData);
  729. ASSERT(m_pszShadow);
  730. ASSERT(m_nCChBuffer > 0);
  731. int nLocks = 0;
  732. Lock(nLocks);
  733. zeroTerm(m_nCChBuffer - 1);
  734. m_nCbString = slen(m_pszData) * sizeof(T);
  735. memcpy(m_pszShadow, (const void*)m_pszData, m_nCbBuffer);
  736. Unlock(nLocks);
  737. }
  738. public:
  739. virtual void CheckShmAndPublish(const char *pszTopicDevID, const char *pszTopicShmID, CMqttMessageQueue &rmq, int &nLocks)
  740. {
  741. uint32_t nPubMaskForcedOnce = GetForcedPublishMask();
  742. ClearForcedPublishMask();
  743. if(!PublishEnabled())
  744. return;
  745. if(!UpdateShadowBuffer(nLocks))
  746. {
  747. if(nPubMaskForcedOnce & MQTT_VALUE_BINLE)
  748. PublishBinLE(pszTopicDevID, pszTopicShmID, rmq, nLocks);
  749. if(nPubMaskForcedOnce & MQTT_VALUE_BINBE)
  750. PublishBinBE(pszTopicDevID, pszTopicShmID, rmq, nLocks);
  751. if(nPubMaskForcedOnce & MQTT_VALUE_JSON)
  752. PublishJson(pszTopicDevID, pszTopicShmID, rmq, nLocks);
  753. if(nPubMaskForcedOnce & MQTT_VALUE_PBUF)
  754. PublishPBuf(pszTopicDevID, pszTopicShmID, rmq, nLocks);
  755. }
  756. else
  757. {
  758. if(PublishEnabled(MQTT_VALUE_BINLE))
  759. PublishBinLE(pszTopicDevID, pszTopicShmID, rmq, nLocks);
  760. if(PublishEnabled(MQTT_VALUE_BINBE))
  761. PublishBinBE(pszTopicDevID, pszTopicShmID, rmq, nLocks);
  762. if(PublishEnabled(MQTT_VALUE_JSON))
  763. PublishJson(pszTopicDevID, pszTopicShmID, rmq, nLocks);
  764. if(PublishEnabled(MQTT_VALUE_PBUF))
  765. PublishPBuf(pszTopicDevID, pszTopicShmID, rmq, nLocks);
  766. }
  767. }
  768. virtual bool SetShmValue(uint32_t nValFormat, CMqttMessage *pMsg, int &nLocks) {
  769. switch(nValFormat)
  770. {
  771. case MQTT_VALUE_BINLE:
  772. return SetShmValueLE(pMsg, nLocks);
  773. case MQTT_VALUE_BINBE:
  774. return SetShmValueBE(pMsg, nLocks);
  775. case MQTT_VALUE_JSON:
  776. return SetShmValueJson(pMsg, nLocks);
  777. case MQTT_VALUE_PBUF:
  778. return SetShmValuePBuf(pMsg, nLocks);
  779. default:
  780. break;
  781. }
  782. return false;
  783. }
  784. private:
  785. bool SetShmValueLE(CMqttMessage *pMsg, int &nLocks)
  786. {
  787. if(pMsg->payload && pMsg->payloadlen > 0)
  788. {
  789. if((size_t)pMsg->payloadlen / sizeof(T) < m_nCChBuffer)
  790. {
  791. Lock(nLocks);
  792. m_nCbString = (size_t)pMsg->payloadlen;
  793. #if __BYTE_ORDER__ == __ORDER_BIG_ENDIAN__
  794. _copy_swap_string_chars((const T*)pMsg->payload, (T*)m_pszData, m_nCbString / sizeof(T));
  795. #else // __BYTE_ORDER__
  796. memcpy((void*)m_pszData, pMsg->payload, m_nCbString);
  797. #endif // __BYTE_ORDER__
  798. zeroTerm(m_nCbString / sizeof(T));
  799. Unlock(nLocks);
  800. return true;
  801. }
  802. }
  803. else
  804. {
  805. Lock(nLocks);
  806. *m_pszData = '\0';
  807. m_nCbString = 0;
  808. Unlock(nLocks);
  809. return true;
  810. }
  811. return false;
  812. }
  813. bool SetShmValueBE(CMqttMessage *pMsg, int &nLocks)
  814. {
  815. if(pMsg->payload && pMsg->payloadlen > 0)
  816. {
  817. if((size_t)pMsg->payloadlen / sizeof(T) < m_nCChBuffer)
  818. {
  819. Lock(nLocks);
  820. m_nCbString = (size_t)pMsg->payloadlen;
  821. #if __BYTE_ORDER__ == __ORDER_LITTLE_ENDIAN__
  822. _copy_swap_string_chars((const T*)pMsg->payload, (T*)m_pszData, m_nCbString / sizeof(T));
  823. #else // __BYTE_ORDER__
  824. memcpy((void*)m_pszData, pMsg->payload, m_nCbString);
  825. #endif // __BYTE_ORDER__
  826. zeroTerm(m_nCbString / sizeof(T));
  827. Unlock(nLocks);
  828. return true;
  829. }
  830. }
  831. else
  832. {
  833. Lock(nLocks);
  834. *m_pszData = '\0';
  835. m_nCbString = 0;
  836. Unlock(nLocks);
  837. return true;
  838. }
  839. return false;
  840. }
  841. bool SetShmValueJson(CMqttMessage *pMsg, int &nLocks)
  842. {
  843. if(pMsg)
  844. {
  845. CJson_t jtRoot, jtVal;
  846. std::string sErr;
  847. if(pMsg->GetPayloadAsJSON(jtRoot, sErr))
  848. {
  849. if(jtRoot.GetValue("value", jtVal))
  850. {
  851. const json_t *pjt = jtVal;
  852. if(json_is_string(pjt))
  853. {
  854. std::string s(::json_string_value(jtVal));
  855. if(s.length() > 0)
  856. {
  857. Lock(nLocks);
  858. if(fromUTF8(s.c_str(), s.length(), (T*)m_pszData, m_nCbBuffer))
  859. m_nCbString = slen(m_pszData) * sizeof(T);
  860. Unlock(nLocks);
  861. }
  862. else
  863. {
  864. Lock(nLocks);
  865. *m_pszData = '\0';
  866. m_nCbString = 0;
  867. Unlock(nLocks);
  868. }
  869. return true;
  870. }
  871. }
  872. }
  873. }
  874. return false;
  875. }
  876. bool SetShmValuePBuf(CMqttMessage *pMsg, int &nLocks)
  877. {
  878. TRACE("%s:\n\tProtocol buffers not implemented!\n", GetPath());
  879. return false;
  880. }
  881. void PublishBinLE(const char *pszTopicDevID, const char *pszTopicShmID, CMqttMessageQueue &rmq, int &nLocks)
  882. {
  883. std::string s = CreateTopic(pszTopicDevID, pszTopicShmID, MQTT_TOPIC_VALUE_BINLE, MQTT_TOPIC_CMD_VALUE);
  884. #if __BYTE_ORDER__ == __ORDER_BIG_ENDIAN__
  885. T szBuf[m_nCChBuffer];
  886. CMqttMessage *pMsg = CMqttMessage::CreateMessage(s.c_str(), _copy_swap_string_chars(m_pszShadow, szBuf, m_nCChBuffer), m_nCbString, GetQoS(), GetRetained());
  887. #else // __BYTE_ORDER__
  888. CMqttMessage *pMsg = CMqttMessage::CreateMessage(s.c_str(), m_pszShadow, m_nCbString, GetQoS(), GetRetained());
  889. #endif // __BYTE_ORDER__
  890. rmq.Push(pMsg);
  891. }
  892. void PublishBinBE(const char *pszTopicDevID, const char *pszTopicShmID, CMqttMessageQueue &rmq, int &nLocks)
  893. {
  894. std::string s = CreateTopic(pszTopicDevID, pszTopicShmID, MQTT_TOPIC_VALUE_BINBE, MQTT_TOPIC_CMD_VALUE);
  895. #if __BYTE_ORDER__ == __ORDER_LITTLE_ENDIAN__
  896. T szBuf[m_nCChBuffer];
  897. CMqttMessage *pMsg = CMqttMessage::CreateMessage(s.c_str(), _copy_swap_string_chars(m_pszShadow, szBuf, m_nCChBuffer), m_nCbString, GetQoS(), GetRetained());
  898. #else // __BYTE_ORDER__
  899. CMqttMessage *pMsg = CMqttMessage::CreateMessage(s.c_str(), m_pszShadow, m_nCbString, GetQoS(), GetRetained());
  900. #endif // __BYTE_ORDER__
  901. rmq.Push(pMsg);
  902. }
  903. void PublishJson(const char *pszTopicDevID, const char *pszTopicShmID, CMqttMessageQueue &rmq, int &nLocks)
  904. {
  905. std::string s = CreateTopic(pszTopicDevID, pszTopicShmID, MQTT_TOPIC_VALUE_JSON, MQTT_TOPIC_CMD_VALUE);
  906. CJson_t jtValue(json_object());
  907. CJson_t jtPath(json_string(GetPath()));
  908. CJson_t jtIndex(json_integer(GetIndex()));
  909. CJson_t jtName(json_string(GetName()));
  910. CJson_t jtVal(GetJsonValue());
  911. json_object_set(jtValue, "path", jtPath);
  912. json_object_set(jtValue, "index", jtIndex);
  913. json_object_set(jtValue, "name", jtName);
  914. json_object_set(jtValue, "value", jtVal);
  915. char *pszJson = json_dumps(jtValue, MQTT_JSON_OUTPUT_FLAGS);
  916. std::string v = pszJson;
  917. free(pszJson);
  918. CMqttMessage *pMsg = CMqttMessage::CreateMessage(s.c_str(), v.c_str(), v.length(), GetQoS(), GetRetained());
  919. rmq.Push(pMsg);
  920. }
  921. void PublishPBuf(const char *pszTopicDevID, const char *pszTopicShmID, CMqttMessageQueue &rmq, int &nLocks)
  922. {
  923. std::string s = CreateTopic(pszTopicDevID, pszTopicShmID, MQTT_TOPIC_VALUE_PBUF, MQTT_TOPIC_CMD_VALUE);
  924. TRACE("%s:\n\tProtocol buffers not implemented!\n", s.c_str());
  925. }
  926. json_t* GetJsonValue(void) const
  927. {
  928. char szBuf[m_nCbBuffer * 4];
  929. const char *pszUtf8 = toUTF8(m_pszShadow, m_nCbString, szBuf, sizeof(szBuf));
  930. if(pszUtf8)
  931. return json_string(pszUtf8);
  932. return NULL;
  933. }
  934. const char* toUTF8(const T *pszIn, size_t nCbIn, char *pszOut, size_t nCbOut) const
  935. {
  936. switch(ST)
  937. {
  938. case VT_Latin1:
  939. Latin1ToUtf8(pszIn, nCbIn, pszOut, nCbOut);
  940. break;
  941. case VT_UTF_8:
  942. strncpy(pszOut, (const char*)pszIn, nCbOut - 1);
  943. pszOut[nCbOut - 1] = '\0';
  944. break;
  945. case VT_UTF_16:
  946. EncToUtf8(_UTF_16, pszIn, nCbIn, pszOut, nCbOut);
  947. break;
  948. case VT_UTF_32:
  949. EncToUtf8(_UTF_32, pszIn, nCbIn, pszOut, nCbOut);
  950. break;
  951. case VT_Unicode:
  952. EncToUtf8(_UNICODE, pszIn, nCbIn, pszOut, nCbOut);
  953. break;
  954. default:
  955. ASSERT(false);
  956. return NULL;
  957. }
  958. return pszOut;
  959. }
  960. const T* fromUTF8(const char *pszIn, size_t nCbIn, T *pszOut, size_t nCbOut) const
  961. {
  962. size_t nRet;
  963. switch(ST)
  964. {
  965. case VT_Latin1:
  966. nRet = Utf8ToLatin1(pszIn, nCbIn, pszOut, nCbOut);
  967. pszOut[nRet / sizeof(T)] = T('\0');
  968. break;
  969. case VT_UTF_8:
  970. strncpy((char*)pszOut, pszIn, nCbOut - 1);
  971. pszOut[nCbOut - 1] = T('\0');
  972. break;
  973. case VT_UTF_16:
  974. nRet = Utf8ToUtf16(pszIn, nCbIn, (char16_t*)pszOut, nCbOut);
  975. pszOut[nRet / sizeof(T)] = T('\0');
  976. break;
  977. case VT_UTF_32:
  978. nRet = Utf8ToUtf32(pszIn, nCbIn, (char32_t*)pszOut, nCbOut);
  979. pszOut[nRet / sizeof(T)] = T('\0');
  980. break;
  981. case VT_Unicode:
  982. nRet = Utf8ToWcs(pszIn, nCbIn, (wchar_t*)pszOut, nCbOut);
  983. pszOut[nRet / sizeof(T)] = T('\0');
  984. break;
  985. default:
  986. ASSERT(false);
  987. return NULL;
  988. }
  989. return pszOut;
  990. }
  991. size_t slen(volatile const T *s)
  992. {
  993. volatile const T *p = s;
  994. while(*p) ++p;
  995. return p - s;
  996. }
  997. void zeroTerm(size_t at)
  998. {
  999. m_pszData[at] = (T)'\0';
  1000. }
  1001. bool UpdateShadowBuffer(int &nLocks)
  1002. {
  1003. Lock(nLocks);
  1004. bool bChanged = !!memcmp(m_pszShadow, (const void*)m_pszData, m_nCbString + sizeof(T));
  1005. if(bChanged)
  1006. {
  1007. zeroTerm(m_nCChBuffer - 1);
  1008. m_nCbString = slen(m_pszData) * sizeof(T);
  1009. memcpy(m_pszShadow, (const void*)m_pszData, m_nCbString + sizeof(T));
  1010. TRACE("Changed: %s\n", GetPath());
  1011. }
  1012. Unlock(nLocks);
  1013. return bChanged;
  1014. }
  1015. private:
  1016. const size_t m_nCChBuffer;
  1017. const size_t m_nCbBuffer;
  1018. size_t m_nCbString;
  1019. volatile T *m_pszData;
  1020. T *m_pszShadow;
  1021. };
  1022. /////////////////////////////////////////////////////////////////////////////
  // Bit helpers operating on the byte that p points to, with bit mask m.
  // All arguments and the complete expansions are parenthesized, so that
  // compound masks (a | b), pointer expressions (p + 1) and use inside larger
  // expressions expand as intended.
  // Note: STORE_BIT expands p and m in both branches, but evaluates only one of them.
  #define GET_BOOL_VAL(p, m)    (!!(*(p) & (m)))
  #define SET_BIT(p, m)         (*(p) |= (m))
  #define CLR_BIT(p, m)         (*(p) &= ~(m))
  #define STORE_BIT(p, m, b)    ((b) ? SET_BIT(p, m) : CLR_BIT(p, m))
  1027. class CMqttBitVariable : public CMqttVar
  1028. {
  1029. public:
  1030. CMqttBitVariable(CMqttBitVariable &&m) noexcept :
  1031. CMqttVar(std::move(m)),
  1032. m_mask(std::move(m.m_mask)),
  1033. m_pDataByte(std::move(m.m_pDataByte)),
  1034. m_pShadowByte(std::move(m.m_pShadowByte))
  1035. {
  1036. }
  1037. CMqttBitVariable(void *pData, void *pShadow, HSHM hShm, const char *pszName, size_t nOffset, int nBitNr, uint32_t nPubMask, int nQos, bool bRetain, CMqttVar *pParent) :
  1038. CMqttVar(hShm, pszName, -1, nPubMask, nQos, bRetain, pParent),
  1039. m_mask(1 << nBitNr),
  1040. m_pDataByte((uint8_t*)pData + nOffset),
  1041. m_pShadowByte((uint8_t*)pShadow + nOffset)
  1042. {
  1043. ASSERT(m_pDataByte);
  1044. ASSERT(m_pShadowByte);
  1045. int nLocks = 0;
  1046. Lock(nLocks);
  1047. STORE_BIT(m_pShadowByte, m_mask, GET_BOOL_VAL(m_pDataByte, m_mask));
  1048. Unlock(nLocks);
  1049. }
  1050. public:
  1051. virtual void CheckShmAndPublish(const char *pszTopicDevID, const char *pszTopicShmID, CMqttMessageQueue &rmq, int &nLocks)
  1052. {
  1053. uint32_t nPubMaskForcedOnce = GetForcedPublishMask();
  1054. ClearForcedPublishMask();
  1055. if(!PublishEnabled())
  1056. return;
  1057. if(!UpdateShadowBuffer(nLocks))
  1058. {
  1059. if(nPubMaskForcedOnce & MQTT_VALUE_BINLE)
  1060. PublishBinLE(pszTopicDevID, pszTopicShmID, rmq, nLocks);
  1061. if(nPubMaskForcedOnce & MQTT_VALUE_BINBE)
  1062. PublishBinBE(pszTopicDevID, pszTopicShmID, rmq, nLocks);
  1063. if(nPubMaskForcedOnce & MQTT_VALUE_JSON)
  1064. PublishJson(pszTopicDevID, pszTopicShmID, rmq, nLocks);
  1065. if(nPubMaskForcedOnce & MQTT_VALUE_PBUF)
  1066. PublishPBuf(pszTopicDevID, pszTopicShmID, rmq, nLocks);
  1067. }
  1068. else
  1069. {
  1070. if(PublishEnabled(MQTT_VALUE_BINLE))
  1071. PublishBinLE(pszTopicDevID, pszTopicShmID, rmq, nLocks);
  1072. if(PublishEnabled(MQTT_VALUE_BINBE))
  1073. PublishBinBE(pszTopicDevID, pszTopicShmID, rmq, nLocks);
  1074. if(PublishEnabled(MQTT_VALUE_JSON))
  1075. PublishJson(pszTopicDevID, pszTopicShmID, rmq, nLocks);
  1076. if(PublishEnabled(MQTT_VALUE_PBUF))
  1077. PublishPBuf(pszTopicDevID, pszTopicShmID, rmq, nLocks);
  1078. }
  1079. }
  1080. virtual bool SetShmValue(uint32_t nValFormat, CMqttMessage *pMsg, int &nLocks) {
  1081. switch(nValFormat)
  1082. {
  1083. case MQTT_VALUE_BINLE:
  1084. case MQTT_VALUE_BINBE:
  1085. return SetShmValueBin(pMsg, nLocks);
  1086. case MQTT_VALUE_JSON:
  1087. return SetShmValueJson(pMsg, nLocks);
  1088. case MQTT_VALUE_PBUF:
  1089. return SetShmValuePBuf(pMsg, nLocks);
  1090. default:
  1091. break;
  1092. }
  1093. return false;
  1094. }
  1095. private:
  1096. bool SetShmValueBin(CMqttMessage *pMsg, int &nLocks)
  1097. {
  1098. if(pMsg->payload && ((size_t)pMsg->payloadlen == sizeof(bool)))
  1099. {
  1100. bool bVal = !!(*(bool*)pMsg->payload);
  1101. Lock(nLocks);
  1102. STORE_BIT(m_pDataByte, m_mask, bVal);
  1103. Unlock(nLocks);
  1104. return true;
  1105. }
  1106. return false;
  1107. }
  1108. bool SetShmValueJson(CMqttMessage *pMsg, int &nLocks)
  1109. {
  1110. bool bRet = false;
  1111. if(pMsg)
  1112. {
  1113. bool bVal;
  1114. CJson_t jtRoot, jtVal;
  1115. std::string sErr;
  1116. if(pMsg->GetPayloadAsJSON(jtRoot, sErr))
  1117. {
  1118. if(jtRoot.GetValue("value", jtVal))
  1119. {
  1120. const json_t *pjt = jtVal;
  1121. if(json_is_boolean(pjt))
  1122. {
  1123. bVal = json_boolean_value(pjt);
  1124. bRet = true;
  1125. }
  1126. else if(json_is_integer(pjt))
  1127. {
  1128. bVal = !!::json_integer_value(pjt);
  1129. bRet = true;
  1130. }
  1131. if(bRet)
  1132. {
  1133. Lock(nLocks);
  1134. STORE_BIT(m_pDataByte, m_mask, bVal);
  1135. Unlock(nLocks);
  1136. }
  1137. }
  1138. }
  1139. }
  1140. return bRet;
  1141. }
  1142. bool SetShmValuePBuf(CMqttMessage *pMsg, int &nLocks)
  1143. {
  1144. TRACE("%s:\n\tProtocol buffers not implemented!\n", GetPath());
  1145. return false;
  1146. }
  1147. void PublishBinLE(const char *pszTopicDevID, const char *pszTopicShmID, CMqttMessageQueue &rmq, int &nLocks)
  1148. {
  1149. std::string s = CreateTopic(pszTopicDevID, pszTopicShmID, MQTT_TOPIC_VALUE_BINLE, MQTT_TOPIC_CMD_VALUE);
  1150. bool val = GET_BOOL_VAL(m_pShadowByte, m_mask);
  1151. #if _RETURN_BIN_AS_STRING
  1152. std::string v = val ? "true" : "false";
  1153. CMqttMessage *pMsg = CMqttMessage::CreateMessage(s.c_str(), v.c_str(), v.length(), GetQoS(), GetRetained());
  1154. #else // _RETURN_BIN_AS_STRING
  1155. CMqttMessage *pMsg = CMqttMessage::CreateMessage(s.c_str(), &val, sizeof(val), GetQoS(), GetRetained());
  1156. #endif // _RETURN_BIN_AS_STRING
  1157. rmq.Push(pMsg);
  1158. }
  1159. void PublishBinBE(const char *pszTopicDevID, const char *pszTopicShmID, CMqttMessageQueue &rmq, int &nLocks)
  1160. {
  1161. std::string s = CreateTopic(pszTopicDevID, pszTopicShmID, MQTT_TOPIC_VALUE_BINBE, MQTT_TOPIC_CMD_VALUE);
  1162. bool val = GET_BOOL_VAL(m_pShadowByte, m_mask);
  1163. #if _RETURN_BIN_AS_STRING
  1164. std::string v = val ? "true" : "false";
  1165. CMqttMessage *pMsg = CMqttMessage::CreateMessage(s.c_str(), v.c_str(), v.length(), GetQoS(), GetRetained());
  1166. #else // _RETURN_BIN_AS_STRING
  1167. CMqttMessage *pMsg = CMqttMessage::CreateMessage(s.c_str(), &val, sizeof(val), GetQoS(), GetRetained());
  1168. #endif // _RETURN_BIN_AS_STRING
  1169. rmq.Push(pMsg);
  1170. }
  1171. void PublishJson(const char *pszTopicDevID, const char *pszTopicShmID, CMqttMessageQueue &rmq, int &nLocks)
  1172. {
  1173. std::string s = CreateTopic(pszTopicDevID, pszTopicShmID, MQTT_TOPIC_VALUE_JSON, MQTT_TOPIC_CMD_VALUE);
  1174. CJson_t jtValue(json_object());
  1175. CJson_t jtPath(json_string(GetPath()));
  1176. CJson_t jtIndex(json_integer(GetIndex()));
  1177. CJson_t jtName(json_string(GetName()));
  1178. CJson_t jtVal(GetJsonValue());
  1179. json_object_set(jtValue, "path", jtPath);
  1180. json_object_set(jtValue, "index", jtIndex);
  1181. json_object_set(jtValue, "name", jtName);
  1182. json_object_set(jtValue, "value", jtVal);
  1183. char *pszJson = json_dumps(jtValue, MQTT_JSON_OUTPUT_FLAGS);
  1184. std::string v = pszJson;
  1185. free(pszJson);
  1186. CMqttMessage *pMsg = CMqttMessage::CreateMessage(s.c_str(), v.c_str(), v.length(), GetQoS(), GetRetained());
  1187. rmq.Push(pMsg);
  1188. }
  1189. void PublishPBuf(const char *pszTopicDevID, const char *pszTopicShmID, CMqttMessageQueue &rmq, int &nLocks)
  1190. {
  1191. std::string s = CreateTopic(pszTopicDevID, pszTopicShmID, MQTT_TOPIC_VALUE_PBUF, MQTT_TOPIC_CMD_VALUE);
  1192. TRACE("%s:\n\tProtocol buffers not implemented!\n", s.c_str());
  1193. }
  1194. json_t* GetJsonValue(void) const
  1195. {
  1196. return json_boolean(GET_BOOL_VAL(m_pShadowByte, m_mask));
  1197. }
  1198. bool UpdateShadowBuffer(int &nLocks)
  1199. {
  1200. bool bVal;
  1201. Lock(nLocks);
  1202. bool bChanged = GET_BOOL_VAL(m_pShadowByte, m_mask) != (bVal = GET_BOOL_VAL(m_pDataByte, m_mask));
  1203. if(bChanged)
  1204. {
  1205. STORE_BIT(m_pShadowByte, m_mask, bVal);
  1206. TRACE("Changed: %s\n", GetPath());
  1207. }
  1208. Unlock(nLocks);
  1209. return bChanged;
  1210. }
  1211. private:
  1212. uint8_t m_mask;
  1213. volatile uint8_t *m_pDataByte;
  1214. uint8_t *m_pShadowByte;
  1215. };
  1216. /////////////////////////////////////////////////////////////////////////////
  1217. template <typename T, typename V>
  1218. class CMqttVarArray : public CMqttVar,
  1219. public std::vector<T>
  1220. {
  1221. public:
  1222. CMqttVarArray(void *pData, void *pShadow, HSHM hShm, const char *pszName, size_t nElemCount, uint32_t nPubMask, int nQos, bool bRetain, CMqttVar *pParent)
  1223. : CMqttVar(hShm, pszName, -1, 0, nQos, bRetain, pParent)
  1224. {
  1225. for(size_t i = 0; i < nElemCount; i++)
  1226. {
  1227. this->emplace_back(&((V*)pData)[i], &((V*)pShadow)[i], hShm, pszName, i, nPubMask, nQos, bRetain, static_cast<CMqttVar*>(this));
  1228. }
  1229. }
  1230. CMqttVarArray(void *pData, void *pShadow, HSHM hShm, const char *pszName, size_t nElemCount, size_t nCChData, uint32_t nPubMask, int nQos, bool bRetain, CMqttVar *pParent)
  1231. : CMqttVar(hShm, pszName, -1, 0, nQos, bRetain, pParent)
  1232. {
  1233. for(size_t i = 0; i < nElemCount; i++)
  1234. {
  1235. this->emplace_back(&((V*)pData)[i * nCChData], &((V*)pShadow)[i * nCChData], hShm, pszName, nCChData, i, nPubMask, nQos, bRetain, static_cast<CMqttVar*>(this));
  1236. }
  1237. }
  1238. public:
  1239. virtual void CreateMembersTable(CMqttVarTable &vt)
  1240. {
  1241. CMqttVar::CreateMembersTable(vt);
  1242. for(auto i = this->begin(); i != this->end(); ++i)
  1243. {
  1244. CMqttVar &rVar = static_cast<CMqttVar&>(*i);
  1245. rVar.CreateMembersTable(vt);
  1246. }
  1247. }
  1248. virtual void InitPath(CMqttVar *pParent, const char *pszMemberName, int nIndex = -1)
  1249. {
  1250. CMqttVar::InitPath(pParent, pszMemberName, nIndex);
  1251. int j = 0;
  1252. for(auto i = this->begin(); i != this->end(); ++i, ++j)
  1253. {
  1254. CMqttVar &rVar = static_cast<CMqttVar&>(*i);
  1255. rVar.InitPath(pParent, pszMemberName, j);
  1256. }
  1257. }
  1258. virtual void CheckShmAndPublish(const char *pszTopicDevID, const char *pszTopicShmID, CMqttMessageQueue &rmq, int &nLocks)
  1259. {
  1260. Lock(nLocks);
  1261. for(auto i = this->begin(); i != this->end(); ++i)
  1262. {
  1263. CMqttVar &rVar = static_cast<CMqttVar&>(*i);
  1264. rVar.CheckShmAndPublish(pszTopicDevID, pszTopicShmID, rmq, nLocks);
  1265. }
  1266. Unlock(nLocks);
  1267. }
  1268. virtual bool EnablePublish(uint32_t nMask, CMqttVarTable *pvt)
  1269. {
  1270. bool bRet = false;
  1271. for(auto i = this->begin(); i != this->end(); ++i)
  1272. {
  1273. CMqttVar &rVar = static_cast<CMqttVar&>(*i);
  1274. bRet = rVar.EnablePublish(nMask, pvt) || bRet;
  1275. }
  1276. return bRet;
  1277. }
  1278. virtual bool DisablePublish(uint32_t nMask, CMqttVarTable *pvt)
  1279. {
  1280. bool bRet = false;
  1281. for(auto i = this->begin(); i != this->end(); ++i)
  1282. {
  1283. CMqttVar &rVar = static_cast<CMqttVar&>(*i);
  1284. bRet = rVar.DisablePublish(nMask, pvt) || bRet;
  1285. }
  1286. return bRet;
  1287. }
  1288. virtual bool SetQoS(int nQos)
  1289. {
  1290. bool bRet = false;
  1291. for(auto i = this->begin(); i != this->end(); ++i)
  1292. {
  1293. CMqttVar &rVar = static_cast<CMqttVar&>(*i);
  1294. bRet = rVar.SetQoS(nQos) || bRet;
  1295. }
  1296. return bRet;
  1297. }
  1298. virtual bool SetRetained(bool bRetain)
  1299. {
  1300. bool bRet = false;
  1301. for(auto i = this->begin(); i != this->end(); ++i)
  1302. {
  1303. CMqttVar &rVar = static_cast<CMqttVar&>(*i);
  1304. bRet = rVar.SetRetained(bRetain) || bRet;
  1305. }
  1306. return bRet;
  1307. }
  1308. virtual void RemoveRetained(const char *pszTopicDevID, const char *pszTopicShmID, CMqttMessageQueue &rmq, bool bForce)
  1309. {
  1310. for(auto i = this->begin(); i != this->end(); ++i)
  1311. {
  1312. CMqttVar &rVar = static_cast<CMqttVar&>(*i);
  1313. rVar.RemoveRetained(pszTopicDevID, pszTopicShmID, rmq, bForce);
  1314. }
  1315. }
  1316. };
  1317. /////////////////////////////////////////////////////////////////////////////
  1318. #endif // __cplusplus
  1319. #endif // !defined(AGD_MQTTVAR_H__CA5925ED_7757_407C_86F2_E7687DFFCCFA__INCLUDED_)