main.cpp 30 KB


  1. ////////////////////////////////////////////////////////////////////////////////////////////////////
  2. //
  3. #include <stdio.h>
  4. #include <signal.h>
  5. #include <linux/limits.h>
  6. #include <sys/socket.h>
  7. #include <arpa/inet.h>
  8. #include <linux/if_arp.h>
  9. #include <netinet/in.h>
  10. #include <ifaddrs.h>
  11. #include <sys/ioctl.h>
  12. #include <fcntl.h>
  13. #include <unistd.h>
  14. #include <getopt.h>
  15. #include "strutil.h"
  16. #include "fileutil.h"
  17. #include "instance.h"
  18. #include "processclock.h"
  19. #include "debug.h"
  20. #include "projal.h"
  21. #include "mqttclient.h"
  22. #include "mqttcfg.h"
  23. #include "mqttdbg.h"
  24. ////////////////////////////////////////////////////////////////////////////////////////////////////
  25. #if _ENABLE_MEM_TRACE
  26. #include <mcheck.h>
  27. class CMtrace
  28. {
  29. public:
  30. CMtrace(void) {
  31. putenv("MALLOC_TRACE=/home/wrk/share/config/services/Debug/Desktop_Qt_5_7_0_GCC_64bit/mqttcl/mtrace.log");
  32. mtrace();
  33. }
  34. ~CMtrace(void){
  35. // muntrace();
  36. }
  37. };
  38. #endif // _ENABLE_MEM_TRACE
  39. #if _TRACK_TIMES
// Timing instrumentation (only present when _TRACK_TIMES is set).
// g_pc is the stopwatch main() restarts before each processing step.
static CProcessClock g_pc;
// Reset by main() before _ProcessIncoming() and read afterwards; it is not
// written anywhere else in this file. NOTE(review): it has external linkage,
// so it is presumably incremented from another translation unit - confirm.
unsigned long g_nDbgCounter1 = 0;
// Set by main() to the size of the send queue after CheckShmAndPublish().
unsigned long g_nDbgCounter2 = 0;
// Set by main() to the return value of _ProcessOutgoing().
unsigned long g_nDbgCounter3 = 0;
  44. #endif // _TRACK_TIMES
  45. ////////////////////////////////////////////////////////////////////////////////////////////////////
  46. // app control
// Identity of this process towards the application controller.
#define _APPID GFA_APPCTRL_APPID_MQTTCL
#define _APPNAME "MqttCl"
// Bitmask of applications that must be running; while any of them is not,
// _ProcessStateEvents() puts this process into the implicit pause state.
#define _DEPENDENCIES ((appid_t)(GFA_APPCTRL_APPID_REMANENT))
////////////////////////////////////////////////////////////////////////////////////////////////////
// Timing and logging.
// Timeout of one client loop iteration and sleep time while paused.
#define _UPDATE_INTERVAL_MS 100
// Delay before another connection attempt.
#define _RECONN_INTERVAL_MS 1000
// Log file name, created inside the application's base directory.
#define _LOGFILE_NAME "mqttcl.log"
////////////////////////////////////////////////////////////////////////////////////////////////////
// Block / unblock the given signal set for the calling thread.
#define _SIG_BLOCK(s) sigprocmask(SIG_BLOCK, (s), NULL)
#define _SIG_UNBLOCK(s) sigprocmask(SIG_UNBLOCK, (s), NULL)
// Unit conversions from milliseconds.
#define _NSEC_FROM_MSEC(ms) ((ms) * _PC_NS_PER_MS)
// NOTE(review): the factor is named "ns per us" although "us per ms" is meant;
// both are presumably 1000, so the result is right - confirm in processclock.h.
#define _USEC_FROM_MSEC(ms) ((ms) * _PC_NS_PER_US)
// JSON keys accepted in the payload of a CONTROL message.
#define _TOPIC_CTRL_KEY_BINLE "binLe"
#define _TOPIC_CTRL_KEY_BINBE "binBe"
#define _TOPIC_CTRL_KEY_JSON "json"
#define _TOPIC_CTRL_KEY_PBUF "pBuf"
#define _TOPIC_CTRL_KEY_QOS "qos"
#define _TOPIC_CTRL_KEY_RETAIN "retained"
#define _TOPIC_CTRL_KEY_REM_RETAINED "delRetained"
// Topic path components of the subscribed command topics.
#define _TOPIC_CMD_CTRL "CONTROL"
#define _TOPIC_CMD_SET "SET"
// Only referenced by a commented-out entry of the topic map in main().
#define _TOPIC_CMD_STATUS "STATUS"
  70. typedef enum _TOPIC_CTRL_CMD
  71. {
  72. TCC_Control,
  73. TCC_SetBinLe,
  74. TCC_SetBinBe,
  75. TCC_SetJson,
  76. TCC_SetPBuf,
  77. TCC_Status
  78. }TOPIC_CTRL_CMD, *LPTOPIC_CTRL_CMD;
  79. typedef const TOPIC_CTRL_CMD *LPCTOPIC_CTRL_CMD;
  80. ////////////////////////////////////////////////////////////////////////////////////////////////////
  81. //
  82. typedef enum _MQTT_CLIENT_STATES // when modifying, don't forget to adjust g_pszStateNames!!!
  83. {
  84. CLS_NotInit,
  85. CLS_SetTLS,
  86. CLS_Unconnected,
  87. CLS_Connect,
  88. CLS_Reconnect,
  89. CLS_Connecting,
  90. CLS_Connected,
  91. CLS_Subscribe,
  92. CLS_Subscribing,
  93. CLS_Subscribed,
  94. CLS_ProcMsg,
  95. CLS_ShutDown,
  96. CLS_Err,
  97. CLS_Unsubscribe,
  98. CLS_Disconnect,
  99. CLS_Cleanup,
  100. CLS_Paused,
  101. CLS_Exit
  102. }MQTT_CLIENT_STATES, *LPMQTT_CLIENT_STATES;
  103. typedef const MQTT_CLIENT_STATES *LPCMQTT_CLIENT_STATES;
  104. static const char *g_pszStateNames[] =
  105. {
  106. "Not Init",
  107. "Set TLS",
  108. "Unconnected",
  109. "Connect",
  110. "Reconnect",
  111. "Connecting",
  112. "Connected",
  113. "Subscribe",
  114. "Subscribing",
  115. "Subscribed",
  116. "ProcMsg",
  117. "ShutDown",
  118. "Error",
  119. "Unsubscribe",
  120. "Disconnect",
  121. "Cleanup",
  122. "Paused",
  123. "Exit"
  124. };
  125. static const char * _GetClientStateString(MQTT_CLIENT_STATES cs)
  126. {
  127. if(cs >= CLS_NotInit && cs <= CLS_Exit)
  128. return g_pszStateNames[cs];
  129. return "";
  130. }
  131. ////////////////////////////////////////////////////////////////////////////////////////////////////
  132. //
  133. typedef struct _SUB_CTRL_TOPIC
  134. {
  135. _SUB_CTRL_TOPIC(const std::string &&s) : sTopic(s)
  136. {
  137. this->nVarOffset = s.length() - 2;
  138. }
  139. std::string sTopic;
  140. size_t nVarOffset;
  141. }SUB_CTRL_TOPIC, *LPSUB_CTRL_TOPIC;
  142. typedef const SUB_CTRL_TOPIC *LPCSUB_CTRL_TOPIC;
  143. ////////////////////////////////////////////////////////////////////////////////////////////////////
  144. //
  145. //static std::string _CreateDeviceID(void);
  146. static volatile bool g_fRun = false;
  147. static volatile bool g_fPauseImp = false;
  148. static volatile bool g_fPauseCmd = false;
  149. static volatile bool g_fZombie = false;
  150. static appid_t g_nDepRunning = 0;
  151. static sigset_t g_set;
  152. static CLogfile g_lf;
  153. static int g_nLastSig = -1;
  154. static shm_t g_shmShadow;
  155. static MQTT_CLIENT_STATES g_cs = CLS_NotInit;
  156. static MQTT_CLIENT_STATES g_csLast = CLS_NotInit;
  157. static bool g_bConnected = false;
  158. static int g_nSubcribed = 0;
  159. static bool g_bIntr = false;
  160. ////////////////////////////////////////////////////////////////////////////////////////////////////
  161. //
  162. static const char* _GetBaseDir(std::string &rstrBaseDir)
  163. {
  164. char szBaseDir[PATH_MAX];
  165. rstrBaseDir = ::GetAppDirectory(szBaseDir, sizeof(szBaseDir));
  166. rtrim(rstrBaseDir, "/");
  167. return rstrBaseDir.c_str();
  168. }
  169. ////////////////////////////////////////////////////////////////////////////////////////////////////
  170. //
  171. static void _SigHandler(int sig)
  172. {
  173. g_nLastSig = sig;
  174. g_bIntr = true;
  175. g_fPauseImp = g_fPauseCmd = g_fZombie = false;
  176. }
  177. ////////////////////////////////////////////////////////////////////////////////////////////////////
  178. //
  179. static MQTT_CLIENT_STATES _cl_usleep(unsigned int t, MQTT_CLIENT_STATES csNext)
  180. {
  181. if(usleep(t) < 0 && errno == EINTR)
  182. return CLS_ShutDown;
  183. return csNext;
  184. }
  185. ////////////////////////////////////////////////////////////////////////////////////////////////////
  186. //
  187. static void _ProcessIncoming(CMqttClient &rcl, CMqttVarTable &vt, CMqttClConfig &cfg, LPCSUB_CTRL_TOPIC pCtrlMap)
  188. {
  189. CMqttVar *pVar;
  190. CMqttMessage *pMsg;
  191. std::string sVarPath;
  192. while((pMsg = rcl.PopRcvMsg()))
  193. {
  194. if(pMsg->TopicMatchesSub(pCtrlMap[TCC_Control].sTopic.c_str()))
  195. {
  196. sVarPath = pMsg->GetTopic(pCtrlMap[TCC_Control].nVarOffset);
  197. if((pVar = vt.Find(sVarPath.c_str())))
  198. {
  199. bool bChanged = false;
  200. int nType, nQos;
  201. CJson_t jtRoot, jtVal;
  202. std::string err;
  203. uint32_t nMaskOn = 0, nMaskOff = 0;
  204. if(pMsg->GetPayloadAsJSON(jtRoot, err))
  205. {
  206. if(jtRoot.GetValue(_TOPIC_CTRL_KEY_RETAIN, jtVal))
  207. {
  208. switch((nType = jtVal.Type()))
  209. {
  210. case JSON_TRUE:
  211. bChanged = pVar->SetRetained(true) || bChanged;
  212. break;
  213. case JSON_FALSE:
  214. bChanged = pVar->SetRetained(false) || bChanged;
  215. break;
  216. default:
  217. g_lf.Error("%s: Invalid data type for JSON key '%s': %d\n", pMsg->GetTopic().c_str(), _TOPIC_CTRL_KEY_RETAIN, nType);
  218. break;
  219. }
  220. }
  221. if(jtRoot.GetValue(_TOPIC_CTRL_KEY_REM_RETAINED, jtVal))
  222. {
  223. switch((nType = jtVal.Type()))
  224. {
  225. case JSON_TRUE:
  226. pVar->RemoveRetained(cfg.GetDeviceID(), cfg.GetShmID(), rcl.GetMsgQueueSnd(), true);
  227. break;
  228. case JSON_FALSE:
  229. g_lf.Warning("%s: command \"%s\":false has no effect!\n", pMsg->GetTopic().c_str(), _TOPIC_CTRL_KEY_REM_RETAINED);
  230. break;
  231. default:
  232. g_lf.Error("%s: Invalid data type for JSON key '%s': %d\n", pMsg->GetTopic().c_str(), _TOPIC_CTRL_KEY_REM_RETAINED, nType);
  233. break;
  234. }
  235. }
  236. if(jtRoot.GetValue(_TOPIC_CTRL_KEY_QOS, jtVal))
  237. {
  238. switch((nType = jtVal.Type()))
  239. {
  240. case JSON_INTEGER:
  241. nQos = (int)json_integer_value(jtVal);
  242. if(nQos < MQTTCL_MIN_QOS)
  243. g_lf.Warning("%s: Invalid value for QOS: %d! Value adjusted to %d\n", pMsg->GetTopic().c_str(), nQos, MQTTCL_MIN_QOS);
  244. else if(nQos > MQTTCL_MAX_QOS)
  245. g_lf.Warning("%s: Invalid value for QOS: %d! Value adjusted to %d\n", pMsg->GetTopic().c_str(), nQos, MQTTCL_MAX_QOS);
  246. bChanged = pVar->SetQoS(nQos) || bChanged;
  247. break;
  248. default:
  249. g_lf.Error("%s: Invalid data type for JSON key '%s': %d\n", pMsg->GetTopic().c_str(), _TOPIC_CTRL_KEY_QOS, nType);
  250. break;
  251. }
  252. }
  253. if(jtRoot.GetValue(_TOPIC_CTRL_KEY_BINLE, jtVal))
  254. {
  255. switch((nType = jtVal.Type()))
  256. {
  257. case JSON_TRUE:
  258. nMaskOn |= MQTT_VALUE_BINLE;
  259. break;
  260. case JSON_FALSE:
  261. nMaskOff |= MQTT_VALUE_BINLE;
  262. break;
  263. default:
  264. g_lf.Error("%s: Invalid data type for JSON key '%s': %d\n", pMsg->GetTopic().c_str(), _TOPIC_CTRL_KEY_BINLE, nType);
  265. break;
  266. }
  267. }
  268. if(jtRoot.GetValue(_TOPIC_CTRL_KEY_BINBE, jtVal))
  269. {
  270. switch((nType = jtVal.Type()))
  271. {
  272. case JSON_TRUE:
  273. nMaskOn |= MQTT_VALUE_BINBE;
  274. break;
  275. case JSON_FALSE:
  276. nMaskOff |= MQTT_VALUE_BINBE;
  277. break;
  278. default:
  279. g_lf.Error("%s: Invalid data type for JSON key '%s': %d\n", pMsg->GetTopic().c_str(), _TOPIC_CTRL_KEY_BINBE, nType);
  280. break;
  281. }
  282. }
  283. if(jtRoot.GetValue(_TOPIC_CTRL_KEY_JSON, jtVal))
  284. {
  285. switch((nType = jtVal.Type()))
  286. {
  287. case JSON_TRUE:
  288. nMaskOn |= MQTT_VALUE_JSON;
  289. break;
  290. case JSON_FALSE:
  291. nMaskOff |= MQTT_VALUE_JSON;
  292. break;
  293. default:
  294. g_lf.Error("%s: Invalid data type for JSON key '%s': %d\n", pMsg->GetTopic().c_str(), _TOPIC_CTRL_KEY_JSON, nType);
  295. break;
  296. }
  297. }
  298. if(jtRoot.GetValue(_TOPIC_CTRL_KEY_PBUF, jtVal))
  299. {
  300. switch((nType = jtVal.Type()))
  301. {
  302. case JSON_TRUE:
  303. nMaskOn |= MQTT_VALUE_PBUF;
  304. break;
  305. case JSON_FALSE:
  306. nMaskOff |= MQTT_VALUE_PBUF;
  307. break;
  308. default:
  309. g_lf.Error("%s: Invalid data type for JSON key '%s': %d\n", pMsg->GetTopic().c_str(), _TOPIC_CTRL_KEY_PBUF, nType);
  310. break;
  311. }
  312. }
  313. if(nMaskOff)
  314. {
  315. if(pVar->DisablePublish(nMaskOff, &vt))
  316. {
  317. bChanged = true;
  318. }
  319. }
  320. if(nMaskOn)
  321. {
  322. if(pVar->EnablePublish(nMaskOn, &vt))
  323. {
  324. bChanged = true;
  325. }
  326. }
  327. #if _DUMP_ENABLED_VARS
  328. if(bChanged)
  329. vt.DumpPubEnabled();
  330. #endif // _DUMP_ENABLED_VARS
  331. }
  332. else
  333. {
  334. g_lf.Error("%s: JSON error: %s!\n", pMsg->GetTopic().c_str(), err.c_str());
  335. }
  336. }
  337. }
  338. else
  339. {
  340. int nLocks = 0;
  341. static const uint32_t nFormats[] =
  342. {
  343. 0,
  344. MQTT_VALUE_BINLE,
  345. MQTT_VALUE_BINBE,
  346. MQTT_VALUE_JSON,
  347. MQTT_VALUE_PBUF
  348. };
  349. for(int i = TCC_SetBinLe; i <= TCC_SetPBuf; i++)
  350. {
  351. if(pMsg->TopicMatchesSub(pCtrlMap[i].sTopic.c_str()))
  352. {
  353. sVarPath = pMsg->GetTopic(pCtrlMap[i].nVarOffset);
  354. if((pVar = vt.Find(sVarPath.c_str())))
  355. {
  356. if(pVar->PublishEnabled())
  357. {
  358. if(!pVar->SetShmValue(nFormats[i], pMsg, nLocks))
  359. {
  360. // !!!
  361. }
  362. }
  363. else
  364. {
  365. // !!!
  366. }
  367. break;
  368. }
  369. else
  370. {
  371. // !!!
  372. break;
  373. }
  374. }
  375. }
  376. }
  377. pMsg->Release();
  378. }
  379. }
  380. ////////////////////////////////////////////////////////////////////////////////////////////////////
  381. //
  382. static int _ProcessOutgoing(CMqttClient &rcl)
  383. {
  384. int nRet = 0;
  385. CMqttMessage *pMsg;
  386. while((pMsg = rcl.PopSndMsg()))
  387. {
  388. rcl.publish(pMsg);
  389. pMsg->Release();
  390. ++nRet;
  391. }
  392. return nRet;
  393. }
  394. ////////////////////////////////////////////////////////////////////////////////////////////////////
  395. //
  396. static int _Subscribe(CMqttClient &rcl, int nDefaultQOS, LPCSUB_CTRL_TOPIC pCtrlMap, size_t nLenMap)
  397. {
  398. int nRet;
  399. for(size_t i = 0; i < nLenMap; i++)
  400. {
  401. if((nRet = rcl.subscribe(NULL, pCtrlMap[i].sTopic.c_str(), nDefaultQOS)) != MOSQ_ERR_SUCCESS)
  402. break;
  403. TRACE("Subscribed: '%s' - QOS: %d\n", pCtrlMap[i].sTopic.c_str(), nDefaultQOS);
  404. }
  405. return nRet;
  406. }
  407. static void _Unsubscribe(CMqttClient &rcl, LPCSUB_CTRL_TOPIC pCtrlMap, size_t nLenMap)
  408. {
  409. for(size_t i = 0; i < nLenMap; i++)
  410. {
  411. rcl.unsubscribe(NULL, pCtrlMap[i].sTopic.c_str());
  412. }
  413. }
  414. ////////////////////////////////////////////////////////////////////////////////////////////////////
  415. //
  416. static void _OnClientEvents(LPCMQTT_GENERIC_NOTIFICATION pntf, void *pctx)
  417. {
  418. switch(pntf->evt)
  419. {
  420. case NEVT_Log:
  421. if(pntf->log.level == MOSQ_LOG_ERR)
  422. {
  423. TRACE("[%d] - %s\n", pntf->log.level, pntf->log.str);
  424. g_lf.Error("%s\n", pntf->log.str);
  425. }
  426. break;
  427. case NEVT_Connect:
  428. if(pntf->con.rc == MOSQ_ERR_SUCCESS)
  429. g_bConnected = true;
  430. break;
  431. case NEVT_Disconnect:
  432. break;
  433. case NEVT_Subscribe:
  434. ++g_nSubcribed;
  435. break;
  436. case NEVT_Unsubscribe:
  437. break;
  438. default:
  439. break;
  440. }
  441. }
  442. ////////////////////////////////////////////////////////////////////////////////////////////////////
  443. static void _ProcessCtrlMessages(HAPPCTRL hAC, HAPPINFO hAI)
  444. {
  445. ctrlmsg_t nCtrlMsg;
  446. while(!g_bIntr && (nCtrlMsg = ::GfaIpcAppCtrlGetNextCtrlMsg(hAI)))
  447. {
  448. switch(nCtrlMsg)
  449. {
  450. case GFA_APPCTRL_CTRLMSG_STOP:
  451. g_bIntr = true;
  452. g_fPauseImp = false;
  453. g_fPauseCmd = false;
  454. g_fZombie = false;
  455. g_lf.Info("Received Control Message 'Stop'\n");
  456. break;
  457. case GFA_APPCTRL_CTRLMSG_PAUSE:
  458. if(!g_fPauseCmd)
  459. {
  460. g_fPauseCmd = true;
  461. if(!g_fPauseImp)
  462. {
  463. ::GfaIpcAppCtrlSetState(hAC, GIAS_Paused);
  464. g_lf.Info("Received Control Message 'Pause'\n");
  465. g_lf.Info("Enter state %s ...\n", ::GfaIpcAppCtrlGetStateText(GIAS_Paused));
  466. TRACE("%-8s: State: %s\n", "Me", ::GfaIpcAppCtrlGetStateText(GIAS_Paused));
  467. }
  468. }
  469. break;
  470. case GFA_APPCTRL_CTRLMSG_RESUME:
  471. if(g_fPauseCmd)
  472. {
  473. g_fPauseCmd = false;
  474. if(!g_fPauseImp)
  475. {
  476. g_lf.Info("Received Control Message 'Resume'\n");
  477. g_lf.Info("Enter state %s ...\n", ::GfaIpcAppCtrlGetStateText(GIAS_Running));
  478. ::GfaIpcAppCtrlSetState(hAC, GIAS_Running);
  479. TRACE("%-8s: State: %s\n", "Me", ::GfaIpcAppCtrlGetStateText(GIAS_Running));
  480. }
  481. }
  482. break;
  483. default:
  484. break;
  485. }
  486. }
  487. }
  488. ////////////////////////////////////////////////////////////////////////////////////////////////////
  489. static void _ProcessStateEvents(HAPPCTRL hAC, HAPPINFO hAI)
  490. {
  491. appid_t nAppIdSrc;
  492. bool fOldPaused = g_fPauseImp;
  493. char szDispName[128];
  494. while(!g_bIntr && (nAppIdSrc = ::GfaIpcAppCtrlGetNextStateEvtSrc(hAI)))
  495. {
  496. GfaIpcAppStates state = ::GfaIpcAppCtrlGetState(hAC, nAppIdSrc);
  497. GfaIpcAppCtrlGetDisplayName(hAC, nAppIdSrc, szDispName, sizeof(szDispName));
  498. TRACE("%-8s: State: %s\n", szDispName, ::GfaIpcAppCtrlGetStateText(state));
  499. if(nAppIdSrc & _DEPENDENCIES)
  500. {
  501. if(state == GIAS_Running)
  502. {
  503. g_lf.Info("%s -> %s.\n", szDispName, ::GfaIpcAppCtrlGetStateText(state));
  504. g_nDepRunning |= nAppIdSrc;
  505. }
  506. else
  507. {
  508. g_lf.Warning("%s -> %s.\n", szDispName, ::GfaIpcAppCtrlGetStateText(state));
  509. g_nDepRunning &= ~nAppIdSrc;
  510. }
  511. }
  512. }
  513. if(!g_bIntr)
  514. {
  515. g_fPauseImp = (g_nDepRunning != _DEPENDENCIES);
  516. if(!g_fPauseCmd && (fOldPaused != g_fPauseImp))
  517. {
  518. fOldPaused = g_fPauseImp;
  519. GfaIpcAppStates newState = g_fPauseImp ? GIAS_Paused : GIAS_Running;
  520. ::GfaIpcAppCtrlSetState(hAC, newState);
  521. if(g_fPauseImp)
  522. g_lf.Warning("Enter state %s ...\n", ::GfaIpcAppCtrlGetStateText(newState));
  523. else
  524. g_lf.Info("Enter state %s ...\n", ::GfaIpcAppCtrlGetStateText(newState));
  525. }
  526. }
  527. }
  528. ////////////////////////////////////////////////////////////////////////////////////////////////////
  529. ////////////////////////////////////////////////////////////////////////////////////////////////////
  530. ////////////////////////////////////////////////////////////////////////////////////////////////////
  531. //
  532. int main(int /*argc*/, char **/*argv*/)
  533. {
  534. int nRet = 0;
  535. CProcessInstance pi;
  536. std::string sDevID;
  537. char szLogFile[PATH_MAX];
  538. std::string strBaseDir;
  539. const char *pszBaseDir = NULL;
  540. HAPPCTRL hAC = NULL;
  541. HAPPINFO hAI;
  542. HSHM hShm = NULL;
  543. void *pShm = NULL;
  544. unsigned long long nUsecWorkTime = 0;
  545. int nTlsMode;
  546. CProcessClock pcWork;
  547. ////////////////////////////////////////////////////////////////////////////////////////////////
  548. // check for multiple instances
  549. if(!pi.LockInstance(UUID_SHM))
  550. {
  551. CLogfile::StdErr("Failed to start instance!\n");
  552. return -1;
  553. }
  554. ////////////////////////////////////////////////////////////////////////////////////////////////
  555. // configure signal handling
  556. struct sigaction sa;
  557. ::sigfillset(&g_set);
  558. sigaddset(&g_set, SIGUSR1);
  559. memset(&sa, 0, sizeof(sa));
  560. sa.sa_handler = _SigHandler;
  561. sigaction(SIGHUP, &sa, NULL); // handles user's terminal disconnect
  562. sigaction(SIGQUIT, &sa, NULL); // handles Ctrl + '\'
  563. sigaction(SIGTERM, &sa, NULL); // handles normal termination
  564. sigaction(SIGABRT, &sa, NULL); // handles abnormal termination (i.e. abort())
  565. sigaction(SIGINT, &sa, NULL); // handles Ctrl + 'C'
  566. sa.sa_handler = SIG_IGN;
  567. sigaction(SIGTSTP, &sa, NULL); // ignores Ctrl + 'Z'
  568. sigaction(SIGSTOP, &sa, NULL); // ignores Stop
  569. sigaction(SIGCONT, &sa, NULL); // ignores Continue
  570. sigaction(SIGCHLD, &sa, NULL); // ignores child process termination
  571. sigaction(0, &sa, NULL); // ignores shell termination
  572. do
  573. {
  574. g_fZombie = true;
  575. ////////////////////////////////////////////////////////////////////////////////////////////
  576. // get the base directory for output files
  577. if(!pszBaseDir)
  578. pszBaseDir = _GetBaseDir(strBaseDir);
  579. CLogfile::StdOut("Using base directory \"%s\".\n", pszBaseDir);
  580. ////////////////////////////////////////////////////////////////////////////////////////////
  581. // initialize log file
  582. sprintf(szLogFile, "%s/%s", pszBaseDir, _LOGFILE_NAME);
  583. if(!g_lf.Open(szLogFile))
  584. {
  585. CLogfile::StdErr("Failed to create/open log file!\n");
  586. nRet = -1;
  587. break;
  588. }
  589. g_lf.Info("Process started.\n");
  590. ////////////////////////////////////////////////////////////////////////////////////////////
  591. // initialize app control
  592. g_lf.Info("Acquire AppCtrl-Handle.\n");
  593. if(!(hAC = ::GfaIpcAppCtrlAcquire(_APPID, _APPNAME, _USEC_FROM_MSEC(_UPDATE_INTERVAL_MS), _USEC_FROM_MSEC(_RECONN_INTERVAL_MS) * 3)))
  594. {
  595. g_lf.Error("Failed to acquire AppCtrl-Handle!\n");
  596. break;
  597. }
  598. ::GfaIpcAppCtrlSetState(hAC, GIAS_Initializing);
  599. g_lf.Info("Enter state %s ...\n", ::GfaIpcAppCtrlGetStateText(GIAS_Initializing));
  600. if(!::GfaIpcAppCtrlSubscribeStateEvents(hAC, _DEPENDENCIES))
  601. {
  602. g_lf.Error("Failed to subscribe state event notifications!\n");
  603. break;
  604. }
  605. ////////////////////////////////////////////////////////////////////////////////////////////
  606. // parse the config file
  607. CMqttClConfig cfg(UUID_SHM);
  608. if(!cfg.LoadCfg(MQTTCL_CONFIG_FILE_PATH, g_lf))
  609. {
  610. nRet = -1;
  611. break;
  612. }
  613. nTlsMode = cfg.GetTLSMode();
  614. // TRACE("%s/%s\n", cfg.GetDeviceID(), cfg.GetShmID());
  615. ////////////////////////////////////////////////////////////////////////////////////////////
  616. // client control topic map
  617. const SUB_CTRL_TOPIC subCtrlMap[] =
  618. {
  619. formatString("%s/%s/%s/#", cfg.GetDeviceID(), cfg.GetShmID(), _TOPIC_CMD_CTRL),
  620. formatString("%s/%s/%s/%s/#", cfg.GetDeviceID(), cfg.GetShmID(), MQTT_TOPIC_VALUE_BINLE, _TOPIC_CMD_SET),
  621. formatString("%s/%s/%s/%s/#", cfg.GetDeviceID(), cfg.GetShmID(), MQTT_TOPIC_VALUE_BINBE, _TOPIC_CMD_SET),
  622. formatString("%s/%s/%s/%s/#", cfg.GetDeviceID(), cfg.GetShmID(), MQTT_TOPIC_VALUE_JSON, _TOPIC_CMD_SET),
  623. formatString("%s/%s/%s/%s/#", cfg.GetDeviceID(), cfg.GetShmID(), MQTT_TOPIC_VALUE_PBUF, _TOPIC_CMD_SET),
  624. // formatString("%s/%s/%s/#", cfg.GetDeviceID(), cfg.GetShmID(), _TOPIC_CMD_STATUS)
  625. };
  626. ////////////////////////////////////////////////////////////////////////////////////////////
  627. if(!(hShm = ::acquire_shm(sizeof(shm_t), 1)))
  628. {
  629. g_lf.Error("GfaIpcAcquireSHM failed!\n");
  630. break;
  631. }
  632. g_lf.Info("Acquired SHM Handle.\n");
  633. if(!(pShm = ::GfaIpcAcquirePointer(hShm)))
  634. {
  635. g_lf.Error("GfaIpcAcquirePointer failed!\n");
  636. break;
  637. }
  638. g_lf.Info("Acquired SHM Pointer.\n");
  639. ::GfaIpcDumpSHMROT();
  640. memcpy(&g_shmShadow, (const shm_t*)pShm, sizeof(shm_t));
  641. ////////////////////////////////////////////////////////////////////////////////////////////
  642. int nErr, nLocked = 0, nNumConn = 0;
  643. bool bReconnect, bConnPending;
  644. std::string strErr;
  645. CMqttVarTable vtbl;
  646. CShm_t shm(pShm, &g_shmShadow, hShm, NULL, -1, 0, cfg.GetDefaultQOS(), cfg.GetDefaultRetain());
  647. shm.InitPath(NULL, NULL);
  648. shm.CreateMembersTable(vtbl);
  649. #if _DUMP_ENABLED_VARS
  650. vtbl.DumpPubEnabled();
  651. #endif // _DUMP_ENABLED_VARS
  652. ////////////////////////////////////////////////////////////////////////////////////////////
  653. CMqttClient mqttCl(cfg.GetDeviceID());
  654. mqttCl.SetClientEventCallback(_OnClientEvents, NEVT_Connect | NEVT_Disconnect | NEVT_Subscribe | NEVT_Unsubscribe | NEVT_Message | NEVT_Log, NULL);
  655. g_fZombie = false;
  656. g_fRun = true;
  657. ::GfaIpcAppCtrlSetState(hAC, GIAS_Running);
  658. g_lf.Info("Enter state %s ...\n", ::GfaIpcAppCtrlGetStateText(GIAS_Running));
  659. while(g_fRun)
  660. {
  661. ////////////////////////////////////////////////////////////////////////////////////////
  662. // update app control info
  663. if((hAI = ::GfaIpcAppCtrlInfoUpdate(hAC, nUsecWorkTime)))
  664. {
  665. _ProcessCtrlMessages(hAC, hAI);
  666. _ProcessStateEvents(hAC, hAI);
  667. }
  668. if(g_fPauseImp || g_fPauseCmd)
  669. {
  670. if(g_cs < CLS_ShutDown)
  671. {
  672. g_csLast = g_cs;
  673. g_cs = CLS_ShutDown;
  674. }
  675. else
  676. {
  677. nUsecWorkTime = 0;
  678. }
  679. }
  680. pcWork.ClockTrigger();
  681. switch(g_cs)
  682. {
  683. case CLS_NotInit:
  684. if((nErr = CMqttClient::Init()) == MOSQ_ERR_SUCCESS)
  685. {
  686. g_cs = CLS_SetTLS;
  687. }
  688. else if(!CMqttClient::EvalError(nErr, bReconnect, bConnPending, g_bIntr, strErr))
  689. {
  690. g_csLast = g_cs;
  691. g_cs = CLS_Err;
  692. }
  693. break;
  694. case CLS_SetTLS:
  695. if(nTlsMode == MQTTCL_TLS_MODE_CRT)
  696. {
  697. g_lf.Info("Using TLS with certificates.\n");
  698. if((nErr = mqttCl.tls_set(cfg.GetTlsCaCrtFile(), NULL, cfg.GetTlsClCrtFile(), cfg.GetTlsClKeyFile(), NULL)) != MOSQ_ERR_SUCCESS)
  699. {
  700. CMqttClient::EvalError(nErr, bReconnect, bConnPending, g_bIntr, strErr);
  701. if(g_bIntr)
  702. {
  703. g_csLast = g_cs;
  704. g_cs = CLS_ShutDown;
  705. }
  706. else
  707. {
  708. g_csLast = g_cs;
  709. g_cs = CLS_Err;
  710. }
  711. }
  712. else
  713. {
  714. g_cs = CLS_Unconnected;
  715. g_lf.Info("Connecting to broker @ %s:%u ...\n", cfg.GetBrokerAddr(), cfg.GetBrokerPort());
  716. }
  717. }
  718. else if(nTlsMode == MQTTCL_TLS_MODE_PSK)
  719. {
  720. g_lf.Info("Using TLS with PSK.\n");
  721. if((nErr = mqttCl.tls_psk_set(cfg.GetTlsPSK(), cfg.GetDeviceID(), NULL)) != MOSQ_ERR_SUCCESS)
  722. {
  723. CMqttClient::EvalError(nErr, bReconnect, bConnPending, g_bIntr, strErr);
  724. if(g_bIntr)
  725. {
  726. g_csLast = g_cs;
  727. g_cs = CLS_ShutDown;
  728. }
  729. else
  730. {
  731. g_csLast = g_cs;
  732. g_cs = CLS_Err;
  733. }
  734. }
  735. else
  736. {
  737. g_cs = CLS_Unconnected;
  738. g_lf.Info("Connecting to broker @ %s:%u ...\n", cfg.GetBrokerAddr(), cfg.GetBrokerPort());
  739. }
  740. }
  741. else
  742. {
  743. g_cs = CLS_Unconnected;
  744. g_lf.Info("Connecting to broker @ %s:%u ...\n", cfg.GetBrokerAddr(), cfg.GetBrokerPort());
  745. }
  746. break;
  747. case CLS_Unconnected:
  748. if(g_bConnected)
  749. {
  750. g_bConnected = false;
  751. g_lf.Warning("Lost connection to broker @ %s:%u. Tying to reconnect ...\n", cfg.GetBrokerAddr(), cfg.GetBrokerPort());
  752. }
  753. if(!nNumConn)
  754. g_cs = CLS_Connect;
  755. else
  756. g_cs = CLS_Reconnect;
  757. break;
  758. case CLS_Connect:
  759. if((nErr = mqttCl.connect(cfg.GetBrokerAddr(), cfg.GetBrokerPort())) == MOSQ_ERR_SUCCESS)
  760. g_cs = CLS_Connecting;
  761. else if(!CMqttClient::EvalError(nErr, bReconnect, bConnPending, g_bIntr, strErr))
  762. {
  763. if(bConnPending)
  764. g_cs = CLS_Connecting;
  765. else if(bReconnect)
  766. {
  767. g_csLast = g_cs;
  768. g_cs = _cl_usleep(_USEC_FROM_MSEC(_RECONN_INTERVAL_MS), g_cs);
  769. }
  770. else if(g_bIntr)
  771. {
  772. g_csLast = g_cs;
  773. g_cs = CLS_ShutDown;
  774. }
  775. else
  776. {
  777. g_csLast = g_cs;
  778. g_cs = CLS_Err;
  779. }
  780. }
  781. break;
  782. case CLS_Reconnect:
  783. if((nErr = mqttCl.reconnect()) == MOSQ_ERR_SUCCESS)
  784. g_cs = CLS_Connecting;
  785. else if(!CMqttClient::EvalError(nErr, bReconnect, bConnPending, g_bIntr, strErr))
  786. {
  787. if(bConnPending)
  788. g_cs = CLS_Connecting;
  789. else if(bReconnect)
  790. {
  791. g_csLast = g_cs;
  792. g_cs = _cl_usleep(_USEC_FROM_MSEC(_RECONN_INTERVAL_MS), g_cs);
  793. }
  794. else if(g_bIntr)
  795. {
  796. g_csLast = g_cs;
  797. g_cs = CLS_ShutDown;
  798. }
  799. else
  800. {
  801. g_csLast = g_cs;
  802. g_cs = CLS_Err;
  803. }
  804. }
  805. break;
  806. case CLS_Connecting:
  807. if(!mqttCl.TimedLoop(_NSEC_FROM_MSEC(_UPDATE_INTERVAL_MS), nErr, bReconnect, bConnPending, g_bIntr, strErr))
  808. {
  809. if(bReconnect)
  810. {
  811. g_csLast = g_cs;
  812. g_cs = _cl_usleep(_USEC_FROM_MSEC(_RECONN_INTERVAL_MS), CLS_Unconnected);
  813. }
  814. else if(g_bIntr)
  815. {
  816. g_csLast = g_cs;
  817. g_cs = CLS_ShutDown;
  818. }
  819. else if(!bConnPending)
  820. {
  821. g_csLast = g_cs;
  822. g_cs = CLS_Err;
  823. }
  824. }
  825. else if(g_bConnected)
  826. {
  827. g_cs = CLS_Connected;
  828. }
  829. break;
  830. case CLS_Connected:
  831. g_lf.Info("Connected to broker @ %s:%u.\n", cfg.GetBrokerAddr(), cfg.GetBrokerPort());
  832. ++nNumConn;
  833. g_nSubcribed = 0;
  834. g_cs = CLS_Subscribe;
  835. break;
  836. case CLS_Subscribe:
  837. g_lf.Info("Subscribing control-topics ...\n");
  838. if((nErr = _Subscribe(mqttCl, cfg.GetDefaultQOS(), subCtrlMap, _COUNTOF(subCtrlMap))) == MOSQ_ERR_SUCCESS)
  839. g_cs = CLS_Subscribing;
  840. else if(!CMqttClient::EvalError(nErr, bReconnect, bConnPending, g_bIntr, strErr))
  841. {
  842. if(bConnPending)
  843. g_cs = CLS_Connecting;
  844. else if(bReconnect)
  845. g_cs = CLS_Unconnected;
  846. else if(g_bIntr)
  847. {
  848. g_csLast = g_cs;
  849. g_cs = CLS_ShutDown;
  850. }
  851. else
  852. {
  853. g_csLast = g_cs;
  854. g_cs = CLS_Err;
  855. }
  856. }
  857. break;
  858. case CLS_Subscribing:
  859. if(!mqttCl.TimedLoop(_NSEC_FROM_MSEC(_UPDATE_INTERVAL_MS), nErr, bReconnect, bConnPending, g_bIntr, strErr))
  860. {
  861. if(bReconnect)
  862. g_cs = CLS_Unconnected;
  863. else if(bConnPending)
  864. g_cs = CLS_Connecting;
  865. else if(g_bIntr)
  866. {
  867. g_csLast = g_cs;
  868. g_cs = CLS_ShutDown;
  869. }
  870. else
  871. {
  872. g_csLast = g_cs;
  873. g_cs = CLS_Err;
  874. }
  875. }
  876. else if(g_nSubcribed == _COUNTOF(subCtrlMap))
  877. {
  878. g_cs = CLS_Subscribed;
  879. }
  880. break;
  881. case CLS_Subscribed:
  882. g_lf.Info("Subscriptions acknowledged.\n");
  883. g_lf.Info("Enter SHM processing loop ...\n");
  884. g_cs = CLS_ProcMsg;
  885. break;
  886. case CLS_ProcMsg:
  887. if(mqttCl.TimedLoop(_NSEC_FROM_MSEC(_UPDATE_INTERVAL_MS), nErr, bReconnect, bConnPending, g_bIntr, strErr))
  888. {
  889. #if _TRACK_TIMES
  890. std::string s1, s2;
  891. pc_time64_t elapsed;
  892. g_nDbgCounter1 = g_nDbgCounter2 = g_nDbgCounter3 = 0;
  893. g_pc.ClockTrigger();
  894. #endif // _TRACK_TIMES
  895. _ProcessIncoming(mqttCl, vtbl, cfg, subCtrlMap);
  896. #if _TRACK_TIMES
  897. if(g_nDbgCounter1)
  898. {
  899. elapsed = g_pc.ClockGetElapsed();
  900. s1 = CProcessClock::Interval2String(elapsed);
  901. s2 = CProcessClock::Interval2String(elapsed / g_nDbgCounter1);
  902. TRACE("_ProcessIncoming (%lu variables): %s (%s per var)\n", g_nDbgCounter1, s1.c_str(), s2.c_str());
  903. }
  904. g_pc.ClockTrigger();
  905. #endif // _TRACK_TIMES
  906. _SIG_BLOCK(&g_set);
  907. vtbl.CheckShmAndPublish(cfg.GetDeviceID(), cfg.GetShmID(), mqttCl.GetMsgQueueSnd(), nLocked);
  908. _SIG_UNBLOCK(&g_set);
  909. #if _TRACK_TIMES
  910. g_nDbgCounter2 = mqttCl.GetMsgQueueSnd().Size();
  911. if(g_nDbgCounter2)
  912. {
  913. elapsed = g_pc.ClockGetElapsed();
  914. s1 = CProcessClock::Interval2String(elapsed);
  915. s2 = CProcessClock::Interval2String(elapsed / g_nDbgCounter2);
  916. TRACE("CheckShmAndPublish (%lu variables): %s (%s per var)\n", g_nDbgCounter2, s1.c_str(), s2.c_str());
  917. g_pc.ClockTrigger();
  918. }
  919. g_nDbgCounter3 =
  920. #endif // _TRACK_TIMES
  921. _ProcessOutgoing(mqttCl);
  922. #if _TRACK_TIMES
  923. if(g_nDbgCounter3)
  924. {
  925. elapsed = g_pc.ClockGetElapsed();
  926. s1 = CProcessClock::Interval2String(elapsed);
  927. s2 = CProcessClock::Interval2String(elapsed / g_nDbgCounter3);
  928. TRACE("_ProcessOutgoing (%lu variables): %s (%s per var)\n", g_nDbgCounter3, s1.c_str(), s2.c_str());
  929. }
  930. #endif // _TRACK_TIMES
  931. }
  932. else
  933. {
  934. if(bReconnect)
  935. g_cs = CLS_Unconnected;
  936. else if(bConnPending)
  937. g_cs = CLS_Connecting;
  938. else if(g_bIntr)
  939. {
  940. g_csLast = g_cs;
  941. g_cs = CLS_ShutDown;
  942. }
  943. else
  944. {
  945. g_csLast = g_cs;
  946. g_cs = CLS_Err;
  947. }
  948. }
  949. break;
  950. case CLS_Err:
  951. g_lf.Error("[%s] - %s\n", _GetClientStateString(g_csLast), strErr.c_str());
  952. TRACE("Error: %s\n", strErr.c_str());
  953. g_fZombie = true;
  954. g_cs = CLS_ShutDown;
  955. break;
  956. case CLS_ShutDown:
  957. if(g_bIntr && g_nLastSig >= 0)
  958. {
  959. g_lf.Info("Received signal '%s'.\n", strsignal(g_nLastSig));
  960. g_nLastSig = -1;
  961. }
  962. g_lf.Info("Process shutting down ...\n");
  963. if(g_csLast >= CLS_Subscribed)
  964. g_cs = CLS_Unsubscribe;
  965. else if(g_csLast >= CLS_Connected)
  966. g_cs = CLS_Disconnect;
  967. else if(g_csLast > CLS_NotInit)
  968. g_cs = CLS_Cleanup;
  969. else if((g_fPauseImp || g_fPauseCmd) && !g_bIntr)
  970. g_cs = CLS_Paused;
  971. else
  972. g_cs = CLS_Exit;
  973. break;
  974. case CLS_Unsubscribe:
  975. g_lf.Info("Unsubscribe control-topics.\n");
  976. _Unsubscribe(mqttCl, subCtrlMap, _COUNTOF(subCtrlMap));
  977. g_nSubcribed = 0;
  978. g_cs = CLS_Disconnect;
  979. break;
  980. case CLS_Disconnect:
  981. g_lf.Info("Disconnect from broker @ %s:%u.\n", cfg.GetBrokerAddr(), cfg.GetBrokerPort());
  982. mqttCl.disconnect();
  983. g_bConnected = false;
  984. g_cs = CLS_Cleanup;
  985. break;
  986. case CLS_Cleanup:
  987. g_lf.Info("Clean up.\n");
  988. CMqttClient::Cleanup();
  989. if((g_fPauseImp || g_fPauseCmd) && !g_bIntr)
  990. g_cs = CLS_Paused;
  991. else
  992. g_cs = CLS_Exit;
  993. break;
  994. case CLS_Paused:
  995. if(!g_fPauseImp && !g_fPauseCmd)
  996. {
  997. if(!g_bIntr)
  998. g_cs = CLS_NotInit;
  999. else
  1000. g_cs = CLS_Exit;
  1001. }
  1002. else
  1003. {
  1004. usleep(_USEC_FROM_MSEC(_UPDATE_INTERVAL_MS));
  1005. continue;
  1006. }
  1007. break;
  1008. case CLS_Exit:
  1009. g_fRun = false;
  1010. break;
  1011. }
  1012. nUsecWorkTime = pcWork.ClockGetElapsed() / 1000;
  1013. }
  1014. }
  1015. while(false);
  1016. ////////////////////////////////////////////////////////////////////////////////////////////////
  1017. if(hShm)
  1018. {
  1019. if(pShm)
  1020. {
  1021. g_lf.Info("Release SHM Pointer.\n");
  1022. ::GfaIpcReleasePointer(hShm, pShm);
  1023. }
  1024. g_lf.Info("Release SHM Handle.\n");
  1025. ::GfaIpcReleaseSHM(hShm);
  1026. }
  1027. if(g_fZombie)
  1028. {
  1029. if(hAC)
  1030. ::GfaIpcAppCtrlSetState(hAC, GIAS_Zombie);
  1031. TRACE("Enter Zombie state ...\n");
  1032. g_lf.Warning("Enter Zombie state ...\n");
  1033. g_lf.Flush();
  1034. pause();
  1035. if(g_nLastSig >= 0)
  1036. g_lf.Info("Received signal '%s'.\n", strsignal(g_nLastSig));
  1037. }
  1038. if(hAC)
  1039. {
  1040. g_lf.Info("Enter state %s ...\n", ::GfaIpcAppCtrlGetStateText(GIAS_Terminating));
  1041. ::GfaIpcAppCtrlSetState(hAC, GIAS_Terminating);
  1042. g_lf.Info("Releasing App Control ...\n");
  1043. ::GfaIpcAppCtrlRelease(hAC);
  1044. }
  1045. g_lf.Info("Process exit.\n\n");
  1046. g_lf.Close();
  1047. CLogfile::StdErr("MqttCl exit.\n");
  1048. return nRet;
  1049. }