main.cpp 32 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002100310041005100610071008100910101011101210131014101510161017101810191020102110221023102410251026102710281029103010311032103310341035103610371038103910401041104210431044104510461047104810491050105110521053105410551056105710581059106010611062106310641065106610671068106910701071107210731074107510761077107810791080108110821083108410851086108710881089109010911092109310941095109610971098109911001101110211031104110511061107110811091110111111121113111411151116111711181119112011211122112311241125112611271128112911301131113211331134113511361137113811391140114111421143114411451146114711481149115011511152115311541155115611571158115911601161116211631164116511661167116811691170117111721173117411751176117711781179118011811182118311841185118611871188118911901191119211931194119511961197119811991200120112021203120412051206120712081209121012111212121312141215121612171218121912201221122212231224122512261227122812291230123112321233123412351236123712381239124012411242124312441245124612471248124912501251125212531254125512561257125812591260126112621263126412651266126712681269127012711272
  1. ////////////////////////////////////////////////////////////////////////////////////////////////////
  2. //
  3. #include <stdio.h>
  4. #include <signal.h>
  5. #include <linux/limits.h>
  6. #include <sys/socket.h>
  7. #include <arpa/inet.h>
  8. #include <linux/if_arp.h>
  9. #include <netinet/in.h>
  10. #include <ifaddrs.h>
  11. #include <sys/ioctl.h>
  12. #include <fcntl.h>
  13. #include <unistd.h>
  14. #include <getopt.h>
  15. #include <gfa/svc/common/strutil.h>
  16. #include <gfa/svc/common/fileutil.h>
  17. #include <gfa/svc/common/instance.h>
  18. #include <gfa/svc/common/processclock.h>
  19. #include <gfa/svc/common/debug.h>
  20. #include <gfa/svc/mqttcl/mqttclient.h>
  21. #include <gfa/svc/mqttcl/mqttcfg.h>
  22. #include <gfa/svc/mqttcl/mqttdbg.h>
  23. #include "projal.h"
  24. ////////////////////////////////////////////////////////////////////////////////////////////////////
  25. #if _ENABLE_MEM_TRACE
  26. #include <mcheck.h>
  // Debug helper (only compiled with _ENABLE_MEM_TRACE): points glibc's malloc
  // tracer at a log file and switches tracing on when an instance is constructed.
  class CMtrace
  {
  public:
      CMtrace(void)
      {
          // setenv() takes const strings and copies them. The former
          // putenv("literal") call needed a string-literal -> char* conversion,
          // which is not valid C++11, and putenv() keeps the passed pointer.
          setenv("MALLOC_TRACE", "/home/wrk/share/config/services/Debug/Desktop_Qt_5_7_0_GCC_64bit/mqttcl/mtrace.log", 1);
          mtrace();
      }
      ~CMtrace(void)
      {
          // Tracing is intentionally left enabled until process exit.
          // muntrace();
      }
  };
  38. #endif // _ENABLE_MEM_TRACE
  39. #if _TRACK_TIMES
  40. static CProcessClock g_pc;
  41. unsigned long g_nDbgCounter1 = 0;
  42. unsigned long g_nDbgCounter2 = 0;
  43. unsigned long g_nDbgCounter3 = 0;
  44. #endif // _TRACK_TIMES
  45. ////////////////////////////////////////////////////////////////////////////////////////////////////
  46. // app control
  47. #define _APPID GFA_APPCTRL_APPID_MQTTCL
  48. #define _APPNAME "MqttCl"
  49. #define _DEPENDENCIES ((appid_t)(GFA_APPCTRL_APPID_REMANENT))
  50. ////////////////////////////////////////////////////////////////////////////////////////////////////
  51. //
  52. #define _UPDATE_INTERVAL_MS 100
  53. #define _RECONN_INTERVAL_MS 1000
  54. #define _LOGFILE_NAME "mqttcl.log"
  55. ////////////////////////////////////////////////////////////////////////////////////////////////////
  56. #define _SIG_BLOCK(s) sigprocmask(SIG_BLOCK, (s), NULL)
  57. #define _SIG_UNBLOCK(s) sigprocmask(SIG_UNBLOCK, (s), NULL)
  58. #define _NSEC_FROM_MSEC(ms) ((ms) * _PC_NS_PER_MS)
  59. #define _USEC_FROM_MSEC(ms) ((ms) * _PC_NS_PER_US)
  60. #define _TOPIC_CTRL_KEY_BINLE "binLe"
  61. #define _TOPIC_CTRL_KEY_BINBE "binBe"
  62. #define _TOPIC_CTRL_KEY_JSON "json"
  63. #define _TOPIC_CTRL_KEY_PBUF "pBuf"
  64. #define _TOPIC_CTRL_KEY_QOS "qos"
  65. #define _TOPIC_CTRL_KEY_RETAIN "retained"
  66. #define _TOPIC_CTRL_KEY_REM_RETAINED "delRetained"
  67. #define _TOPIC_CMD_CTRL "CONTROL"
  68. #define _TOPIC_CMD_SET "SET"
  69. #define _TOPIC_CMD_STATUS "STATUS"
  70. #define _CONNECT_MAX_RETRIES(err) (((err) == MOSQ_ERR_EAI) ? 30 : 3)
  71. ////////////////////////////////////////////////////////////////////////////////////////////////////
  72. //
  // Identifies the control/set topics the client subscribes to.
  // The numeric values matter: they are used as indices into the control-topic
  // map handed to _ProcessIncoming() and into its payload-format table, so the
  // values are spelled out explicitly here.
  typedef enum _TOPIC_CTRL_CMD
  {
      TCC_Control  = 0,
      TCC_SetBinLe = 1,
      TCC_SetBinBe = 2,
      TCC_SetJson  = 3,
      TCC_SetPBuf  = 4,
      TCC_Status   = 5
  }TOPIC_CTRL_CMD, *LPTOPIC_CTRL_CMD;
  typedef const TOPIC_CTRL_CMD *LPCTOPIC_CTRL_CMD;
  83. ////////////////////////////////////////////////////////////////////////////////////////////////////
  84. //
  // States of the MQTT client state machine driven from main().
  // g_pszStateNames below must contain exactly one entry per enumerator, in the
  // same order; this is enforced at compile time by the static_assert.
  typedef enum _MQTT_CLIENT_STATES
  {
      CLS_NotInit,
      CLS_SetLastWill,
      CLS_SetTLS,
      CLS_Unconnected,
      CLS_Connect,
      CLS_Reconnect,
      CLS_Connecting,
      CLS_Connected,
      CLS_Subscribe,
      CLS_Subscribing,
      CLS_Subscribed,
      CLS_PublishConnect,
      CLS_ProcMsg,
      CLS_PublishDisconnect,
      CLS_Err,
      CLS_ShutDown,
      CLS_Unsubscribe,
      CLS_Disconnect,
      CLS_Cleanup,
      CLS_Paused,
      CLS_Exit
  }MQTT_CLIENT_STATES, *LPMQTT_CLIENT_STATES;
  typedef const MQTT_CLIENT_STATES *LPCMQTT_CLIENT_STATES;

  // Human-readable state names, indexed by MQTT_CLIENT_STATES.
  static const char *g_pszStateNames[] =
  {
      "Not Init",
      "Set Last Will",
      "Set TLS",
      "Unconnected",
      "Connect",
      "Reconnect",
      "Connecting",
      "Connected",
      "Subscribe",
      "Subscribing",
      "Subscribed",
      "Publish Connect",
      "ProcMsg",
      "Publish Disconnect",
      "Error",
      "ShutDown",
      "Unsubscribe",
      "Disconnect",
      "Cleanup",
      "Paused",
      "Exit"
  };

  // Fails the build when a state is added or removed without updating the name table.
  static_assert(sizeof(g_pszStateNames) / sizeof(g_pszStateNames[0]) == static_cast<unsigned int>(CLS_Exit) + 1u,
                "g_pszStateNames must have exactly one entry per MQTT_CLIENT_STATES value");

  // Returns the display name of a client state.
  //   cs: state to translate.
  //   returns: the matching entry of g_pszStateNames, or "" when cs is not a
  //            valid MQTT_CLIENT_STATES value.
  static const char * _GetClientStateString(MQTT_CLIENT_STATES cs)
  {
      const int nIndex = static_cast<int>(cs);
      const int nCount = static_cast<int>(sizeof(g_pszStateNames) / sizeof(g_pszStateNames[0]));

      if(nIndex < 0 || nIndex >= nCount)
          return "";
      return g_pszStateNames[nIndex];
  }
  140. ////////////////////////////////////////////////////////////////////////////////////////////////////
  141. //
  // One subscription topic plus the offset at which the variable path starts
  // inside a received topic string.
  typedef struct _SUB_CTRL_TOPIC
  {
      // s: subscription filter. The filters built in main() end in "/#", so
      //    nVarOffset is the index of that trailing "/#"; it is later passed to
      //    CMqttMessage::GetTopic() to obtain the variable path.
      // Accepts lvalues and temporaries alike (the former `const std::string &&`
      // only bound to temporaries and copied anyway). A filter shorter than two
      // characters yields offset 0 instead of wrapping around to a huge value.
      _SUB_CTRL_TOPIC(const std::string &s) : sTopic(s),
                                              nVarOffset((s.length() >= 2) ? (s.length() - 2) : 0)
      {
      }
      std::string sTopic;   // full subscription filter
      size_t nVarOffset;    // offset of the variable path within a matching topic
  }SUB_CTRL_TOPIC, *LPSUB_CTRL_TOPIC;
  typedef const SUB_CTRL_TOPIC *LPCSUB_CTRL_TOPIC;
  152. ////////////////////////////////////////////////////////////////////////////////////////////////////
  153. //
  154. //static std::string _CreateDeviceID(void);
  155. static volatile bool g_fRun = false;
  156. static volatile bool g_fPauseImp = false;
  157. static volatile bool g_fPauseCmd = false;
  158. static volatile bool g_fZombie = false;
  159. static appid_t g_nDepRunning = 0;
  160. static sigset_t g_set;
  161. static CLogfile g_lf;
  162. static int g_nLastSig = -1;
  163. static shm_t g_shmShadow;
  164. static MQTT_CLIENT_STATES g_cs = CLS_NotInit;
  165. static MQTT_CLIENT_STATES g_csLast = CLS_NotInit;
  166. static bool g_bConnected = false;
  167. static int g_nSubcribed = 0;
  168. static bool g_bIntr = false;
  169. static int g_nErrRetries = 0;
  170. ////////////////////////////////////////////////////////////////////////////////////////////////////
  171. //
  172. static const char* _GetBaseDir(std::string &rstrBaseDir)
  173. {
  174. char szBaseDir[PATH_MAX];
  175. rstrBaseDir = ::GetAppDirectory(szBaseDir, sizeof(szBaseDir));
  176. rtrim(rstrBaseDir, "/");
  177. return rstrBaseDir.c_str();
  178. }
  179. ////////////////////////////////////////////////////////////////////////////////////////////////////
  180. //
  181. static void _SigHandler(int sig)
  182. {
  183. g_nLastSig = sig;
  184. g_bIntr = true;
  185. g_fPauseImp = g_fPauseCmd = g_fZombie = false;
  186. }
  187. ////////////////////////////////////////////////////////////////////////////////////////////////////
  188. //
  189. static MQTT_CLIENT_STATES _cl_usleep(unsigned int t, MQTT_CLIENT_STATES csNext)
  190. {
  191. if(usleep(t) < 0 && errno == EINTR)
  192. return CLS_ShutDown;
  193. return csNext;
  194. }
  195. ////////////////////////////////////////////////////////////////////////////////////////////////////
  196. //
  197. static void _ProcessIncoming(CMqttClient &rcl, CMqttVarTable &vt, CMqttClConfig &cfg, LPCSUB_CTRL_TOPIC pCtrlMap)
  198. {
  199. CMqttVar *pVar;
  200. CMqttMessage *pMsg;
  201. std::string sVarPath;
  202. while((pMsg = rcl.PopRcvMsg()))
  203. {
  204. if(pMsg->TopicMatchesSub(pCtrlMap[TCC_Control].sTopic.c_str()))
  205. {
  206. sVarPath = pMsg->GetTopic(pCtrlMap[TCC_Control].nVarOffset);
  207. if((pVar = vt.Find(sVarPath.c_str())))
  208. {
  209. bool bChanged = false;
  210. int nType, nQos;
  211. CJson_t jtRoot, jtVal;
  212. std::string err;
  213. uint32_t nMaskOn = 0, nMaskOff = 0;
  214. if(pMsg->GetPayloadAsJSON(jtRoot, err))
  215. {
  216. if(jtRoot.GetValue(_TOPIC_CTRL_KEY_RETAIN, jtVal))
  217. {
  218. switch((nType = jtVal.Type()))
  219. {
  220. case JSON_TRUE:
  221. bChanged = pVar->SetRetained(true) || bChanged;
  222. break;
  223. case JSON_FALSE:
  224. bChanged = pVar->SetRetained(false) || bChanged;
  225. break;
  226. default:
  227. g_lf.Error("%s: Invalid data type for JSON key '%s': %d\n", pMsg->GetTopic().c_str(), _TOPIC_CTRL_KEY_RETAIN, nType);
  228. break;
  229. }
  230. }
  231. if(jtRoot.GetValue(_TOPIC_CTRL_KEY_REM_RETAINED, jtVal))
  232. {
  233. switch((nType = jtVal.Type()))
  234. {
  235. case JSON_TRUE:
  236. pVar->RemoveRetained(cfg.GetPrefix(), nullptr, rcl.GetMsgQueueSnd(), true);
  237. break;
  238. case JSON_FALSE:
  239. g_lf.Warning("%s: command \"%s\":false has no effect!\n", pMsg->GetTopic().c_str(), _TOPIC_CTRL_KEY_REM_RETAINED);
  240. break;
  241. default:
  242. g_lf.Error("%s: Invalid data type for JSON key '%s': %d\n", pMsg->GetTopic().c_str(), _TOPIC_CTRL_KEY_REM_RETAINED, nType);
  243. break;
  244. }
  245. }
  246. if(jtRoot.GetValue(_TOPIC_CTRL_KEY_QOS, jtVal))
  247. {
  248. switch((nType = jtVal.Type()))
  249. {
  250. case JSON_INTEGER:
  251. nQos = (int)json_integer_value(jtVal);
  252. if(nQos < MQTTCL_MIN_QOS)
  253. g_lf.Warning("%s: Invalid value for QOS: %d! Value adjusted to %d\n", pMsg->GetTopic().c_str(), nQos, MQTTCL_MIN_QOS);
  254. else if(nQos > MQTTCL_MAX_QOS)
  255. g_lf.Warning("%s: Invalid value for QOS: %d! Value adjusted to %d\n", pMsg->GetTopic().c_str(), nQos, MQTTCL_MAX_QOS);
  256. bChanged = pVar->SetQoS(nQos) || bChanged;
  257. break;
  258. default:
  259. g_lf.Error("%s: Invalid data type for JSON key '%s': %d\n", pMsg->GetTopic().c_str(), _TOPIC_CTRL_KEY_QOS, nType);
  260. break;
  261. }
  262. }
  263. if(jtRoot.GetValue(_TOPIC_CTRL_KEY_BINLE, jtVal))
  264. {
  265. switch((nType = jtVal.Type()))
  266. {
  267. case JSON_TRUE:
  268. nMaskOn |= MQTT_VALUE_BINLE;
  269. break;
  270. case JSON_FALSE:
  271. nMaskOff |= MQTT_VALUE_BINLE;
  272. break;
  273. default:
  274. g_lf.Error("%s: Invalid data type for JSON key '%s': %d\n", pMsg->GetTopic().c_str(), _TOPIC_CTRL_KEY_BINLE, nType);
  275. break;
  276. }
  277. }
  278. if(jtRoot.GetValue(_TOPIC_CTRL_KEY_BINBE, jtVal))
  279. {
  280. switch((nType = jtVal.Type()))
  281. {
  282. case JSON_TRUE:
  283. nMaskOn |= MQTT_VALUE_BINBE;
  284. break;
  285. case JSON_FALSE:
  286. nMaskOff |= MQTT_VALUE_BINBE;
  287. break;
  288. default:
  289. g_lf.Error("%s: Invalid data type for JSON key '%s': %d\n", pMsg->GetTopic().c_str(), _TOPIC_CTRL_KEY_BINBE, nType);
  290. break;
  291. }
  292. }
  293. if(jtRoot.GetValue(_TOPIC_CTRL_KEY_JSON, jtVal))
  294. {
  295. switch((nType = jtVal.Type()))
  296. {
  297. case JSON_TRUE:
  298. nMaskOn |= MQTT_VALUE_JSON;
  299. break;
  300. case JSON_FALSE:
  301. nMaskOff |= MQTT_VALUE_JSON;
  302. break;
  303. default:
  304. g_lf.Error("%s: Invalid data type for JSON key '%s': %d\n", pMsg->GetTopic().c_str(), _TOPIC_CTRL_KEY_JSON, nType);
  305. break;
  306. }
  307. }
  308. if(jtRoot.GetValue(_TOPIC_CTRL_KEY_PBUF, jtVal))
  309. {
  310. switch((nType = jtVal.Type()))
  311. {
  312. case JSON_TRUE:
  313. nMaskOn |= MQTT_VALUE_PBUF;
  314. break;
  315. case JSON_FALSE:
  316. nMaskOff |= MQTT_VALUE_PBUF;
  317. break;
  318. default:
  319. g_lf.Error("%s: Invalid data type for JSON key '%s': %d\n", pMsg->GetTopic().c_str(), _TOPIC_CTRL_KEY_PBUF, nType);
  320. break;
  321. }
  322. }
  323. if(nMaskOff)
  324. {
  325. if(pVar->DisablePublish(nMaskOff, &vt))
  326. {
  327. bChanged = true;
  328. }
  329. }
  330. if(nMaskOn)
  331. {
  332. if(pVar->EnablePublish(nMaskOn, &vt))
  333. {
  334. bChanged = true;
  335. }
  336. }
  337. #if _DUMP_ENABLED_VARS
  338. if(bChanged)
  339. vt.DumpPubEnabled();
  340. #endif // _DUMP_ENABLED_VARS
  341. }
  342. else
  343. {
  344. g_lf.Error("%s: JSON error: %s!\n", pMsg->GetTopic().c_str(), err.c_str());
  345. }
  346. }
  347. }
  348. else
  349. {
  350. int nLocks = 0;
  351. static const uint32_t nFormats[] =
  352. {
  353. 0,
  354. MQTT_VALUE_BINLE,
  355. MQTT_VALUE_BINBE,
  356. MQTT_VALUE_JSON,
  357. MQTT_VALUE_PBUF
  358. };
  359. for(int i = TCC_SetBinLe; i <= TCC_SetPBuf; i++)
  360. {
  361. if(pMsg->TopicMatchesSub(pCtrlMap[i].sTopic.c_str()))
  362. {
  363. sVarPath = pMsg->GetTopic(pCtrlMap[i].nVarOffset);
  364. if((pVar = vt.Find(sVarPath.c_str())))
  365. {
  366. if(pVar->PublishEnabled())
  367. {
  368. if(!pVar->SetShmValue(nFormats[i], pMsg, nLocks))
  369. {
  370. // !!!
  371. }
  372. }
  373. else
  374. {
  375. // !!!
  376. }
  377. break;
  378. }
  379. else
  380. {
  381. // !!!
  382. break;
  383. }
  384. }
  385. }
  386. }
  387. pMsg->Release();
  388. }
  389. }
  390. ////////////////////////////////////////////////////////////////////////////////////////////////////
  391. //
  392. static int _ProcessOutgoing(CMqttClient &rcl)
  393. {
  394. int nRet = 0;
  395. CMqttMessage *pMsg;
  396. while((pMsg = rcl.PopSndMsg()))
  397. {
  398. rcl.publish(pMsg);
  399. pMsg->Release();
  400. ++nRet;
  401. }
  402. return nRet;
  403. }
  404. ////////////////////////////////////////////////////////////////////////////////////////////////////
  405. //
  406. static int _Subscribe(CMqttClient &rcl, int nDefaultQOS, LPCSUB_CTRL_TOPIC pCtrlMap, size_t nLenMap)
  407. {
  408. int nRet;
  409. for(size_t i = 0; i < nLenMap; i++)
  410. {
  411. if((nRet = rcl.subscribe(NULL, pCtrlMap[i].sTopic.c_str(), nDefaultQOS)) != MOSQ_ERR_SUCCESS)
  412. break;
  413. TRACE("Subscribed: '%s' - QOS: %d\n", pCtrlMap[i].sTopic.c_str(), nDefaultQOS);
  414. }
  415. return nRet;
  416. }
  417. static void _Unsubscribe(CMqttClient &rcl, LPCSUB_CTRL_TOPIC pCtrlMap, size_t nLenMap)
  418. {
  419. for(size_t i = 0; i < nLenMap; i++)
  420. {
  421. rcl.unsubscribe(NULL, pCtrlMap[i].sTopic.c_str());
  422. }
  423. }
  424. ////////////////////////////////////////////////////////////////////////////////////////////////////
  425. //
  426. static void _OnClientEvents(LPCMQTT_GENERIC_NOTIFICATION pntf, void *pctx)
  427. {
  428. switch(pntf->evt)
  429. {
  430. case NEVT_Log:
  431. if(pntf->log.level == MOSQ_LOG_ERR)
  432. {
  433. TRACE("[%d] - %s\n", pntf->log.level, pntf->log.str);
  434. g_lf.Error("%s\n", pntf->log.str);
  435. }
  436. break;
  437. case NEVT_Connect:
  438. if(pntf->con.rc == MOSQ_ERR_SUCCESS)
  439. g_bConnected = true;
  440. break;
  441. case NEVT_Disconnect:
  442. break;
  443. case NEVT_Subscribe:
  444. ++g_nSubcribed;
  445. break;
  446. case NEVT_Unsubscribe:
  447. break;
  448. default:
  449. break;
  450. }
  451. }
  452. ////////////////////////////////////////////////////////////////////////////////////////////////////
  453. static void _ProcessCtrlMessages(HAPPCTRL hAC, HAPPINFO hAI)
  454. {
  455. ctrlmsg_t nCtrlMsg;
  456. while(!g_bIntr && (nCtrlMsg = ::GfaIpcAppCtrlGetNextCtrlMsg(hAI)))
  457. {
  458. switch(nCtrlMsg)
  459. {
  460. case GFA_APPCTRL_CTRLMSG_STOP:
  461. g_bIntr = true;
  462. g_fPauseImp = false;
  463. g_fPauseCmd = false;
  464. g_fZombie = false;
  465. g_lf.Info("Received Control Message 'Stop'\n");
  466. break;
  467. case GFA_APPCTRL_CTRLMSG_PAUSE:
  468. if(!g_fPauseCmd)
  469. {
  470. g_fPauseCmd = true;
  471. if(!g_fPauseImp)
  472. {
  473. ::GfaIpcAppCtrlSetState(hAC, GIAS_Paused);
  474. g_lf.Info("Received Control Message 'Pause'\n");
  475. g_lf.Info("Enter state %s ...\n", ::GfaIpcAppCtrlGetStateText(GIAS_Paused));
  476. TRACE("%-8s: State: %s\n", "Me", ::GfaIpcAppCtrlGetStateText(GIAS_Paused));
  477. }
  478. }
  479. break;
  480. case GFA_APPCTRL_CTRLMSG_RESUME:
  481. if(g_fPauseCmd)
  482. {
  483. g_fPauseCmd = false;
  484. if(!g_fPauseImp)
  485. {
  486. g_lf.Info("Received Control Message 'Resume'\n");
  487. g_lf.Info("Enter state %s ...\n", ::GfaIpcAppCtrlGetStateText(GIAS_Running));
  488. ::GfaIpcAppCtrlSetState(hAC, GIAS_Running);
  489. TRACE("%-8s: State: %s\n", "Me", ::GfaIpcAppCtrlGetStateText(GIAS_Running));
  490. }
  491. }
  492. break;
  493. default:
  494. break;
  495. }
  496. }
  497. }
  498. ////////////////////////////////////////////////////////////////////////////////////////////////////
  499. static void _ProcessStateEvents(HAPPCTRL hAC, HAPPINFO hAI)
  500. {
  501. appid_t nAppIdSrc;
  502. bool fOldPaused = g_fPauseImp;
  503. char szDispName[128];
  504. while(!g_bIntr && (nAppIdSrc = ::GfaIpcAppCtrlGetNextStateEvtSrc(hAI)))
  505. {
  506. GfaIpcAppStates state = ::GfaIpcAppCtrlGetState(hAC, nAppIdSrc);
  507. GfaIpcAppCtrlGetDisplayName(hAC, nAppIdSrc, szDispName, sizeof(szDispName));
  508. TRACE("%-8s: State: %s\n", szDispName, ::GfaIpcAppCtrlGetStateText(state));
  509. if(nAppIdSrc & _DEPENDENCIES)
  510. {
  511. if(state == GIAS_Running)
  512. {
  513. g_lf.Info("%s -> %s.\n", szDispName, ::GfaIpcAppCtrlGetStateText(state));
  514. g_nDepRunning |= nAppIdSrc;
  515. }
  516. else
  517. {
  518. g_lf.Warning("%s -> %s.\n", szDispName, ::GfaIpcAppCtrlGetStateText(state));
  519. g_nDepRunning &= ~nAppIdSrc;
  520. }
  521. }
  522. }
  523. if(!g_bIntr)
  524. {
  525. g_fPauseImp = (g_nDepRunning != _DEPENDENCIES);
  526. if(!g_fPauseCmd && (fOldPaused != g_fPauseImp))
  527. {
  528. fOldPaused = g_fPauseImp;
  529. GfaIpcAppStates newState = g_fPauseImp ? GIAS_Paused : GIAS_Running;
  530. ::GfaIpcAppCtrlSetState(hAC, newState);
  531. if(g_fPauseImp)
  532. g_lf.Warning("Enter state %s ...\n", ::GfaIpcAppCtrlGetStateText(newState));
  533. else
  534. g_lf.Info("Enter state %s ...\n", ::GfaIpcAppCtrlGetStateText(newState));
  535. }
  536. }
  537. }
  538. ////////////////////////////////////////////////////////////////////////////////////////////////////
  539. ////////////////////////////////////////////////////////////////////////////////////////////////////
  540. ////////////////////////////////////////////////////////////////////////////////////////////////////
  541. //
  542. int main(int /*argc*/, char **/*argv*/)
  543. {
  544. int nRet = 0;
  545. CProcessInstance pi;
  546. std::string sDevID;
  547. char szLogFile[PATH_MAX];
  548. std::string strBaseDir;
  549. const char *pszBaseDir = NULL;
  550. HAPPCTRL hAC = NULL;
  551. HAPPINFO hAI;
  552. HSHM hShm = NULL;
  553. void *pShm = NULL;
  554. unsigned long long nUsecWorkTime = 0;
  555. int nTlsMode;
  556. CProcessClock pcWork;
  557. ////////////////////////////////////////////////////////////////////////////////////////////////
  558. // check for multiple instances
  559. if(!pi.LockInstance(UUID_SHM))
  560. {
  561. CLogfile::StdErr("Failed to start instance!\n");
  562. return -1;
  563. }
  564. ////////////////////////////////////////////////////////////////////////////////////////////////
  565. // configure signal handling
  566. struct sigaction sa;
  567. ::sigfillset(&g_set);
  568. sigaddset(&g_set, SIGUSR1);
  569. memset(&sa, 0, sizeof(sa));
  570. sa.sa_handler = _SigHandler;
  571. sigaction(SIGHUP, &sa, NULL); // handles user's terminal disconnect
  572. sigaction(SIGQUIT, &sa, NULL); // handles Ctrl + '\'
  573. sigaction(SIGTERM, &sa, NULL); // handles normal termination
  574. sigaction(SIGABRT, &sa, NULL); // handles abnormal termination (i.e. abort())
  575. sigaction(SIGINT, &sa, NULL); // handles Ctrl + 'C'
  576. sa.sa_handler = SIG_IGN;
  577. sigaction(SIGTSTP, &sa, NULL); // ignores Ctrl + 'Z'
  578. sigaction(SIGSTOP, &sa, NULL); // ignores Stop
  579. sigaction(SIGCONT, &sa, NULL); // ignores Continue
  580. sigaction(SIGCHLD, &sa, NULL); // ignores child process termination
  581. sigaction(0, &sa, NULL); // ignores shell termination
  582. do
  583. {
  584. g_fZombie = true;
  585. ////////////////////////////////////////////////////////////////////////////////////////////
  586. // get the base directory for output files
  587. if(!pszBaseDir)
  588. pszBaseDir = _GetBaseDir(strBaseDir);
  589. CLogfile::StdOut("Using base directory \"%s\".\n", pszBaseDir);
  590. ////////////////////////////////////////////////////////////////////////////////////////////
  591. // initialize log file
  592. sprintf(szLogFile, "%s/%s", pszBaseDir, _LOGFILE_NAME);
  593. if(!g_lf.Open(szLogFile))
  594. {
  595. CLogfile::StdErr("Failed to create/open log file!\n");
  596. nRet = -1;
  597. break;
  598. }
  599. g_lf.Info("Process started.\n");
  600. ////////////////////////////////////////////////////////////////////////////////////////////
  601. // initialize app control
  602. g_lf.Info("Acquire AppCtrl-Handle.\n");
  603. if(!(hAC = ::GfaIpcAppCtrlAcquire(_APPID, _APPNAME, _USEC_FROM_MSEC(_UPDATE_INTERVAL_MS), _USEC_FROM_MSEC(_RECONN_INTERVAL_MS) * 3)))
  604. {
  605. g_lf.Error("Failed to acquire AppCtrl-Handle!\n");
  606. break;
  607. }
  608. ::GfaIpcAppCtrlSetState(hAC, GIAS_Initializing);
  609. g_lf.Info("Enter state %s ...\n", ::GfaIpcAppCtrlGetStateText(GIAS_Initializing));
  610. if(!::GfaIpcAppCtrlSubscribeStateEvents(hAC, _DEPENDENCIES))
  611. {
  612. g_lf.Error("Failed to subscribe state event notifications!\n");
  613. break;
  614. }
  615. ////////////////////////////////////////////////////////////////////////////////////////////
  616. // parse the config file
  617. CMqttClConfig cfg(UUID_SHM);
  618. #ifdef MQTTCL_CONFIG_FILE_PATH
  619. std::string strMqttCfg = MQTTCL_CONFIG_FILE_PATH;
  620. #else // MQTTCL_CONFIG_FILE_PATH
  621. char szBaseDir[PATH_MAX];
  622. ::GetAppDirectory(szBaseDir, sizeof(szBaseDir));
  623. std::string strMqttCfg = formatString("%s/cfg/mqtt.conf.json", szBaseDir);
  624. #endif // MQTTCL_CONFIG_FILE_PATH
  625. if(!cfg.LoadCfg(strMqttCfg.c_str(), g_lf))
  626. {
  627. nRet = -1;
  628. break;
  629. }
  630. nTlsMode = cfg.GetTLSMode();
  631. // TRACE("%s/%s\n", cfg.GetDeviceID(), cfg.GetShmID());
  632. ////////////////////////////////////////////////////////////////////////////////////////////
  633. // client control topic map
  634. const SUB_CTRL_TOPIC subCtrlMap[] =
  635. {
  636. formatString("%s/%s/#", cfg.GetPrefix(), _TOPIC_CMD_CTRL),
  637. formatString("%s/%s/%s/#", cfg.GetPrefix(), MQTT_TOPIC_VALUE_BINLE, _TOPIC_CMD_SET),
  638. formatString("%s/%s/%s/#", cfg.GetPrefix(), MQTT_TOPIC_VALUE_BINBE, _TOPIC_CMD_SET),
  639. formatString("%s/%s/%s/#", cfg.GetPrefix(), MQTT_TOPIC_VALUE_JSON, _TOPIC_CMD_SET),
  640. formatString("%s/%s/%s/#", cfg.GetPrefix(), MQTT_TOPIC_VALUE_PBUF, _TOPIC_CMD_SET),
  641. // formatString("%s/%s/#", cfg.GetPrefix(), _TOPIC_CMD_STATUS)
  642. };
  643. ////////////////////////////////////////////////////////////////////////////////////////////
  644. if(!(hShm = ::acquire_shm(sizeof(shm_t), 1)))
  645. {
  646. g_lf.Error("GfaIpcAcquireSHM failed!\n");
  647. break;
  648. }
  649. g_lf.Info("Acquired SHM Handle.\n");
  650. if(!(pShm = ::GfaIpcAcquirePointer(hShm)))
  651. {
  652. g_lf.Error("GfaIpcAcquirePointer failed!\n");
  653. break;
  654. }
  655. g_lf.Info("Acquired SHM Pointer.\n");
  656. ::GfaIpcDumpSHMROT();
  657. memcpy(&g_shmShadow, (const shm_t*)pShm, sizeof(shm_t));
  658. ////////////////////////////////////////////////////////////////////////////////////////////
  659. int nErr, nLocked = 0, nNumConn = 0;
  660. bool bReconnect, bConnPending;
  661. std::string strErr;
  662. CMqttVarTable vtbl;
  663. CShm_t shm(pShm, &g_shmShadow, hShm, NULL, -1, 0, cfg.GetDefaultQOS(), cfg.GetDefaultRetain());
  664. shm.InitPath(NULL, NULL);
  665. shm.CreateMembersTable(vtbl);
  666. #if _DUMP_ENABLED_VARS
  667. vtbl.DumpPubEnabled();
  668. #endif // _DUMP_ENABLED_VARS
  669. ////////////////////////////////////////////////////////////////////////////////////////////
  670. CMqttClient mqttCl(cfg.GetDeviceID());
  671. mqttCl.SetClientEventCallback(_OnClientEvents, NEVT_Connect | NEVT_Disconnect | NEVT_Subscribe | NEVT_Unsubscribe | NEVT_Message | NEVT_Log, NULL);
  672. g_fZombie = false;
  673. g_fRun = true;
  674. ::GfaIpcAppCtrlSetState(hAC, GIAS_Running);
  675. g_lf.Info("Enter state %s ...\n", ::GfaIpcAppCtrlGetStateText(GIAS_Running));
  676. while(g_fRun)
  677. {
  678. ////////////////////////////////////////////////////////////////////////////////////////
  679. // update app control info
  680. if((hAI = ::GfaIpcAppCtrlInfoUpdate(hAC, nUsecWorkTime)))
  681. {
  682. _ProcessCtrlMessages(hAC, hAI);
  683. _ProcessStateEvents(hAC, hAI);
  684. }
  685. if(g_fPauseImp || g_fPauseCmd)
  686. {
  687. if(g_cs < CLS_ShutDown)
  688. {
  689. g_csLast = g_cs;
  690. g_cs = CLS_ShutDown;
  691. }
  692. else
  693. {
  694. nUsecWorkTime = 0;
  695. }
  696. }
  697. pcWork.ClockTrigger();
  698. switch(g_cs)
  699. {
  700. case CLS_NotInit:
  701. if((nErr = CMqttClient::Init()) == MOSQ_ERR_SUCCESS)
  702. {
  703. g_cs = CLS_SetTLS;
  704. }
  705. else if(!CMqttClient::EvalError(nErr, bReconnect, bConnPending, g_bIntr, strErr))
  706. {
  707. g_csLast = g_cs;
  708. g_cs = CLS_Err;
  709. }
  710. break;
  711. case CLS_SetTLS:
  712. if(nTlsMode == MQTTCL_TLS_MODE_CRT)
  713. {
  714. g_lf.Info("Using TLS with certificates.\n");
  715. if((nErr = mqttCl.tls_set(cfg.GetTlsCaCrtFile(), NULL, cfg.GetTlsClCrtFile(), cfg.GetTlsClKeyFile(), NULL)) != MOSQ_ERR_SUCCESS)
  716. {
  717. CMqttClient::EvalError(nErr, bReconnect, bConnPending, g_bIntr, strErr);
  718. if(g_bIntr)
  719. {
  720. g_csLast = g_cs;
  721. g_cs = CLS_ShutDown;
  722. }
  723. else
  724. {
  725. g_csLast = g_cs;
  726. g_cs = CLS_Err;
  727. }
  728. }
  729. else
  730. {
  731. g_cs = CLS_SetLastWill;
  732. g_lf.Info("Connecting to broker @ %s:%u ...\n", cfg.GetBrokerAddr(), cfg.GetBrokerPort());
  733. }
  734. }
  735. else if(nTlsMode == MQTTCL_TLS_MODE_PSK)
  736. {
  737. g_lf.Info("Using TLS with PSK.\n");
  738. if((nErr = mqttCl.tls_psk_set(cfg.GetTlsPSK(), cfg.GetDeviceID(), NULL)) != MOSQ_ERR_SUCCESS)
  739. {
  740. CMqttClient::EvalError(nErr, bReconnect, bConnPending, g_bIntr, strErr);
  741. if(g_bIntr)
  742. {
  743. g_csLast = g_cs;
  744. g_cs = CLS_ShutDown;
  745. }
  746. else
  747. {
  748. g_csLast = g_cs;
  749. g_cs = CLS_Err;
  750. }
  751. }
  752. else
  753. {
  754. g_cs = CLS_SetLastWill;
  755. g_lf.Info("Connecting to broker @ %s:%u ...\n", cfg.GetBrokerAddr(), cfg.GetBrokerPort());
  756. }
  757. }
  758. else
  759. {
  760. g_cs = CLS_SetLastWill;
  761. g_lf.Info("Connecting to broker @ %s:%u ...\n", cfg.GetBrokerAddr(), cfg.GetBrokerPort());
  762. }
  763. break;
  764. case CLS_SetLastWill:
  765. if(cfg.HasLastWill())
  766. {
  767. std::string strTopic = formatString("%s/%s", cfg.GetPrefix(), cfg.GetLastWillTopic());
  768. if((nErr = mqttCl.will_set(strTopic.c_str(), cfg.GetLastWillMessageLength(), cfg.GetLastWillMessage(), cfg.GetLastWillQOS(), cfg.GetLastWillRetain())) != MOSQ_ERR_SUCCESS)
  769. {
  770. CMqttClient::EvalError(nErr, bReconnect, bConnPending, g_bIntr, strErr);
  771. }
  772. }
  773. g_cs = CLS_Unconnected;
  774. break;
  775. case CLS_Unconnected:
  776. if(g_bConnected)
  777. {
  778. g_bConnected = false;
  779. g_lf.Warning("Lost connection to broker @ %s:%u. Tying to reconnect ...\n", cfg.GetBrokerAddr(), cfg.GetBrokerPort());
  780. }
  781. if(!nNumConn)
  782. g_cs = CLS_Connect;
  783. else
  784. g_cs = CLS_Reconnect;
  785. break;
  786. case CLS_Connect:
  787. if((nErr = mqttCl.connect(cfg.GetBrokerAddr(), cfg.GetBrokerPort(), cfg.GetKeepAliveTime())) == MOSQ_ERR_SUCCESS)
  788. g_cs = CLS_Connecting;
  789. else if(!CMqttClient::EvalError(nErr, bReconnect, bConnPending, g_bIntr, strErr))
  790. {
  791. if(bConnPending)
  792. g_cs = CLS_Connecting;
  793. else if(bReconnect)
  794. {
  795. g_csLast = g_cs;
  796. g_cs = _cl_usleep(_USEC_FROM_MSEC(_RECONN_INTERVAL_MS), g_cs);
  797. }
  798. else if(g_bIntr)
  799. {
  800. g_csLast = g_cs;
  801. g_cs = CLS_ShutDown;
  802. }
  803. else
  804. {
  805. if(++g_nErrRetries > _CONNECT_MAX_RETRIES(nErr))
  806. {
  807. g_csLast = g_cs;
  808. g_cs = CLS_Err;
  809. }
  810. else
  811. {
  812. g_cs = _cl_usleep(_USEC_FROM_MSEC(_RECONN_INTERVAL_MS), g_cs);
  813. }
  814. }
  815. }
  816. break;
  817. case CLS_Reconnect:
  818. if((nErr = mqttCl.reconnect()) == MOSQ_ERR_SUCCESS)
  819. g_cs = CLS_Connecting;
  820. else if(!CMqttClient::EvalError(nErr, bReconnect, bConnPending, g_bIntr, strErr))
  821. {
  822. if(bConnPending)
  823. g_cs = CLS_Connecting;
  824. else if(bReconnect)
  825. {
  826. g_csLast = g_cs;
  827. g_cs = _cl_usleep(_USEC_FROM_MSEC(_RECONN_INTERVAL_MS), g_cs);
  828. }
  829. else if(g_bIntr)
  830. {
  831. g_csLast = g_cs;
  832. g_cs = CLS_ShutDown;
  833. }
  834. else
  835. {
  836. if(++g_nErrRetries > _CONNECT_MAX_RETRIES(nErr))
  837. {
  838. g_csLast = g_cs;
  839. g_cs = CLS_Err;
  840. }
  841. else
  842. {
  843. g_cs = _cl_usleep(_USEC_FROM_MSEC(_RECONN_INTERVAL_MS), g_cs);
  844. }
  845. }
  846. }
  847. break;
  848. case CLS_Connecting:
  849. g_nErrRetries = 0;
  850. if(!mqttCl.TimedLoop(_NSEC_FROM_MSEC(_UPDATE_INTERVAL_MS), nErr, bReconnect, bConnPending, g_bIntr, strErr))
  851. {
  852. if(bReconnect)
  853. {
  854. g_csLast = g_cs;
  855. g_cs = _cl_usleep(_USEC_FROM_MSEC(_RECONN_INTERVAL_MS), CLS_Unconnected);
  856. }
  857. else if(g_bIntr)
  858. {
  859. g_csLast = g_cs;
  860. g_cs = CLS_ShutDown;
  861. }
  862. else if(!bConnPending)
  863. {
  864. g_csLast = g_cs;
  865. g_cs = CLS_Err;
  866. }
  867. }
  868. else if(g_bConnected)
  869. {
  870. g_cs = CLS_Connected;
  871. }
  872. break;
  873. case CLS_Connected:
  874. g_lf.Info("Connected to broker @ %s:%u.\n", cfg.GetBrokerAddr(), cfg.GetBrokerPort());
  875. ++nNumConn;
  876. g_nSubcribed = 0;
  877. g_cs = CLS_Subscribe;
  878. break;
  879. case CLS_Subscribe:
  880. g_lf.Info("Subscribing control-topics ...\n");
  881. if((nErr = _Subscribe(mqttCl, cfg.GetDefaultQOS(), subCtrlMap, _COUNTOF(subCtrlMap))) == MOSQ_ERR_SUCCESS)
  882. g_cs = CLS_Subscribing;
  883. else if(!CMqttClient::EvalError(nErr, bReconnect, bConnPending, g_bIntr, strErr))
  884. {
  885. if(bConnPending)
  886. g_cs = CLS_Connecting;
  887. else if(bReconnect)
  888. g_cs = CLS_Unconnected;
  889. else if(g_bIntr)
  890. {
  891. g_csLast = g_cs;
  892. g_cs = CLS_ShutDown;
  893. }
  894. else
  895. {
  896. g_csLast = g_cs;
  897. g_cs = CLS_Err;
  898. }
  899. }
  900. break;
  901. case CLS_Subscribing:
  902. if(!mqttCl.TimedLoop(_NSEC_FROM_MSEC(_UPDATE_INTERVAL_MS), nErr, bReconnect, bConnPending, g_bIntr, strErr))
  903. {
  904. if(bReconnect)
  905. g_cs = CLS_Unconnected;
  906. else if(bConnPending)
  907. g_cs = CLS_Connecting;
  908. else if(g_bIntr)
  909. {
  910. g_csLast = g_cs;
  911. g_cs = CLS_ShutDown;
  912. }
  913. else
  914. {
  915. g_csLast = g_cs;
  916. g_cs = CLS_Err;
  917. }
  918. }
  919. else if(g_nSubcribed == _COUNTOF(subCtrlMap))
  920. {
  921. g_cs = CLS_Subscribed;
  922. }
  923. break;
  924. case CLS_Subscribed:
  925. g_lf.Info("Subscriptions acknowledged.\n");
  926. g_lf.Info("Enter SHM processing loop ...\n");
  927. g_cs = CLS_PublishConnect;
  928. break;
  929. case CLS_PublishConnect:
  930. if(cfg.HasConnectMsg())
  931. {
  932. std::string strTopic = formatString("%s/%s", cfg.GetPrefix(), cfg.GetConnectTopic());
  933. CMqttMessage *pMsg = CMqttMessage::CreateMessage(strTopic.c_str(), cfg.GetConnectMessage(), cfg.GetConnectMessageLength(), cfg.GetConnectQOS(), cfg.GetConnectRetain());
  934. mqttCl.publish(pMsg);
  935. pMsg->Release();
  936. }
  937. g_cs = CLS_ProcMsg;
  938. break;
  939. case CLS_ProcMsg:
  940. if(mqttCl.TimedLoop(_NSEC_FROM_MSEC(_UPDATE_INTERVAL_MS), nErr, bReconnect, bConnPending, g_bIntr, strErr))
  941. {
  942. #if _TRACK_TIMES
  943. std::string s1, s2;
  944. pc_time64_t elapsed;
  945. g_nDbgCounter1 = g_nDbgCounter2 = g_nDbgCounter3 = 0;
  946. g_pc.ClockTrigger();
  947. #endif // _TRACK_TIMES
  948. _ProcessIncoming(mqttCl, vtbl, cfg, subCtrlMap);
  949. #if _TRACK_TIMES
  950. if(g_nDbgCounter1)
  951. {
  952. elapsed = g_pc.ClockGetElapsed();
  953. s1 = CProcessClock::Interval2String(elapsed);
  954. s2 = CProcessClock::Interval2String(elapsed / g_nDbgCounter1);
  955. TRACE("_ProcessIncoming (%lu variables): %s (%s per var)\n", g_nDbgCounter1, s1.c_str(), s2.c_str());
  956. }
  957. g_pc.ClockTrigger();
  958. #endif // _TRACK_TIMES
  959. _SIG_BLOCK(&g_set);
  960. vtbl.CheckShmAndPublish(cfg.GetPrefix(), nullptr, mqttCl.GetMsgQueueSnd(), nLocked);
  961. _SIG_UNBLOCK(&g_set);
  962. #if _TRACK_TIMES
  963. g_nDbgCounter2 = mqttCl.GetMsgQueueSnd().Size();
  964. if(g_nDbgCounter2)
  965. {
  966. elapsed = g_pc.ClockGetElapsed();
  967. s1 = CProcessClock::Interval2String(elapsed);
  968. s2 = CProcessClock::Interval2String(elapsed / g_nDbgCounter2);
  969. TRACE("CheckShmAndPublish (%lu variables): %s (%s per var)\n", g_nDbgCounter2, s1.c_str(), s2.c_str());
  970. g_pc.ClockTrigger();
  971. }
  972. g_nDbgCounter3 =
  973. #endif // _TRACK_TIMES
  974. _ProcessOutgoing(mqttCl);
  975. #if _TRACK_TIMES
  976. if(g_nDbgCounter3)
  977. {
  978. elapsed = g_pc.ClockGetElapsed();
  979. s1 = CProcessClock::Interval2String(elapsed);
  980. s2 = CProcessClock::Interval2String(elapsed / g_nDbgCounter3);
  981. TRACE("_ProcessOutgoing (%lu variables): %s (%s per var)\n", g_nDbgCounter3, s1.c_str(), s2.c_str());
  982. }
  983. #endif // _TRACK_TIMES
  984. }
  985. else
  986. {
  987. if(bReconnect)
  988. g_cs = CLS_Unconnected;
  989. else if(bConnPending)
  990. g_cs = CLS_Connecting;
  991. else if(g_bIntr)
  992. {
  993. g_csLast = g_cs;
  994. g_cs = CLS_PublishDisconnect;
  995. }
  996. else
  997. {
  998. g_csLast = g_cs;
  999. g_cs = CLS_Err;
  1000. }
  1001. }
  1002. break;
  1003. case CLS_Err:
  1004. g_lf.Error("[%s] - %s\n", _GetClientStateString(g_csLast), strErr.c_str());
  1005. TRACE("Error: %s\n", strErr.c_str());
  1006. g_fZombie = true;
  1007. g_cs = CLS_PublishDisconnect;
  1008. break;
  1009. case CLS_PublishDisconnect:
  1010. if(cfg.HasLastWillOnExit())
  1011. {
  1012. std::string strTopic = formatString("%s/%s", cfg.GetPrefix(), cfg.GetLastWillTopic());
  1013. CMqttMessage *pMsg = CMqttMessage::CreateMessage(strTopic.c_str(), cfg.GetLastWillOnExitMessage(), cfg.GetLastWillOnExitMessageLength(), cfg.GetLastWillQOS(), cfg.GetLastWillRetain());
  1014. mqttCl.publish(pMsg);
  1015. pMsg->Release();
  1016. bool bDummy = false;
  1017. mqttCl.TimedLoop(_NSEC_FROM_MSEC(_UPDATE_INTERVAL_MS), nErr, bDummy, bDummy, bDummy, strErr);
  1018. }
  1019. g_cs = CLS_Disconnect;
  1020. break;
  1021. case CLS_ShutDown:
  1022. if(g_bIntr && g_nLastSig >= 0)
  1023. {
  1024. g_lf.Info("Received signal '%s'.\n", strsignal(g_nLastSig));
  1025. g_nLastSig = -1;
  1026. }
  1027. g_lf.Info("Process shutting down ...\n");
  1028. if(g_csLast >= CLS_Subscribed)
  1029. g_cs = CLS_Unsubscribe;
  1030. else if(g_csLast >= CLS_Connected)
  1031. g_cs = CLS_Disconnect;
  1032. else if(g_csLast > CLS_NotInit)
  1033. g_cs = CLS_Cleanup;
  1034. else if((g_fPauseImp || g_fPauseCmd) && !g_bIntr)
  1035. g_cs = CLS_Paused;
  1036. else
  1037. g_cs = CLS_Exit;
  1038. break;
  1039. case CLS_Unsubscribe:
  1040. g_lf.Info("Unsubscribe control-topics.\n");
  1041. _Unsubscribe(mqttCl, subCtrlMap, _COUNTOF(subCtrlMap));
  1042. g_nSubcribed = 0;
  1043. g_cs = CLS_Disconnect;
  1044. break;
  1045. case CLS_Disconnect:
  1046. g_lf.Info("Disconnect from broker @ %s:%u.\n", cfg.GetBrokerAddr(), cfg.GetBrokerPort());
  1047. mqttCl.disconnect();
  1048. g_bConnected = false;
  1049. g_cs = CLS_Cleanup;
  1050. break;
  1051. case CLS_Cleanup:
  1052. g_lf.Info("Clean up.\n");
  1053. CMqttClient::Cleanup();
  1054. if((g_fPauseImp || g_fPauseCmd) && !g_bIntr)
  1055. g_cs = CLS_Paused;
  1056. else
  1057. g_cs = CLS_Exit;
  1058. break;
  1059. case CLS_Paused:
  1060. if(!g_fPauseImp && !g_fPauseCmd)
  1061. {
  1062. if(!g_bIntr)
  1063. g_cs = CLS_NotInit;
  1064. else
  1065. g_cs = CLS_Exit;
  1066. }
  1067. else
  1068. {
  1069. usleep(_USEC_FROM_MSEC(_UPDATE_INTERVAL_MS));
  1070. continue;
  1071. }
  1072. break;
  1073. case CLS_Exit:
  1074. g_fRun = false;
  1075. break;
  1076. }
  1077. nUsecWorkTime = pcWork.ClockGetElapsed() / 1000;
  1078. }
  1079. }
  1080. while(false);
  1081. ////////////////////////////////////////////////////////////////////////////////////////////////
  1082. if(hShm)
  1083. {
  1084. if(pShm)
  1085. {
  1086. g_lf.Info("Release SHM Pointer.\n");
  1087. ::GfaIpcReleasePointer(hShm, pShm);
  1088. }
  1089. g_lf.Info("Release SHM Handle.\n");
  1090. ::GfaIpcReleaseSHM(hShm);
  1091. }
  1092. if(g_fZombie)
  1093. {
  1094. if(hAC)
  1095. ::GfaIpcAppCtrlSetState(hAC, GIAS_Zombie);
  1096. TRACE("Enter Zombie state ...\n");
  1097. g_lf.Warning("Enter Zombie state ...\n");
  1098. g_lf.Flush();
  1099. pause();
  1100. if(g_nLastSig >= 0)
  1101. g_lf.Info("Received signal '%s'.\n", strsignal(g_nLastSig));
  1102. }
  1103. if(hAC)
  1104. {
  1105. g_lf.Info("Enter state %s ...\n", ::GfaIpcAppCtrlGetStateText(GIAS_Terminating));
  1106. ::GfaIpcAppCtrlSetState(hAC, GIAS_Terminating);
  1107. g_lf.Info("Releasing App Control ...\n");
  1108. ::GfaIpcAppCtrlRelease(hAC);
  1109. }
  1110. g_lf.Info("Process exit.\n\n");
  1111. g_lf.Close();
  1112. CLogfile::StdErr("MqttCl exit.\n");
  1113. return nRet;
  1114. }