mqttvar.h 38 KB


  1. // mqttvar.h :
  2. //
  3. #if !defined(AGD_MQTTVAR_H__CA5925ED_7757_407C_86F2_E7687DFFCCFA__INCLUDED_)
  4. #define AGD_MQTTVAR_H__CA5925ED_7757_407C_86F2_E7687DFFCCFA__INCLUDED_
  5. #include <stdint.h>
  6. #include <string>
  7. #include <vector>
  8. #include <map>
  9. #include <forward_list>
  10. #include <utility>
  11. #include <endian.h>
  12. #include <gfa/gfaipc.h>
  13. #include <typeinfo>
  14. #include <typeindex>
  15. #ifndef _LIBBUILD
  16. #include <gfa/svc/common/strutil.h>
  17. #include <gfa/svc/common/conv.h>
  18. #include <gfa/svc/common/debug.h>
  19. #include <gfa/svc/mqttcl/mqttmsg.h>
  20. #include <gfa/svc/mqttcl/mqttjson.h>
  21. #include <gfa/svc/mqttcl/mqttdbg.h>
  22. #else // _LIBBUILD
  23. #include "common/strutil.h"
  24. #include "common/conv.h"
  25. #include "common/debug.h"
  26. #include "mqttmsg.h"
  27. #include "mqttjson.h"
  28. #include "mqttdbg.h"
  29. #endif // _LIBBUILD
  30. /////////////////////////////////////////////////////////////////////////////
  31. #define MQTT_VALUE_BINLE 0x00000001
  32. #define MQTT_VALUE_BINBE 0x00000002
  33. #define MQTT_VALUE_JSON 0x00000004
  34. #define MQTT_VALUE_PBUF 0x00000008
  35. #define MQTT_VALUE_ALL_FORMATS (MQTT_VALUE_BINLE | MQTT_VALUE_BINBE | MQTT_VALUE_JSON | MQTT_VALUE_PBUF)
  36. #define MQTT_TOPIC_CMD_VALUE "VALUE"
  37. #define MQTT_TOPIC_VALUE_BINLE "BINLE"
  38. #define MQTT_TOPIC_VALUE_BINBE "BINBE"
  39. #define MQTT_TOPIC_VALUE_JSON "JSON"
  40. #define MQTT_TOPIC_VALUE_PBUF "PBUF"
  41. #ifdef _DEBUG
  42. #define MQTT_JSON_OUTPUT_FLAGS JSON_MAX_INDENT
  43. #else // _DEBUG
  44. #define MQTT_JSON_OUTPUT_FLAGS JSON_COMPACT
  45. #endif // _DEBUG
  46. #if __BYTE_ORDER__ == __ORDER_LITTLE_ENDIAN__
  47. #define _UTF_16 "UTF-16LE//"
  48. #define _UTF_32 "UTF-32LE//"
  49. #define _UNICODE "WCHAR_T//"
  50. #elif __BYTE_ORDER__ == __ORDER_BIG_ENDIAN__
  51. #define _UTF_16 "UTF-16BE//"
  52. #define _UTF_32 "UTF-32BE//"
  53. #define _UNICODE "WCHAR_T//"
  54. #else // __BYTE_ORDER__
  55. #error Invalid or unsupported byte order!
  56. #endif // __BYTE_ORDER__
  57. #define MQTTCL_MIN_QOS 0
  58. #define MQTTCL_MAX_QOS 2
  59. #ifdef __cplusplus
  60. /////////////////////////////////////////////////////////////////////////////
  61. template <typename T>
  62. static void _swap_val(T &v)
  63. {
  64. // should static assert that T is a POD
  65. if(sizeof(T) > 1)
  66. {
  67. char &raw = reinterpret_cast<char&>(v);
  68. std::reverse(&raw, &raw + sizeof(T));
  69. }
  70. }
  71. template <typename T>
  72. static const T* _copy_swap_string_chars(const T *s, T *v, size_t cch)
  73. {
  74. if(sizeof(T) > 1 && cch > 0)
  75. {
  76. for(T *pv = v; cch > 0; --cch, ++pv, ++s)
  77. {
  78. *pv = *s;
  79. if(*pv)
  80. _swap_val(*pv);
  81. else
  82. break;
  83. }
  84. return v;
  85. }
  86. else
  87. {
  88. memcpy(v, s, cch);
  89. return v;
  90. }
  91. }
  92. /////////////////////////////////////////////////////////////////////////////
  93. // mqttvar.h - Declarations:
  94. class CMqttVar;
  95. class CMqttVarTable
  96. {
  97. public:
  98. typedef bool (*_PFNCMP)(const char*, const char*);
  99. public:
  100. CMqttVarTable(void);
  101. virtual ~CMqttVarTable(void);
  102. void AddVar(CMqttVar *pv);
  103. CMqttVar* Find(const char *key) const;
  104. inline size_t size(void) const {
  105. return m_map.size();}
  106. bool AddToPubTable(CMqttVar *pv);
  107. bool RemoveFromPubTable(CMqttVar *pv);
  108. void CheckShmAndPublish(const char *pszTopicDevID, const char *pszTopicShmID, CMqttMessageQueue &rmq, int &nLocks);
  109. void DumpPubEnabled(uint32_t nPubMask = MQTT_VALUE_ALL_FORMATS);
  110. void TracePubVar(const char *pszPath, uint32_t nMask, int nQos, bool bRetained);
  111. private:
  112. std::map<const char*, CMqttVar*, _PFNCMP> m_map;
  113. std::map<const char*, CMqttVar*, _PFNCMP> m_pub;
  114. };
  115. /////////////////////////////////////////////////////////////////////////////
  116. class CMqttVar
  117. {
  118. public:
  119. CMqttVar(CMqttVar &&m) noexcept :
  120. m_hShm(std::move(m.m_hShm)),
  121. m_pParent(std::move(m.m_pParent)),
  122. m_nPubMask(std::move(m.m_nPubMask)),
  123. m_nPubMaskForcedOnce(m_nPubMask),
  124. m_path(std::move(m.m_path)),
  125. m_pszPath(m_path.c_str()),
  126. m_nCbVarpath(std::move(m.m_nCbVarpath)),
  127. m_name(std::move(m.m_name)),
  128. m_nIndex(std::move(m.m_nIndex)),
  129. m_nQos(std::move(m.m_nQos)),
  130. m_bRetain(std::move(m.m_bRetain)),
  131. m_nRetainedPubMask(std::move(m.m_nRetainedPubMask))
  132. {
  133. }
  134. CMqttVar(HSHM hShm, const char *pszName, int nIndex, uint32_t nPubMask, int nQos, bool bRetain, CMqttVar *pParent) :
  135. m_hShm(hShm),
  136. m_pParent(pParent),
  137. m_nPubMask(nPubMask),
  138. m_nPubMaskForcedOnce(m_nPubMask),
  139. m_pszPath(NULL),
  140. m_nCbVarpath(0),
  141. m_name(pszName ? pszName : ""),
  142. m_nIndex(nIndex),
  143. m_nQos(nQos),
  144. m_bRetain(bRetain),
  145. m_nRetainedPubMask(0)
  146. {
  147. }
  148. virtual ~CMqttVar(void)
  149. {
  150. }
  151. public:
  152. virtual void CheckShmAndPublish(const char *pszTopicDevID, const char *pszTopicShmID, CMqttMessageQueue &rmq, int &nLocks) = 0;
  153. virtual bool SetShmValue(uint32_t nValFormat, CMqttMessage *pMsg, int &nLocks) {
  154. return false;
  155. }
  156. virtual void CreateMembersTable(CMqttVarTable &vt) {
  157. vt.AddVar(this);}
  158. virtual bool EnablePublish(uint32_t nMask, CMqttVarTable *pvt) {
  159. if((m_nPubMask & nMask) != nMask)
  160. {
  161. #if _DUMP_CONTROL_CHANGED
  162. uint32_t nEnable = ((nMask ^ m_nPubMask) & nMask);
  163. #endif // _DUMP_CONTROL_CHANGED
  164. m_nPubMaskForcedOnce |= ((nMask ^ m_nPubMask) & nMask);
  165. m_nPubMask |= nMask;
  166. if(pvt)
  167. {
  168. pvt->AddToPubTable(this);
  169. #if _TRACK_TIMES
  170. g_nDbgCounter1++;
  171. #endif // _TRACK_TIMES
  172. #if _DUMP_CONTROL_CHANGED
  173. TraceFormatChange(nEnable, true);
  174. #endif // _DUMP_CONTROL_CHANGED
  175. return true;
  176. }
  177. }
  178. return false;
  179. }
  180. virtual bool DisablePublish(uint32_t nMask, CMqttVarTable *pvt) {
  181. if(m_nPubMask & nMask)
  182. {
  183. #if _DUMP_CONTROL_CHANGED
  184. uint32_t nDisable = (((~nMask) ^ m_nPubMask) & nMask);
  185. #endif // _DUMP_CONTROL_CHANGED
  186. m_nPubMask &= ~nMask;
  187. m_nPubMaskForcedOnce &= ~nMask;
  188. if(pvt)
  189. {
  190. pvt->RemoveFromPubTable(this);
  191. #if _DUMP_CONTROL_CHANGED
  192. TraceFormatChange(nDisable, false);
  193. #endif // _DUMP_CONTROL_CHANGED
  194. return true;
  195. }
  196. }
  197. return false;
  198. }
  199. bool PublishEnabled(uint32_t nMask = MQTT_VALUE_ALL_FORMATS) const {
  200. return !!(m_nPubMask & nMask);}
  201. uint32_t GetPublishMask(void) const {
  202. return m_nPubMask;}
  203. uint32_t GetForcedPublishMask(void) const {
  204. return m_nPubMaskForcedOnce;}
  205. void ClearForcedPublishMask(void) {
  206. m_nPubMaskForcedOnce = 0;}
  207. virtual void InitPath(CMqttVar *pParent, const char *pszMemberName, int nIndex = -1) {
  208. CreatePath(pParent, pszMemberName, nIndex, m_path);
  209. m_pszPath = m_path.c_str();
  210. m_nCbVarpath = m_path.length();
  211. }
  212. virtual const char* GetPath(void) const {
  213. return m_pszPath;}
  214. int GetQoS(void) const {
  215. return m_nQos;}
  216. virtual bool SetQoS(int nQos) {
  217. if(nQos < MQTTCL_MIN_QOS)
  218. nQos = MQTTCL_MIN_QOS;
  219. else if(nQos > MQTTCL_MAX_QOS)
  220. nQos = MQTTCL_MAX_QOS;
  221. if(m_nQos != nQos) {
  222. TraceQOSChange(m_nQos, nQos);
  223. m_nQos = nQos;
  224. return true;
  225. }
  226. return false;
  227. }
  228. bool GetRetained(void) const {
  229. return m_bRetain;}
  230. virtual bool SetRetained(bool bRetain) {
  231. if(m_bRetain != bRetain) {
  232. m_bRetain = bRetain;
  233. TraceRetainChange(bRetain);
  234. return true;
  235. }
  236. else
  237. return false;
  238. }
  239. void SetRetainedPubMask(uint32_t nMask) {
  240. m_nRetainedPubMask |= nMask;
  241. }
  242. virtual void RemoveRetained(const char *pszTopicDevID, const char *pszTopicShmID, CMqttMessageQueue &rmq, bool bForce)
  243. {
  244. if(!m_bRetain)
  245. {
  246. std::string s;
  247. CMqttMessage *pMsg;
  248. if((m_nRetainedPubMask & MQTT_VALUE_BINLE) || bForce)
  249. {
  250. s = CreateTopic(pszTopicDevID, pszTopicShmID, MQTT_TOPIC_VALUE_BINLE, MQTT_TOPIC_CMD_VALUE);
  251. pMsg = CMqttMessage::CreateRemoveRetainedMessage(s.c_str(), GetQoS());
  252. rmq.Push(pMsg);
  253. m_nRetainedPubMask &= ~MQTT_VALUE_BINLE;
  254. }
  255. if((m_nRetainedPubMask & MQTT_VALUE_BINBE) || bForce)
  256. {
  257. s = CreateTopic(pszTopicDevID, pszTopicShmID, MQTT_TOPIC_VALUE_BINBE, MQTT_TOPIC_CMD_VALUE);
  258. pMsg = CMqttMessage::CreateRemoveRetainedMessage(s.c_str(), GetQoS());
  259. rmq.Push(pMsg);
  260. m_nRetainedPubMask &= ~MQTT_VALUE_BINBE;
  261. }
  262. if((m_nRetainedPubMask & MQTT_VALUE_JSON) || bForce)
  263. {
  264. s = CreateTopic(pszTopicDevID, pszTopicShmID, MQTT_TOPIC_VALUE_JSON, MQTT_TOPIC_CMD_VALUE);
  265. pMsg = CMqttMessage::CreateRemoveRetainedMessage(s.c_str(), GetQoS());
  266. rmq.Push(pMsg);
  267. m_nRetainedPubMask &= ~MQTT_VALUE_JSON;
  268. }
  269. if((m_nRetainedPubMask & MQTT_VALUE_PBUF) || bForce)
  270. {
  271. s = CreateTopic(pszTopicDevID, pszTopicShmID, MQTT_TOPIC_VALUE_PBUF, MQTT_TOPIC_CMD_VALUE);
  272. pMsg = CMqttMessage::CreateRemoveRetainedMessage(s.c_str(), GetQoS());
  273. rmq.Push(pMsg);
  274. m_nRetainedPubMask &= ~MQTT_VALUE_PBUF;
  275. }
  276. }
  277. }
  278. protected:
  279. static void CreatePath(CMqttVar *pParent, const char *pszMemberName, int nIndex, std::string &rPath)
  280. {
  281. char szIndex[32] = {0};
  282. if(!pszMemberName || !*pszMemberName)
  283. pszMemberName = "/";
  284. if(nIndex >= 0)
  285. {
  286. sprintf(szIndex, "/%d", nIndex);
  287. }
  288. if(pParent)
  289. {
  290. rPath = pParent->GetPath();
  291. auto len = rPath.size();
  292. if(len > 0)
  293. {
  294. auto rend = rPath.rbegin();
  295. if(*rend != '/')
  296. rPath += "/";
  297. }
  298. rPath += pszMemberName;
  299. rPath += szIndex;
  300. }
  301. else
  302. {
  303. rPath = pszMemberName;
  304. rPath += szIndex;
  305. }
  306. }
  307. std::string CreateTopic(const char *pszTopicDevID, const char *pszTopicShmID, const char *pszValueFormat, const char *pszTopicCmd)
  308. {
  309. return ::formatString("%s/%s/%s/%s%s", pszTopicDevID, pszTopicShmID, pszValueFormat, pszTopicCmd, m_pszPath);
  310. }
  311. void Lock(int &nLocks)
  312. {
  313. if(!nLocks)
  314. {
  315. ::GfaIpcLockSHM(m_hShm);
  316. ++nLocks;
  317. }
  318. }
  319. void Unlock(int &nLocks)
  320. {
  321. if(nLocks)
  322. {
  323. --nLocks;
  324. ::GfaIpcUnlockSHM(m_hShm);
  325. }
  326. }
  327. const char* GetName(void) const {
  328. return m_name.c_str();}
  329. int GetIndex(void) const {
  330. return m_nIndex;}
  331. private:
  332. void TraceFormatChange(uint32_t nMask, bool bOn)
  333. {
  334. #ifdef _DUMP_CONTROL_CHANGED
  335. if(m_pszPath && nMask)
  336. {
  337. int nCount = 0;
  338. TRACE("%s ==> %s ", m_pszPath, bOn ? "ON" : "OFF");
  339. if(nMask & MQTT_VALUE_BINLE)
  340. {
  341. TRACE("%s", MQTT_TOPIC_VALUE_BINLE);
  342. ++nCount;
  343. }
  344. if(nMask & MQTT_VALUE_BINBE)
  345. {
  346. if(nCount)
  347. TRACE(", ");
  348. TRACE("%s", MQTT_TOPIC_VALUE_BINBE);
  349. ++nCount;
  350. }
  351. if(nMask & MQTT_VALUE_JSON)
  352. {
  353. if(nCount)
  354. TRACE(", ");
  355. TRACE("%s", MQTT_TOPIC_VALUE_JSON);
  356. ++nCount;
  357. }
  358. if(nMask & MQTT_VALUE_PBUF)
  359. {
  360. if(nCount)
  361. TRACE(", ");
  362. TRACE("%s", MQTT_TOPIC_VALUE_PBUF);
  363. ++nCount;
  364. }
  365. TRACE("\n");
  366. }
  367. #endif // _DUMP_CONTROL_CHANGED
  368. }
  369. void TraceQOSChange(int nQosOld, int nQosNew)
  370. {
  371. #ifdef _DUMP_CONTROL_CHANGED
  372. if(m_pszPath)
  373. {
  374. TRACE("%s ==> QOS: %d -> %d\n", m_pszPath, nQosOld, nQosNew);
  375. }
  376. #endif // _DUMP_CONTROL_CHANGED
  377. }
  378. void TraceRetainChange(bool bRetained)
  379. {
  380. #ifdef _DUMP_CONTROL_CHANGED
  381. if(m_pszPath)
  382. {
  383. TRACE("%s ==> Retain: %s\n", m_pszPath, bRetained ? "true" : "false");
  384. }
  385. #endif // _DUMP_CONTROL_CHANGED
  386. }
  387. private:
  388. HSHM m_hShm;
  389. CMqttVar *m_pParent;
  390. uint32_t m_nPubMask;
  391. uint32_t m_nPubMaskForcedOnce;
  392. std::string m_path;
  393. const char *m_pszPath;
  394. size_t m_nCbVarpath;
  395. std::string m_name;
  396. int m_nIndex;
  397. int m_nQos;
  398. bool m_bRetain;
  399. uint32_t m_nRetainedPubMask;
  400. };
  401. /////////////////////////////////////////////////////////////////////////////
  402. template <typename T>
  403. class CMqttVariable : public CMqttVar
  404. {
  405. public:
  406. CMqttVariable(CMqttVariable &&m) noexcept :
  407. CMqttVar(std::move(m)),
  408. m_pData(std::move(m.m_pData)),
  409. m_pShadow(std::move(m.m_pShadow)),
  410. m_bIsBool(std::move(m.m_bIsBool))
  411. {
  412. }
  413. CMqttVariable(void *pData, void *pShadow, HSHM hShm, const char *pszName, int nIndex, uint32_t nPubMask, int nQos, bool bRetain, CMqttVar *pParent) :
  414. CMqttVar(hShm, pszName, nIndex, nPubMask, nQos, bRetain, pParent),
  415. m_pData((volatile T*)pData),
  416. m_pShadow((T*)pShadow),
  417. m_bIsBool(std::type_index(typeid(T)) == std::type_index(typeid(bool)))
  418. {
  419. ASSERT(m_pData);
  420. ASSERT(m_pShadow);
  421. ASSERT(hShm);
  422. int nLocks = 0;
  423. Lock(nLocks);
  424. *m_pShadow = *m_pData;
  425. Unlock(nLocks);
  426. }
  427. /////////////////////////////////////////////////////////////////////////
  428. public:
  429. virtual void CheckShmAndPublish(const char *pszTopicDevID, const char *pszTopicShmID, CMqttMessageQueue &rmq, int &nLocks)
  430. {
  431. uint32_t nPubMaskForcedOnce = GetForcedPublishMask();
  432. ClearForcedPublishMask();
  433. if(!PublishEnabled())
  434. return;
  435. if(!UpdateShadowBuffer(nLocks))
  436. {
  437. if(nPubMaskForcedOnce & MQTT_VALUE_BINLE)
  438. PublishBinLE(pszTopicDevID, pszTopicShmID, rmq, nLocks);
  439. if(nPubMaskForcedOnce & MQTT_VALUE_BINBE)
  440. PublishBinBE(pszTopicDevID, pszTopicShmID, rmq, nLocks);
  441. if(nPubMaskForcedOnce & MQTT_VALUE_JSON)
  442. PublishJson(pszTopicDevID, pszTopicShmID, rmq, nLocks);
  443. if(nPubMaskForcedOnce & MQTT_VALUE_PBUF)
  444. PublishPBuf(pszTopicDevID, pszTopicShmID, rmq, nLocks);
  445. }
  446. else
  447. {
  448. if(PublishEnabled(MQTT_VALUE_BINLE))
  449. PublishBinLE(pszTopicDevID, pszTopicShmID, rmq, nLocks);
  450. if(PublishEnabled(MQTT_VALUE_BINBE))
  451. PublishBinBE(pszTopicDevID, pszTopicShmID, rmq, nLocks);
  452. if(PublishEnabled(MQTT_VALUE_JSON))
  453. PublishJson(pszTopicDevID, pszTopicShmID, rmq, nLocks);
  454. if(PublishEnabled(MQTT_VALUE_PBUF))
  455. PublishPBuf(pszTopicDevID, pszTopicShmID, rmq, nLocks);
  456. }
  457. }
  458. virtual bool SetShmValue(uint32_t nValFormat, CMqttMessage *pMsg, int &nLocks) {
  459. switch(nValFormat)
  460. {
  461. case MQTT_VALUE_BINLE:
  462. return SetShmValueLE(pMsg, nLocks);
  463. case MQTT_VALUE_BINBE:
  464. return SetShmValueBE(pMsg, nLocks);
  465. case MQTT_VALUE_JSON:
  466. return SetShmValueJson(pMsg, nLocks);
  467. case MQTT_VALUE_PBUF:
  468. return SetShmValuePBuf(pMsg, nLocks);
  469. default:
  470. break;
  471. }
  472. return false;
  473. }
  474. /////////////////////////////////////////////////////////////////////////
  475. private:
  476. bool SetShmValueLE(CMqttMessage *pMsg, int &nLocks)
  477. {
  478. if(pMsg->payload && ((size_t)pMsg->payloadlen == sizeof(T)))
  479. {
  480. T val = *(T*)pMsg->payload;
  481. #if __BYTE_ORDER__ == __ORDER_BIG_ENDIAN__
  482. _swap_val(val);
  483. #endif // __BYTE_ORDER__
  484. Lock(nLocks);
  485. *m_pData = val;
  486. Unlock(nLocks);
  487. return true;
  488. }
  489. return false;
  490. }
  491. bool SetShmValueBE(CMqttMessage *pMsg, int &nLocks)
  492. {
  493. if(pMsg->payload && ((size_t)pMsg->payloadlen == sizeof(T)))
  494. {
  495. T val = *(T*)pMsg->payload;
  496. #if __BYTE_ORDER__ == __ORDER_LITTLE_ENDIAN__
  497. _swap_val(val);
  498. #endif // __BYTE_ORDER__
  499. Lock(nLocks);
  500. *m_pData = val;
  501. Unlock(nLocks);
  502. return true;
  503. }
  504. return false;
  505. }
  506. bool SetShmValueJson(CMqttMessage *pMsg, int &nLocks)
  507. {
  508. bool bRet = false;
  509. if(pMsg)
  510. {
  511. T val;
  512. CJson_t jtRoot, jtVal;
  513. std::string sErr;
  514. if(pMsg->GetPayloadAsJSON(jtRoot, sErr))
  515. {
  516. if(jtRoot.GetValue("value", jtVal))
  517. {
  518. const json_t *pjt = jtVal;
  519. if(m_bIsBool)
  520. {
  521. if(json_is_boolean(pjt))
  522. {
  523. val = (T)json_boolean_value(pjt);
  524. bRet = true;
  525. }
  526. else if(json_is_integer(pjt))
  527. {
  528. val = !!::json_integer_value(pjt);
  529. bRet = true;
  530. }
  531. }
  532. else if(std::is_integral<T>::value)
  533. {
  534. if(json_is_integer(pjt))
  535. {
  536. val = (T)::json_integer_value(pjt);
  537. bRet = true;
  538. }
  539. else if(json_is_boolean(pjt))
  540. {
  541. val = (T)(json_boolean_value(pjt) ? 1 : 0);
  542. bRet = true;
  543. }
  544. }
  545. else if(std::is_floating_point<T>::value)
  546. {
  547. if(json_is_real(pjt))
  548. {
  549. val = (T)::json_real_value(pjt);
  550. bRet = true;
  551. }
  552. else if(json_is_integer(pjt))
  553. {
  554. val = (T)::json_integer_value(pjt);
  555. bRet = true;
  556. }
  557. else if(json_is_boolean(pjt))
  558. {
  559. val = (T)(json_boolean_value(pjt) ? 1 : 0);
  560. bRet = true;
  561. }
  562. }
  563. if(bRet)
  564. {
  565. Lock(nLocks);
  566. *m_pData = val;
  567. Unlock(nLocks);
  568. }
  569. }
  570. }
  571. }
  572. return bRet;
  573. }
  574. bool SetShmValuePBuf(CMqttMessage *pMsg, int &nLocks)
  575. {
  576. TRACE("%s:\n\tProtocol buffers not implemented!\n", GetPath());
  577. return false;
  578. }
  579. /////////////////////////////////////////////////////////////////////////
  580. void PublishBinLE(const char *pszTopicDevID, const char *pszTopicShmID, CMqttMessageQueue &rmq, int &nLocks)
  581. {
  582. std::string s = CreateTopic(pszTopicDevID, pszTopicShmID, MQTT_TOPIC_VALUE_BINLE, MQTT_TOPIC_CMD_VALUE);
  583. #if _RETURN_BIN_AS_STRING
  584. std::string v = toString(*m_pShadow);
  585. CMqttMessage *pMsg = CMqttMessage::CreateMessage(s.c_str(), v.c_str(), v.length(), GetQoS(), GetRetained());
  586. #else // _RETURN_BIN_AS_STRING
  587. T val = *m_pShadow;
  588. #if __BYTE_ORDER__ == __ORDER_BIG_ENDIAN__
  589. _swap_val(val);
  590. #endif // __BYTE_ORDER__
  591. CMqttMessage *pMsg = CMqttMessage::CreateMessage(s.c_str(), &val, sizeof(val), GetQoS(), GetRetained());
  592. #endif // _RETURN_BIN_AS_STRING
  593. rmq.Push(pMsg);
  594. if(GetRetained())
  595. SetRetainedPubMask(MQTT_VALUE_BINLE);
  596. }
  597. void PublishBinBE(const char *pszTopicDevID, const char *pszTopicShmID, CMqttMessageQueue &rmq, int &nLocks)
  598. {
  599. std::string s = CreateTopic(pszTopicDevID, pszTopicShmID, MQTT_TOPIC_VALUE_BINBE, MQTT_TOPIC_CMD_VALUE);
  600. #if _RETURN_BIN_AS_STRING
  601. std::string v = toString(*m_pShadow);
  602. CMqttMessage *pMsg = CMqttMessage::CreateMessage(s.c_str(), v.c_str(), v.length(), GetQoS(), GetRetained());
  603. #else // _RETURN_BIN_AS_STRING
  604. T val = *m_pShadow;
  605. #if __BYTE_ORDER__ == __ORDER_LITTLE_ENDIAN__
  606. _swap_val(val);
  607. #endif // __BYTE_ORDER__
  608. CMqttMessage *pMsg = CMqttMessage::CreateMessage(s.c_str(), &val, sizeof(val), GetQoS(), GetRetained());
  609. #endif // _RETURN_BIN_AS_STRING
  610. rmq.Push(pMsg);
  611. if(GetRetained())
  612. SetRetainedPubMask(MQTT_VALUE_BINBE);
  613. }
  614. void PublishJson(const char *pszTopicDevID, const char *pszTopicShmID, CMqttMessageQueue &rmq, int &nLocks)
  615. {
  616. std::string s = CreateTopic(pszTopicDevID, pszTopicShmID, MQTT_TOPIC_VALUE_JSON, MQTT_TOPIC_CMD_VALUE);
  617. CJson_t jtValue(json_object());
  618. CJson_t jtPath(json_string(GetPath()));
  619. CJson_t jtIndex(json_integer(GetIndex()));
  620. CJson_t jtName(json_string(GetName()));
  621. CJson_t jtVal(GetJsonValue());
  622. json_object_set(jtValue, "path", jtPath);
  623. json_object_set(jtValue, "index", jtIndex);
  624. json_object_set(jtValue, "name", jtName);
  625. json_object_set(jtValue, "value", jtVal);
  626. char *pszJson = json_dumps(jtValue, MQTT_JSON_OUTPUT_FLAGS);
  627. std::string v = pszJson;
  628. free(pszJson);
  629. CMqttMessage *pMsg = CMqttMessage::CreateMessage(s.c_str(), v.c_str(), v.length(), GetQoS(), GetRetained());
  630. rmq.Push(pMsg);
  631. if(GetRetained())
  632. SetRetainedPubMask(MQTT_VALUE_JSON);
  633. }
  634. void PublishPBuf(const char *pszTopicDevID, const char *pszTopicShmID, CMqttMessageQueue &rmq, int &nLocks)
  635. {
  636. std::string s = CreateTopic(pszTopicDevID, pszTopicShmID, MQTT_TOPIC_VALUE_PBUF, MQTT_TOPIC_CMD_VALUE);
  637. TRACE("%s:\n\tProtocol buffers not implemented!\n", s.c_str());
  638. }
  639. std::string toString(T v)
  640. {
  641. if(std::is_integral<T>::value)
  642. {
  643. if(std::is_signed<T>::value)
  644. {
  645. long long r = (long long)v;
  646. return formatString("%lld", r);
  647. }
  648. else if(std::is_unsigned<T>::value)
  649. {
  650. unsigned long long r = (unsigned long long)v;
  651. return formatString("%llu", r);
  652. }
  653. }
  654. else if(std::is_floating_point<T>::value)
  655. {
  656. double r = (double)v;
  657. return formatString("%.20g", r);
  658. }
  659. return "";
  660. }
  661. json_t* GetJsonValue(void) const
  662. {
  663. if(std::is_integral<T>::value)
  664. return m_bIsBool ? json_boolean(!!*m_pShadow) : ::json_integer((json_int_t)*m_pShadow);
  665. else if(std::is_floating_point<T>::value)
  666. return ::json_real((double)*m_pShadow);
  667. ASSERT(false);
  668. return NULL;
  669. }
  670. bool UpdateShadowBuffer(int &nLocks)
  671. {
  672. Lock(nLocks);
  673. bool bChanged = *m_pShadow != *m_pData;
  674. if(bChanged)
  675. {
  676. *m_pShadow = *m_pData;
  677. TRACE("Changed: %s\n", GetPath());
  678. }
  679. Unlock(nLocks);
  680. return bChanged;
  681. }
  682. private:
  683. volatile T *m_pData;
  684. T *m_pShadow;
  685. bool m_bIsBool;
  686. };
  687. /////////////////////////////////////////////////////////////////////////////
  688. template <typename T, int ST>
  689. class CMqttStringVariable : public CMqttVar
  690. {
  691. public:
  692. typedef enum
  693. {
  694. VT_Invalid, // 0
  695. VT_Latin1, // 1
  696. VT_UTF_8, // 2
  697. VT_UTF_16, // 3
  698. VT_UTF_32, // 4
  699. VT_Unicode, // 5
  700. VT_Last
  701. }VT;
  702. public:
  703. CMqttStringVariable(CMqttStringVariable &&m) noexcept :
  704. CMqttVar(std::move(m)),
  705. m_nCChBuffer(std::move(m.m_nCChBuffer)),
  706. m_nCbBuffer(std::move(m.m_nCbBuffer)),
  707. m_nCbString(std::move(m.m_nCbString)),
  708. m_pszData(std::move(m.m_pszData)),
  709. m_pszShadow(std::move(m.m_pszShadow))
  710. {
  711. }
  712. CMqttStringVariable(void *pData, void *pShadow, HSHM hShm, const char *pszName, size_t nCChBuffer, int nIndex, uint32_t nPubMask, int nQos, bool bRetain, CMqttVar *pParent) :
  713. CMqttVar(hShm, pszName, nIndex, nPubMask, nQos, bRetain, pParent),
  714. m_nCChBuffer(nCChBuffer),
  715. m_nCbBuffer(nCChBuffer * sizeof(T)),
  716. m_nCbString(0),
  717. m_pszData((T*)pData),
  718. m_pszShadow((T*)pShadow)
  719. {
  720. ASSERT(m_pszData);
  721. ASSERT(m_pszShadow);
  722. ASSERT(m_nCChBuffer > 0);
  723. int nLocks = 0;
  724. Lock(nLocks);
  725. zeroTerm(m_nCChBuffer - 1);
  726. m_nCbString = slen(m_pszData) * sizeof(T);
  727. memcpy(m_pszShadow, (const void*)m_pszData, m_nCbBuffer);
  728. Unlock(nLocks);
  729. }
  730. public:
  731. virtual void CheckShmAndPublish(const char *pszTopicDevID, const char *pszTopicShmID, CMqttMessageQueue &rmq, int &nLocks)
  732. {
  733. uint32_t nPubMaskForcedOnce = GetForcedPublishMask();
  734. ClearForcedPublishMask();
  735. if(!PublishEnabled())
  736. return;
  737. if(!UpdateShadowBuffer(nLocks))
  738. {
  739. if(nPubMaskForcedOnce & MQTT_VALUE_BINLE)
  740. PublishBinLE(pszTopicDevID, pszTopicShmID, rmq, nLocks);
  741. if(nPubMaskForcedOnce & MQTT_VALUE_BINBE)
  742. PublishBinBE(pszTopicDevID, pszTopicShmID, rmq, nLocks);
  743. if(nPubMaskForcedOnce & MQTT_VALUE_JSON)
  744. PublishJson(pszTopicDevID, pszTopicShmID, rmq, nLocks);
  745. if(nPubMaskForcedOnce & MQTT_VALUE_PBUF)
  746. PublishPBuf(pszTopicDevID, pszTopicShmID, rmq, nLocks);
  747. }
  748. else
  749. {
  750. if(PublishEnabled(MQTT_VALUE_BINLE))
  751. PublishBinLE(pszTopicDevID, pszTopicShmID, rmq, nLocks);
  752. if(PublishEnabled(MQTT_VALUE_BINBE))
  753. PublishBinBE(pszTopicDevID, pszTopicShmID, rmq, nLocks);
  754. if(PublishEnabled(MQTT_VALUE_JSON))
  755. PublishJson(pszTopicDevID, pszTopicShmID, rmq, nLocks);
  756. if(PublishEnabled(MQTT_VALUE_PBUF))
  757. PublishPBuf(pszTopicDevID, pszTopicShmID, rmq, nLocks);
  758. }
  759. }
  760. virtual bool SetShmValue(uint32_t nValFormat, CMqttMessage *pMsg, int &nLocks) {
  761. switch(nValFormat)
  762. {
  763. case MQTT_VALUE_BINLE:
  764. return SetShmValueLE(pMsg, nLocks);
  765. case MQTT_VALUE_BINBE:
  766. return SetShmValueBE(pMsg, nLocks);
  767. case MQTT_VALUE_JSON:
  768. return SetShmValueJson(pMsg, nLocks);
  769. case MQTT_VALUE_PBUF:
  770. return SetShmValuePBuf(pMsg, nLocks);
  771. default:
  772. break;
  773. }
  774. return false;
  775. }
  776. private:
  777. bool SetShmValueLE(CMqttMessage *pMsg, int &nLocks)
  778. {
  779. if(pMsg->payload && pMsg->payloadlen > 0)
  780. {
  781. if((size_t)pMsg->payloadlen / sizeof(T) < m_nCChBuffer)
  782. {
  783. Lock(nLocks);
  784. m_nCbString = (size_t)pMsg->payloadlen;
  785. #if __BYTE_ORDER__ == __ORDER_BIG_ENDIAN__
  786. _copy_swap_string_chars((const T*)pMsg->payload, (T*)m_pszData, m_nCbString / sizeof(T));
  787. #else // __BYTE_ORDER__
  788. memcpy((void*)m_pszData, pMsg->payload, m_nCbString);
  789. #endif // __BYTE_ORDER__
  790. zeroTerm(m_nCbString / sizeof(T));
  791. Unlock(nLocks);
  792. return true;
  793. }
  794. }
  795. else
  796. {
  797. Lock(nLocks);
  798. *m_pszData = '\0';
  799. m_nCbString = 0;
  800. Unlock(nLocks);
  801. return true;
  802. }
  803. return false;
  804. }
  805. bool SetShmValueBE(CMqttMessage *pMsg, int &nLocks)
  806. {
  807. if(pMsg->payload && pMsg->payloadlen > 0)
  808. {
  809. if((size_t)pMsg->payloadlen / sizeof(T) < m_nCChBuffer)
  810. {
  811. Lock(nLocks);
  812. m_nCbString = (size_t)pMsg->payloadlen;
  813. #if __BYTE_ORDER__ == __ORDER_LITTLE_ENDIAN__
  814. _copy_swap_string_chars((const T*)pMsg->payload, (T*)m_pszData, m_nCbString / sizeof(T));
  815. #else // __BYTE_ORDER__
  816. memcpy((void*)m_pszData, pMsg->payload, m_nCbString);
  817. #endif // __BYTE_ORDER__
  818. zeroTerm(m_nCbString / sizeof(T));
  819. Unlock(nLocks);
  820. return true;
  821. }
  822. }
  823. else
  824. {
  825. Lock(nLocks);
  826. *m_pszData = '\0';
  827. m_nCbString = 0;
  828. Unlock(nLocks);
  829. return true;
  830. }
  831. return false;
  832. }
  833. bool SetShmValueJson(CMqttMessage *pMsg, int &nLocks)
  834. {
  835. if(pMsg)
  836. {
  837. CJson_t jtRoot, jtVal;
  838. std::string sErr;
  839. if(pMsg->GetPayloadAsJSON(jtRoot, sErr))
  840. {
  841. if(jtRoot.GetValue("value", jtVal))
  842. {
  843. const json_t *pjt = jtVal;
  844. if(json_is_string(pjt))
  845. {
  846. std::string s(::json_string_value(jtVal));
  847. if(s.length() > 0)
  848. {
  849. Lock(nLocks);
  850. if(fromUTF8(s.c_str(), s.length(), (T*)m_pszData, m_nCbBuffer))
  851. m_nCbString = slen(m_pszData) * sizeof(T);
  852. Unlock(nLocks);
  853. }
  854. else
  855. {
  856. Lock(nLocks);
  857. *m_pszData = '\0';
  858. m_nCbString = 0;
  859. Unlock(nLocks);
  860. }
  861. return true;
  862. }
  863. }
  864. }
  865. }
  866. return false;
  867. }
  868. bool SetShmValuePBuf(CMqttMessage *pMsg, int &nLocks)
  869. {
  870. TRACE("%s:\n\tProtocol buffers not implemented!\n", GetPath());
  871. return false;
  872. }
  873. void PublishBinLE(const char *pszTopicDevID, const char *pszTopicShmID, CMqttMessageQueue &rmq, int &nLocks)
  874. {
  875. std::string s = CreateTopic(pszTopicDevID, pszTopicShmID, MQTT_TOPIC_VALUE_BINLE, MQTT_TOPIC_CMD_VALUE);
  876. #if __BYTE_ORDER__ == __ORDER_BIG_ENDIAN__
  877. T szBuf[m_nCChBuffer];
  878. CMqttMessage *pMsg = CMqttMessage::CreateMessage(s.c_str(), _copy_swap_string_chars(m_pszShadow, szBuf, m_nCChBuffer), m_nCbString, GetQoS(), GetRetained());
  879. #else // __BYTE_ORDER__
  880. CMqttMessage *pMsg = CMqttMessage::CreateMessage(s.c_str(), m_pszShadow, m_nCbString, GetQoS(), GetRetained());
  881. #endif // __BYTE_ORDER__
  882. rmq.Push(pMsg);
  883. }
  884. void PublishBinBE(const char *pszTopicDevID, const char *pszTopicShmID, CMqttMessageQueue &rmq, int &nLocks)
  885. {
  886. std::string s = CreateTopic(pszTopicDevID, pszTopicShmID, MQTT_TOPIC_VALUE_BINBE, MQTT_TOPIC_CMD_VALUE);
  887. #if __BYTE_ORDER__ == __ORDER_LITTLE_ENDIAN__
  888. T szBuf[m_nCChBuffer];
  889. CMqttMessage *pMsg = CMqttMessage::CreateMessage(s.c_str(), _copy_swap_string_chars(m_pszShadow, szBuf, m_nCChBuffer), m_nCbString, GetQoS(), GetRetained());
  890. #else // __BYTE_ORDER__
  891. CMqttMessage *pMsg = CMqttMessage::CreateMessage(s.c_str(), m_pszShadow, m_nCbString, GetQoS(), GetRetained());
  892. #endif // __BYTE_ORDER__
  893. rmq.Push(pMsg);
  894. }
  895. void PublishJson(const char *pszTopicDevID, const char *pszTopicShmID, CMqttMessageQueue &rmq, int &nLocks)
  896. {
  897. std::string s = CreateTopic(pszTopicDevID, pszTopicShmID, MQTT_TOPIC_VALUE_JSON, MQTT_TOPIC_CMD_VALUE);
  898. CJson_t jtValue(json_object());
  899. CJson_t jtPath(json_string(GetPath()));
  900. CJson_t jtIndex(json_integer(GetIndex()));
  901. CJson_t jtName(json_string(GetName()));
  902. CJson_t jtVal(GetJsonValue());
  903. json_object_set(jtValue, "path", jtPath);
  904. json_object_set(jtValue, "index", jtIndex);
  905. json_object_set(jtValue, "name", jtName);
  906. json_object_set(jtValue, "value", jtVal);
  907. char *pszJson = json_dumps(jtValue, MQTT_JSON_OUTPUT_FLAGS);
  908. std::string v = pszJson;
  909. free(pszJson);
  910. CMqttMessage *pMsg = CMqttMessage::CreateMessage(s.c_str(), v.c_str(), v.length(), GetQoS(), GetRetained());
  911. rmq.Push(pMsg);
  912. }
  913. void PublishPBuf(const char *pszTopicDevID, const char *pszTopicShmID, CMqttMessageQueue &rmq, int &nLocks)
  914. {
  915. std::string s = CreateTopic(pszTopicDevID, pszTopicShmID, MQTT_TOPIC_VALUE_PBUF, MQTT_TOPIC_CMD_VALUE);
  916. TRACE("%s:\n\tProtocol buffers not implemented!\n", s.c_str());
  917. }
  918. json_t* GetJsonValue(void) const
  919. {
  920. char szBuf[m_nCbBuffer * 4];
  921. const char *pszUtf8 = toUTF8(m_pszShadow, m_nCbString, szBuf, sizeof(szBuf));
  922. if(pszUtf8)
  923. return json_string(pszUtf8);
  924. return NULL;
  925. }
  926. const char* toUTF8(const T *pszIn, size_t nCbIn, char *pszOut, size_t nCbOut) const
  927. {
  928. switch(ST)
  929. {
  930. case VT_Latin1:
  931. Latin1ToUtf8(pszIn, nCbIn, pszOut, nCbOut);
  932. break;
  933. case VT_UTF_8:
  934. strncpy(pszOut, (const char*)pszIn, nCbOut - 1);
  935. pszOut[nCbOut - 1] = '\0';
  936. break;
  937. case VT_UTF_16:
  938. EncToUtf8(_UTF_16, pszIn, nCbIn, pszOut, nCbOut);
  939. break;
  940. case VT_UTF_32:
  941. EncToUtf8(_UTF_32, pszIn, nCbIn, pszOut, nCbOut);
  942. break;
  943. case VT_Unicode:
  944. EncToUtf8(_UNICODE, pszIn, nCbIn, pszOut, nCbOut);
  945. break;
  946. default:
  947. ASSERT(false);
  948. return NULL;
  949. }
  950. return pszOut;
  951. }
  952. const T* fromUTF8(const char *pszIn, size_t nCbIn, T *pszOut, size_t nCbOut) const
  953. {
  954. size_t nRet;
  955. switch(ST)
  956. {
  957. case VT_Latin1:
  958. nRet = Utf8ToLatin1(pszIn, nCbIn, pszOut, nCbOut);
  959. pszOut[nRet / sizeof(T)] = T('\0');
  960. break;
  961. case VT_UTF_8:
  962. strncpy((char*)pszOut, pszIn, nCbOut - 1);
  963. pszOut[nCbOut - 1] = T('\0');
  964. break;
  965. case VT_UTF_16:
  966. nRet = Utf8ToUtf16(pszIn, nCbIn, (char16_t*)pszOut, nCbOut);
  967. pszOut[nRet / sizeof(T)] = T('\0');
  968. break;
  969. case VT_UTF_32:
  970. nRet = Utf8ToUtf32(pszIn, nCbIn, (char32_t*)pszOut, nCbOut);
  971. pszOut[nRet / sizeof(T)] = T('\0');
  972. break;
  973. case VT_Unicode:
  974. nRet = Utf8ToWcs(pszIn, nCbIn, (wchar_t*)pszOut, nCbOut);
  975. pszOut[nRet / sizeof(T)] = T('\0');
  976. break;
  977. default:
  978. ASSERT(false);
  979. return NULL;
  980. }
  981. return pszOut;
  982. }
  983. size_t slen(volatile const T *s)
  984. {
  985. volatile const T *p = s;
  986. while(*p) ++p;
  987. return p - s;
  988. }
  989. void zeroTerm(size_t at)
  990. {
  991. m_pszData[at] = (T)'\0';
  992. }
  993. bool UpdateShadowBuffer(int &nLocks)
  994. {
  995. Lock(nLocks);
  996. bool bChanged = !!memcmp(m_pszShadow, (const void*)m_pszData, m_nCbString + sizeof(T));
  997. if(bChanged)
  998. {
  999. zeroTerm(m_nCChBuffer - 1);
  1000. m_nCbString = slen(m_pszData) * sizeof(T);
  1001. memcpy(m_pszShadow, (const void*)m_pszData, m_nCbString + sizeof(T));
  1002. TRACE("Changed: %s\n", GetPath());
  1003. }
  1004. Unlock(nLocks);
  1005. return bChanged;
  1006. }
  1007. private:
  1008. const size_t m_nCChBuffer;
  1009. const size_t m_nCbBuffer;
  1010. size_t m_nCbString;
  1011. volatile T *m_pszData;
  1012. T *m_pszShadow;
  1013. };
  1014. /////////////////////////////////////////////////////////////////////////////
  1015. #define GET_BOOL_VAL(p, m) (!!(*p & m))
  1016. #define SET_BIT(p, m) (*p |= m)
  1017. #define CLR_BIT(p, m) (*p &= ~m)
  1018. #define STORE_BIT(p, m, b) (b) ? SET_BIT(p, m) : CLR_BIT(p, m)
  1019. class CMqttBitVariable : public CMqttVar
  1020. {
  1021. public:
  1022. CMqttBitVariable(CMqttBitVariable &&m) noexcept :
  1023. CMqttVar(std::move(m)),
  1024. m_mask(std::move(m.m_mask)),
  1025. m_pDataByte(std::move(m.m_pDataByte)),
  1026. m_pShadowByte(std::move(m.m_pShadowByte))
  1027. {
  1028. }
  1029. CMqttBitVariable(void *pData, void *pShadow, HSHM hShm, const char *pszName, size_t nOffset, int nBitNr, uint32_t nPubMask, int nQos, bool bRetain, CMqttVar *pParent) :
  1030. CMqttVar(hShm, pszName, -1, nPubMask, nQos, bRetain, pParent),
  1031. m_mask(1 << nBitNr),
  1032. m_pDataByte((uint8_t*)pData + nOffset),
  1033. m_pShadowByte((uint8_t*)pShadow + nOffset)
  1034. {
  1035. ASSERT(m_pDataByte);
  1036. ASSERT(m_pShadowByte);
  1037. int nLocks = 0;
  1038. Lock(nLocks);
  1039. STORE_BIT(m_pShadowByte, m_mask, GET_BOOL_VAL(m_pDataByte, m_mask));
  1040. Unlock(nLocks);
  1041. }
  1042. public:
  1043. virtual void CheckShmAndPublish(const char *pszTopicDevID, const char *pszTopicShmID, CMqttMessageQueue &rmq, int &nLocks)
  1044. {
  1045. uint32_t nPubMaskForcedOnce = GetForcedPublishMask();
  1046. ClearForcedPublishMask();
  1047. if(!PublishEnabled())
  1048. return;
  1049. if(!UpdateShadowBuffer(nLocks))
  1050. {
  1051. if(nPubMaskForcedOnce & MQTT_VALUE_BINLE)
  1052. PublishBinLE(pszTopicDevID, pszTopicShmID, rmq, nLocks);
  1053. if(nPubMaskForcedOnce & MQTT_VALUE_BINBE)
  1054. PublishBinBE(pszTopicDevID, pszTopicShmID, rmq, nLocks);
  1055. if(nPubMaskForcedOnce & MQTT_VALUE_JSON)
  1056. PublishJson(pszTopicDevID, pszTopicShmID, rmq, nLocks);
  1057. if(nPubMaskForcedOnce & MQTT_VALUE_PBUF)
  1058. PublishPBuf(pszTopicDevID, pszTopicShmID, rmq, nLocks);
  1059. }
  1060. else
  1061. {
  1062. if(PublishEnabled(MQTT_VALUE_BINLE))
  1063. PublishBinLE(pszTopicDevID, pszTopicShmID, rmq, nLocks);
  1064. if(PublishEnabled(MQTT_VALUE_BINBE))
  1065. PublishBinBE(pszTopicDevID, pszTopicShmID, rmq, nLocks);
  1066. if(PublishEnabled(MQTT_VALUE_JSON))
  1067. PublishJson(pszTopicDevID, pszTopicShmID, rmq, nLocks);
  1068. if(PublishEnabled(MQTT_VALUE_PBUF))
  1069. PublishPBuf(pszTopicDevID, pszTopicShmID, rmq, nLocks);
  1070. }
  1071. }
  1072. virtual bool SetShmValue(uint32_t nValFormat, CMqttMessage *pMsg, int &nLocks) {
  1073. switch(nValFormat)
  1074. {
  1075. case MQTT_VALUE_BINLE:
  1076. case MQTT_VALUE_BINBE:
  1077. return SetShmValueBin(pMsg, nLocks);
  1078. case MQTT_VALUE_JSON:
  1079. return SetShmValueJson(pMsg, nLocks);
  1080. case MQTT_VALUE_PBUF:
  1081. return SetShmValuePBuf(pMsg, nLocks);
  1082. default:
  1083. break;
  1084. }
  1085. return false;
  1086. }
  1087. private:
  1088. bool SetShmValueBin(CMqttMessage *pMsg, int &nLocks)
  1089. {
  1090. if(pMsg->payload && ((size_t)pMsg->payloadlen == sizeof(bool)))
  1091. {
  1092. bool bVal = !!(*(bool*)pMsg->payload);
  1093. Lock(nLocks);
  1094. STORE_BIT(m_pDataByte, m_mask, bVal);
  1095. Unlock(nLocks);
  1096. return true;
  1097. }
  1098. return false;
  1099. }
  1100. bool SetShmValueJson(CMqttMessage *pMsg, int &nLocks)
  1101. {
  1102. bool bRet = false;
  1103. if(pMsg)
  1104. {
  1105. bool bVal;
  1106. CJson_t jtRoot, jtVal;
  1107. std::string sErr;
  1108. if(pMsg->GetPayloadAsJSON(jtRoot, sErr))
  1109. {
  1110. if(jtRoot.GetValue("value", jtVal))
  1111. {
  1112. const json_t *pjt = jtVal;
  1113. if(json_is_boolean(pjt))
  1114. {
  1115. bVal = json_boolean_value(pjt);
  1116. bRet = true;
  1117. }
  1118. else if(json_is_integer(pjt))
  1119. {
  1120. bVal = !!::json_integer_value(pjt);
  1121. bRet = true;
  1122. }
  1123. if(bRet)
  1124. {
  1125. Lock(nLocks);
  1126. STORE_BIT(m_pDataByte, m_mask, bVal);
  1127. Unlock(nLocks);
  1128. }
  1129. }
  1130. }
  1131. }
  1132. return bRet;
  1133. }
  1134. bool SetShmValuePBuf(CMqttMessage *pMsg, int &nLocks)
  1135. {
  1136. TRACE("%s:\n\tProtocol buffers not implemented!\n", GetPath());
  1137. return false;
  1138. }
  1139. void PublishBinLE(const char *pszTopicDevID, const char *pszTopicShmID, CMqttMessageQueue &rmq, int &nLocks)
  1140. {
  1141. std::string s = CreateTopic(pszTopicDevID, pszTopicShmID, MQTT_TOPIC_VALUE_BINLE, MQTT_TOPIC_CMD_VALUE);
  1142. bool val = GET_BOOL_VAL(m_pShadowByte, m_mask);
  1143. #if _RETURN_BIN_AS_STRING
  1144. std::string v = val ? "true" : "false";
  1145. CMqttMessage *pMsg = CMqttMessage::CreateMessage(s.c_str(), v.c_str(), v.length(), GetQoS(), GetRetained());
  1146. #else // _RETURN_BIN_AS_STRING
  1147. CMqttMessage *pMsg = CMqttMessage::CreateMessage(s.c_str(), &val, sizeof(val), GetQoS(), GetRetained());
  1148. #endif // _RETURN_BIN_AS_STRING
  1149. rmq.Push(pMsg);
  1150. }
  1151. void PublishBinBE(const char *pszTopicDevID, const char *pszTopicShmID, CMqttMessageQueue &rmq, int &nLocks)
  1152. {
  1153. std::string s = CreateTopic(pszTopicDevID, pszTopicShmID, MQTT_TOPIC_VALUE_BINBE, MQTT_TOPIC_CMD_VALUE);
  1154. bool val = GET_BOOL_VAL(m_pShadowByte, m_mask);
  1155. #if _RETURN_BIN_AS_STRING
  1156. std::string v = val ? "true" : "false";
  1157. CMqttMessage *pMsg = CMqttMessage::CreateMessage(s.c_str(), v.c_str(), v.length(), GetQoS(), GetRetained());
  1158. #else // _RETURN_BIN_AS_STRING
  1159. CMqttMessage *pMsg = CMqttMessage::CreateMessage(s.c_str(), &val, sizeof(val), GetQoS(), GetRetained());
  1160. #endif // _RETURN_BIN_AS_STRING
  1161. rmq.Push(pMsg);
  1162. }
  1163. void PublishJson(const char *pszTopicDevID, const char *pszTopicShmID, CMqttMessageQueue &rmq, int &nLocks)
  1164. {
  1165. std::string s = CreateTopic(pszTopicDevID, pszTopicShmID, MQTT_TOPIC_VALUE_JSON, MQTT_TOPIC_CMD_VALUE);
  1166. CJson_t jtValue(json_object());
  1167. CJson_t jtPath(json_string(GetPath()));
  1168. CJson_t jtIndex(json_integer(GetIndex()));
  1169. CJson_t jtName(json_string(GetName()));
  1170. CJson_t jtVal(GetJsonValue());
  1171. json_object_set(jtValue, "path", jtPath);
  1172. json_object_set(jtValue, "index", jtIndex);
  1173. json_object_set(jtValue, "name", jtName);
  1174. json_object_set(jtValue, "value", jtVal);
  1175. char *pszJson = json_dumps(jtValue, MQTT_JSON_OUTPUT_FLAGS);
  1176. std::string v = pszJson;
  1177. free(pszJson);
  1178. CMqttMessage *pMsg = CMqttMessage::CreateMessage(s.c_str(), v.c_str(), v.length(), GetQoS(), GetRetained());
  1179. rmq.Push(pMsg);
  1180. }
  1181. void PublishPBuf(const char *pszTopicDevID, const char *pszTopicShmID, CMqttMessageQueue &rmq, int &nLocks)
  1182. {
  1183. std::string s = CreateTopic(pszTopicDevID, pszTopicShmID, MQTT_TOPIC_VALUE_PBUF, MQTT_TOPIC_CMD_VALUE);
  1184. TRACE("%s:\n\tProtocol buffers not implemented!\n", s.c_str());
  1185. }
  1186. json_t* GetJsonValue(void) const
  1187. {
  1188. return json_boolean(GET_BOOL_VAL(m_pShadowByte, m_mask));
  1189. }
  1190. bool UpdateShadowBuffer(int &nLocks)
  1191. {
  1192. bool bVal;
  1193. Lock(nLocks);
  1194. bool bChanged = GET_BOOL_VAL(m_pShadowByte, m_mask) != (bVal = GET_BOOL_VAL(m_pDataByte, m_mask));
  1195. if(bChanged)
  1196. {
  1197. STORE_BIT(m_pShadowByte, m_mask, bVal);
  1198. TRACE("Changed: %s\n", GetPath());
  1199. }
  1200. Unlock(nLocks);
  1201. return bChanged;
  1202. }
  1203. private:
  1204. uint8_t m_mask;
  1205. volatile uint8_t *m_pDataByte;
  1206. uint8_t *m_pShadowByte;
  1207. };
  1208. /////////////////////////////////////////////////////////////////////////////
  1209. template <typename T, typename V>
  1210. class CMqttVarArray : public CMqttVar,
  1211. public std::vector<T>
  1212. {
  1213. public:
  1214. CMqttVarArray(void *pData, void *pShadow, HSHM hShm, const char *pszName, size_t nElemCount, uint32_t nPubMask, int nQos, bool bRetain, CMqttVar *pParent)
  1215. : CMqttVar(hShm, pszName, -1, 0, nQos, bRetain, pParent)
  1216. {
  1217. for(size_t i = 0; i < nElemCount; i++)
  1218. {
  1219. this->emplace_back(&((V*)pData)[i], &((V*)pShadow)[i], hShm, pszName, i, nPubMask, nQos, bRetain, static_cast<CMqttVar*>(this));
  1220. }
  1221. }
  1222. CMqttVarArray(void *pData, void *pShadow, HSHM hShm, const char *pszName, size_t nElemCount, size_t nCChData, uint32_t nPubMask, int nQos, bool bRetain, CMqttVar *pParent)
  1223. : CMqttVar(hShm, pszName, -1, 0, nQos, bRetain, pParent)
  1224. {
  1225. for(size_t i = 0; i < nElemCount; i++)
  1226. {
  1227. this->emplace_back(&((V*)pData)[i * nCChData], &((V*)pShadow)[i * nCChData], hShm, pszName, nCChData, i, nPubMask, nQos, bRetain, static_cast<CMqttVar*>(this));
  1228. }
  1229. }
  1230. public:
  1231. virtual void CreateMembersTable(CMqttVarTable &vt)
  1232. {
  1233. CMqttVar::CreateMembersTable(vt);
  1234. for(auto i = this->begin(); i != this->end(); ++i)
  1235. {
  1236. CMqttVar &rVar = static_cast<CMqttVar&>(*i);
  1237. rVar.CreateMembersTable(vt);
  1238. }
  1239. }
  1240. virtual void InitPath(CMqttVar *pParent, const char *pszMemberName, int nIndex = -1)
  1241. {
  1242. CMqttVar::InitPath(pParent, pszMemberName, nIndex);
  1243. int j = 0;
  1244. for(auto i = this->begin(); i != this->end(); ++i, ++j)
  1245. {
  1246. CMqttVar &rVar = static_cast<CMqttVar&>(*i);
  1247. rVar.InitPath(pParent, pszMemberName, j);
  1248. }
  1249. }
  1250. virtual void CheckShmAndPublish(const char *pszTopicDevID, const char *pszTopicShmID, CMqttMessageQueue &rmq, int &nLocks)
  1251. {
  1252. Lock(nLocks);
  1253. for(auto i = this->begin(); i != this->end(); ++i)
  1254. {
  1255. CMqttVar &rVar = static_cast<CMqttVar&>(*i);
  1256. rVar.CheckShmAndPublish(pszTopicDevID, pszTopicShmID, rmq, nLocks);
  1257. }
  1258. Unlock(nLocks);
  1259. }
  1260. virtual bool EnablePublish(uint32_t nMask, CMqttVarTable *pvt)
  1261. {
  1262. bool bRet = false;
  1263. for(auto i = this->begin(); i != this->end(); ++i)
  1264. {
  1265. CMqttVar &rVar = static_cast<CMqttVar&>(*i);
  1266. bRet = rVar.EnablePublish(nMask, pvt) || bRet;
  1267. }
  1268. return bRet;
  1269. }
  1270. virtual bool DisablePublish(uint32_t nMask, CMqttVarTable *pvt)
  1271. {
  1272. bool bRet = false;
  1273. for(auto i = this->begin(); i != this->end(); ++i)
  1274. {
  1275. CMqttVar &rVar = static_cast<CMqttVar&>(*i);
  1276. bRet = rVar.DisablePublish(nMask, pvt) || bRet;
  1277. }
  1278. return bRet;
  1279. }
  1280. virtual bool SetQoS(int nQos)
  1281. {
  1282. bool bRet = false;
  1283. for(auto i = this->begin(); i != this->end(); ++i)
  1284. {
  1285. CMqttVar &rVar = static_cast<CMqttVar&>(*i);
  1286. bRet = rVar.SetQoS(nQos) || bRet;
  1287. }
  1288. return bRet;
  1289. }
  1290. virtual bool SetRetained(bool bRetain)
  1291. {
  1292. bool bRet = false;
  1293. for(auto i = this->begin(); i != this->end(); ++i)
  1294. {
  1295. CMqttVar &rVar = static_cast<CMqttVar&>(*i);
  1296. bRet = rVar.SetRetained(bRetain) || bRet;
  1297. }
  1298. return bRet;
  1299. }
  1300. virtual void RemoveRetained(const char *pszTopicDevID, const char *pszTopicShmID, CMqttMessageQueue &rmq, bool bForce)
  1301. {
  1302. for(auto i = this->begin(); i != this->end(); ++i)
  1303. {
  1304. CMqttVar &rVar = static_cast<CMqttVar&>(*i);
  1305. rVar.RemoveRetained(pszTopicDevID, pszTopicShmID, rmq, bForce);
  1306. }
  1307. }
  1308. };
  1309. /////////////////////////////////////////////////////////////////////////////
  1310. #endif // __cplusplus
  1311. #endif // !defined(AGD_MQTTVAR_H__CA5925ED_7757_407C_86F2_E7687DFFCCFA__INCLUDED_)