|
@@ -1,1202 +0,0 @@
|
|
-////////////////////////////////////////////////////////////////////////////////////////////////////
|
|
|
|
-//
|
|
|
|
-
|
|
|
|
-#include <stdio.h>
|
|
|
|
-#include <signal.h>
|
|
|
|
-#include <linux/limits.h>
|
|
|
|
-#include <sys/socket.h>
|
|
|
|
-#include <arpa/inet.h>
|
|
|
|
-#include <linux/if_arp.h>
|
|
|
|
-#include <netinet/in.h>
|
|
|
|
-#include <ifaddrs.h>
|
|
|
|
-#include <sys/ioctl.h>
|
|
|
|
-#include <fcntl.h>
|
|
|
|
-#include <unistd.h>
|
|
|
|
-#include <getopt.h>
|
|
|
|
-#include "strutil.h"
|
|
|
|
-#include "fileutil.h"
|
|
|
|
-#include "instance.h"
|
|
|
|
-#include "processclock.h"
|
|
|
|
-#include "debug.h"
|
|
|
|
-#include "projal.h"
|
|
|
|
-#include "mqttclient.h"
|
|
|
|
-#include "mqttcfg.h"
|
|
|
|
-#include "mqttdbg.h"
|
|
|
|
-
|
|
|
|
-////////////////////////////////////////////////////////////////////////////////////////////////////
|
|
|
|
-
|
|
|
|
-#if _ENABLE_MEM_TRACE
|
|
|
|
-#include <mcheck.h>
|
|
|
|
-class CMtrace
|
|
|
|
-{
|
|
|
|
-public:
|
|
|
|
- CMtrace(void) {
|
|
|
|
- putenv("MALLOC_TRACE=/home/wrk/share/config/services/Debug/Desktop_Qt_5_7_0_GCC_64bit/mqttcl/mtrace.log");
|
|
|
|
- mtrace();
|
|
|
|
- }
|
|
|
|
- ~CMtrace(void){
|
|
|
|
-// muntrace();
|
|
|
|
- }
|
|
|
|
-};
|
|
|
|
-#endif // _ENABLE_MEM_TRACE
|
|
|
|
-
|
|
|
|
-
|
|
|
|
-
|
|
|
|
-#if _TRACK_TIMES
|
|
|
|
-static CProcessClock g_pc;
|
|
|
|
-unsigned long g_nDbgCounter1 = 0;
|
|
|
|
-unsigned long g_nDbgCounter2 = 0;
|
|
|
|
-unsigned long g_nDbgCounter3 = 0;
|
|
|
|
-#endif // _TRACK_TIMES
|
|
|
|
-
|
|
|
|
-////////////////////////////////////////////////////////////////////////////////////////////////////
|
|
|
|
-// app control
|
|
|
|
-
|
|
|
|
-#define _APPID GFA_APPCTRL_APPID_MQTTCL
|
|
|
|
-#define _APPNAME "MqttCl"
|
|
|
|
-#define _DEPENDENCIES ((appid_t)(GFA_APPCTRL_APPID_REMANENT))
|
|
|
|
-
|
|
|
|
-////////////////////////////////////////////////////////////////////////////////////////////////////
|
|
|
|
-//
|
|
|
|
-
|
|
|
|
-#define _UPDATE_INTERVAL_MS 100
|
|
|
|
-#define _RECONN_INTERVAL_MS 1000
|
|
|
|
-#define _LOGFILE_NAME "mqttcl.log"
|
|
|
|
-
|
|
|
|
-////////////////////////////////////////////////////////////////////////////////////////////////////
|
|
|
|
-
|
|
|
|
-#define _SIG_BLOCK(s) sigprocmask(SIG_BLOCK, (s), NULL)
|
|
|
|
-#define _SIG_UNBLOCK(s) sigprocmask(SIG_UNBLOCK, (s), NULL)
|
|
|
|
-
|
|
|
|
-#define _NSEC_FROM_MSEC(ms) ((ms) * _PC_NS_PER_MS)
|
|
|
|
-#define _USEC_FROM_MSEC(ms) ((ms) * _PC_NS_PER_US)
|
|
|
|
-
|
|
|
|
-#define _TOPIC_CTRL_KEY_BINLE "binLe"
|
|
|
|
-#define _TOPIC_CTRL_KEY_BINBE "binBe"
|
|
|
|
-#define _TOPIC_CTRL_KEY_JSON "json"
|
|
|
|
-#define _TOPIC_CTRL_KEY_PBUF "pBuf"
|
|
|
|
-#define _TOPIC_CTRL_KEY_QOS "qos"
|
|
|
|
-#define _TOPIC_CTRL_KEY_RETAIN "retained"
|
|
|
|
-#define _TOPIC_CTRL_KEY_REM_RETAINED "delRetained"
|
|
|
|
-#define _TOPIC_CMD_CTRL "CONTROL"
|
|
|
|
-#define _TOPIC_CMD_SET "SET"
|
|
|
|
-#define _TOPIC_CMD_STATUS "STATUS"
|
|
|
|
-
|
|
|
|
-typedef enum _TOPIC_CTRL_CMD
|
|
|
|
-{
|
|
|
|
- TCC_Control,
|
|
|
|
- TCC_SetBinLe,
|
|
|
|
- TCC_SetBinBe,
|
|
|
|
- TCC_SetJson,
|
|
|
|
- TCC_SetPBuf,
|
|
|
|
- TCC_Status
|
|
|
|
-}TOPIC_CTRL_CMD, *LPTOPIC_CTRL_CMD;
|
|
|
|
-typedef const TOPIC_CTRL_CMD *LPCTOPIC_CTRL_CMD;
|
|
|
|
-
|
|
|
|
-////////////////////////////////////////////////////////////////////////////////////////////////////
|
|
|
|
-//
|
|
|
|
-
|
|
|
|
-typedef enum _MQTT_CLIENT_STATES // when modifying, don't forget to adjust g_pszStateNames!!!
|
|
|
|
-{
|
|
|
|
- CLS_NotInit,
|
|
|
|
- CLS_SetTLS,
|
|
|
|
- CLS_Unconnected,
|
|
|
|
- CLS_Connect,
|
|
|
|
- CLS_Reconnect,
|
|
|
|
- CLS_Connecting,
|
|
|
|
- CLS_Connected,
|
|
|
|
- CLS_Subscribe,
|
|
|
|
- CLS_Subscribing,
|
|
|
|
- CLS_Subscribed,
|
|
|
|
- CLS_ProcMsg,
|
|
|
|
- CLS_ShutDown,
|
|
|
|
- CLS_Err,
|
|
|
|
- CLS_Unsubscribe,
|
|
|
|
- CLS_Disconnect,
|
|
|
|
- CLS_Cleanup,
|
|
|
|
- CLS_Paused,
|
|
|
|
- CLS_Exit
|
|
|
|
-}MQTT_CLIENT_STATES, *LPMQTT_CLIENT_STATES;
|
|
|
|
-typedef const MQTT_CLIENT_STATES *LPCMQTT_CLIENT_STATES;
|
|
|
|
-
|
|
|
|
-static const char *g_pszStateNames[] =
|
|
|
|
-{
|
|
|
|
- "Not Init",
|
|
|
|
- "Set TLS",
|
|
|
|
- "Unconnected",
|
|
|
|
- "Connect",
|
|
|
|
- "Reconnect",
|
|
|
|
- "Connecting",
|
|
|
|
- "Connected",
|
|
|
|
- "Subscribe",
|
|
|
|
- "Subscribing",
|
|
|
|
- "Subscribed",
|
|
|
|
- "ProcMsg",
|
|
|
|
- "ShutDown",
|
|
|
|
- "Error",
|
|
|
|
- "Unsubscribe",
|
|
|
|
- "Disconnect",
|
|
|
|
- "Cleanup",
|
|
|
|
- "Paused",
|
|
|
|
- "Exit"
|
|
|
|
-};
|
|
|
|
-
|
|
|
|
-static const char * _GetClientStateString(MQTT_CLIENT_STATES cs)
|
|
|
|
-{
|
|
|
|
- if(cs >= CLS_NotInit && cs <= CLS_Exit)
|
|
|
|
- return g_pszStateNames[cs];
|
|
|
|
- return "";
|
|
|
|
-}
|
|
|
|
-
|
|
|
|
-////////////////////////////////////////////////////////////////////////////////////////////////////
|
|
|
|
-//
|
|
|
|
-
|
|
|
|
-typedef struct _SUB_CTRL_TOPIC
|
|
|
|
-{
|
|
|
|
- _SUB_CTRL_TOPIC(const std::string &&s) : sTopic(s)
|
|
|
|
- {
|
|
|
|
- this->nVarOffset = s.length() - 2;
|
|
|
|
- }
|
|
|
|
- std::string sTopic;
|
|
|
|
- size_t nVarOffset;
|
|
|
|
-}SUB_CTRL_TOPIC, *LPSUB_CTRL_TOPIC;
|
|
|
|
-typedef const SUB_CTRL_TOPIC *LPCSUB_CTRL_TOPIC;
|
|
|
|
-
|
|
|
|
-////////////////////////////////////////////////////////////////////////////////////////////////////
|
|
|
|
-//
|
|
|
|
-
|
|
|
|
-//static std::string _CreateDeviceID(void);
|
|
|
|
-
|
|
|
|
-static volatile bool g_fRun = false;
|
|
|
|
-static volatile bool g_fPauseImp = false;
|
|
|
|
-static volatile bool g_fPauseCmd = false;
|
|
|
|
-static volatile bool g_fZombie = false;
|
|
|
|
-static appid_t g_nDepRunning = 0;
|
|
|
|
-static sigset_t g_set;
|
|
|
|
-static CLogfile g_lf;
|
|
|
|
-static int g_nLastSig = -1;
|
|
|
|
-static shm_t g_shmShadow;
|
|
|
|
-static MQTT_CLIENT_STATES g_cs = CLS_NotInit;
|
|
|
|
-static MQTT_CLIENT_STATES g_csLast = CLS_NotInit;
|
|
|
|
-static bool g_bConnected = false;
|
|
|
|
-static int g_nSubcribed = 0;
|
|
|
|
-static bool g_bIntr = false;
|
|
|
|
-
|
|
|
|
-////////////////////////////////////////////////////////////////////////////////////////////////////
|
|
|
|
-//
|
|
|
|
-
|
|
|
|
-static const char* _GetBaseDir(std::string &rstrBaseDir)
|
|
|
|
-{
|
|
|
|
- char szBaseDir[PATH_MAX];
|
|
|
|
- rstrBaseDir = ::GetAppDirectory(szBaseDir, sizeof(szBaseDir));
|
|
|
|
- rtrim(rstrBaseDir, "/");
|
|
|
|
- return rstrBaseDir.c_str();
|
|
|
|
-}
|
|
|
|
-
|
|
|
|
-////////////////////////////////////////////////////////////////////////////////////////////////////
|
|
|
|
-//
|
|
|
|
-
|
|
|
|
-static void _SigHandler(int sig)
|
|
|
|
-{
|
|
|
|
- g_nLastSig = sig;
|
|
|
|
- g_bIntr = true;
|
|
|
|
- g_fPauseImp = g_fPauseCmd = g_fZombie = false;
|
|
|
|
-}
|
|
|
|
-
|
|
|
|
-////////////////////////////////////////////////////////////////////////////////////////////////////
|
|
|
|
-//
|
|
|
|
-
|
|
|
|
-static MQTT_CLIENT_STATES _cl_usleep(unsigned int t, MQTT_CLIENT_STATES csNext)
|
|
|
|
-{
|
|
|
|
- if(usleep(t) < 0 && errno == EINTR)
|
|
|
|
- return CLS_ShutDown;
|
|
|
|
- return csNext;
|
|
|
|
-}
|
|
|
|
-
|
|
|
|
-////////////////////////////////////////////////////////////////////////////////////////////////////
|
|
|
|
-//
|
|
|
|
-
|
|
|
|
-static void _ProcessIncoming(CMqttClient &rcl, CMqttVarTable &vt, CMqttClConfig &cfg, LPCSUB_CTRL_TOPIC pCtrlMap)
|
|
|
|
-{
|
|
|
|
- CMqttVar *pVar;
|
|
|
|
- CMqttMessage *pMsg;
|
|
|
|
- std::string sVarPath;
|
|
|
|
-
|
|
|
|
- while((pMsg = rcl.PopRcvMsg()))
|
|
|
|
- {
|
|
|
|
- if(pMsg->TopicMatchesSub(pCtrlMap[TCC_Control].sTopic.c_str()))
|
|
|
|
- {
|
|
|
|
- sVarPath = pMsg->GetTopic(pCtrlMap[TCC_Control].nVarOffset);
|
|
|
|
-
|
|
|
|
- if((pVar = vt.Find(sVarPath.c_str())))
|
|
|
|
- {
|
|
|
|
- bool bChanged = false;
|
|
|
|
- int nType, nQos;
|
|
|
|
- CJson_t jtRoot, jtVal;
|
|
|
|
- std::string err;
|
|
|
|
- uint32_t nMaskOn = 0, nMaskOff = 0;
|
|
|
|
-
|
|
|
|
- if(pMsg->GetPayloadAsJSON(jtRoot, err))
|
|
|
|
- {
|
|
|
|
- if(jtRoot.GetValue(_TOPIC_CTRL_KEY_RETAIN, jtVal))
|
|
|
|
- {
|
|
|
|
- switch((nType = jtVal.Type()))
|
|
|
|
- {
|
|
|
|
- case JSON_TRUE:
|
|
|
|
- bChanged = pVar->SetRetained(true) || bChanged;
|
|
|
|
- break;
|
|
|
|
- case JSON_FALSE:
|
|
|
|
- bChanged = pVar->SetRetained(false) || bChanged;
|
|
|
|
- break;
|
|
|
|
- default:
|
|
|
|
- g_lf.Error("%s: Invalid data type for JSON key '%s': %d\n", pMsg->GetTopic().c_str(), _TOPIC_CTRL_KEY_RETAIN, nType);
|
|
|
|
- break;
|
|
|
|
- }
|
|
|
|
- }
|
|
|
|
-
|
|
|
|
- if(jtRoot.GetValue(_TOPIC_CTRL_KEY_REM_RETAINED, jtVal))
|
|
|
|
- {
|
|
|
|
- switch((nType = jtVal.Type()))
|
|
|
|
- {
|
|
|
|
- case JSON_TRUE:
|
|
|
|
- pVar->RemoveRetained(cfg.GetDeviceID(), cfg.GetShmID(), rcl.GetMsgQueueSnd(), true);
|
|
|
|
- break;
|
|
|
|
- case JSON_FALSE:
|
|
|
|
- g_lf.Warning("%s: command \"%s\":false has no effect!\n", pMsg->GetTopic().c_str(), _TOPIC_CTRL_KEY_REM_RETAINED);
|
|
|
|
- break;
|
|
|
|
- default:
|
|
|
|
- g_lf.Error("%s: Invalid data type for JSON key '%s': %d\n", pMsg->GetTopic().c_str(), _TOPIC_CTRL_KEY_REM_RETAINED, nType);
|
|
|
|
- break;
|
|
|
|
- }
|
|
|
|
- }
|
|
|
|
-
|
|
|
|
- if(jtRoot.GetValue(_TOPIC_CTRL_KEY_QOS, jtVal))
|
|
|
|
- {
|
|
|
|
- switch((nType = jtVal.Type()))
|
|
|
|
- {
|
|
|
|
- case JSON_INTEGER:
|
|
|
|
- nQos = (int)json_integer_value(jtVal);
|
|
|
|
- if(nQos < MQTTCL_MIN_QOS)
|
|
|
|
- g_lf.Warning("%s: Invalid value for QOS: %d! Value adjusted to %d\n", pMsg->GetTopic().c_str(), nQos, MQTTCL_MIN_QOS);
|
|
|
|
- else if(nQos > MQTTCL_MAX_QOS)
|
|
|
|
- g_lf.Warning("%s: Invalid value for QOS: %d! Value adjusted to %d\n", pMsg->GetTopic().c_str(), nQos, MQTTCL_MAX_QOS);
|
|
|
|
- bChanged = pVar->SetQoS(nQos) || bChanged;
|
|
|
|
- break;
|
|
|
|
- default:
|
|
|
|
- g_lf.Error("%s: Invalid data type for JSON key '%s': %d\n", pMsg->GetTopic().c_str(), _TOPIC_CTRL_KEY_QOS, nType);
|
|
|
|
- break;
|
|
|
|
- }
|
|
|
|
- }
|
|
|
|
-
|
|
|
|
- if(jtRoot.GetValue(_TOPIC_CTRL_KEY_BINLE, jtVal))
|
|
|
|
- {
|
|
|
|
- switch((nType = jtVal.Type()))
|
|
|
|
- {
|
|
|
|
- case JSON_TRUE:
|
|
|
|
- nMaskOn |= MQTT_VALUE_BINLE;
|
|
|
|
- break;
|
|
|
|
- case JSON_FALSE:
|
|
|
|
- nMaskOff |= MQTT_VALUE_BINLE;
|
|
|
|
- break;
|
|
|
|
- default:
|
|
|
|
- g_lf.Error("%s: Invalid data type for JSON key '%s': %d\n", pMsg->GetTopic().c_str(), _TOPIC_CTRL_KEY_BINLE, nType);
|
|
|
|
- break;
|
|
|
|
- }
|
|
|
|
- }
|
|
|
|
-
|
|
|
|
- if(jtRoot.GetValue(_TOPIC_CTRL_KEY_BINBE, jtVal))
|
|
|
|
- {
|
|
|
|
- switch((nType = jtVal.Type()))
|
|
|
|
- {
|
|
|
|
- case JSON_TRUE:
|
|
|
|
- nMaskOn |= MQTT_VALUE_BINBE;
|
|
|
|
- break;
|
|
|
|
- case JSON_FALSE:
|
|
|
|
- nMaskOff |= MQTT_VALUE_BINBE;
|
|
|
|
- break;
|
|
|
|
- default:
|
|
|
|
- g_lf.Error("%s: Invalid data type for JSON key '%s': %d\n", pMsg->GetTopic().c_str(), _TOPIC_CTRL_KEY_BINBE, nType);
|
|
|
|
- break;
|
|
|
|
- }
|
|
|
|
- }
|
|
|
|
-
|
|
|
|
- if(jtRoot.GetValue(_TOPIC_CTRL_KEY_JSON, jtVal))
|
|
|
|
- {
|
|
|
|
- switch((nType = jtVal.Type()))
|
|
|
|
- {
|
|
|
|
- case JSON_TRUE:
|
|
|
|
- nMaskOn |= MQTT_VALUE_JSON;
|
|
|
|
- break;
|
|
|
|
- case JSON_FALSE:
|
|
|
|
- nMaskOff |= MQTT_VALUE_JSON;
|
|
|
|
- break;
|
|
|
|
- default:
|
|
|
|
- g_lf.Error("%s: Invalid data type for JSON key '%s': %d\n", pMsg->GetTopic().c_str(), _TOPIC_CTRL_KEY_JSON, nType);
|
|
|
|
- break;
|
|
|
|
- }
|
|
|
|
- }
|
|
|
|
-
|
|
|
|
- if(jtRoot.GetValue(_TOPIC_CTRL_KEY_PBUF, jtVal))
|
|
|
|
- {
|
|
|
|
- switch((nType = jtVal.Type()))
|
|
|
|
- {
|
|
|
|
- case JSON_TRUE:
|
|
|
|
- nMaskOn |= MQTT_VALUE_PBUF;
|
|
|
|
- break;
|
|
|
|
- case JSON_FALSE:
|
|
|
|
- nMaskOff |= MQTT_VALUE_PBUF;
|
|
|
|
- break;
|
|
|
|
- default:
|
|
|
|
- g_lf.Error("%s: Invalid data type for JSON key '%s': %d\n", pMsg->GetTopic().c_str(), _TOPIC_CTRL_KEY_PBUF, nType);
|
|
|
|
- break;
|
|
|
|
- }
|
|
|
|
- }
|
|
|
|
-
|
|
|
|
- if(nMaskOff)
|
|
|
|
- {
|
|
|
|
- if(pVar->DisablePublish(nMaskOff, &vt))
|
|
|
|
- {
|
|
|
|
- bChanged = true;
|
|
|
|
- }
|
|
|
|
- }
|
|
|
|
-
|
|
|
|
- if(nMaskOn)
|
|
|
|
- {
|
|
|
|
- if(pVar->EnablePublish(nMaskOn, &vt))
|
|
|
|
- {
|
|
|
|
- bChanged = true;
|
|
|
|
- }
|
|
|
|
- }
|
|
|
|
-
|
|
|
|
-#if _DUMP_ENABLED_VARS
|
|
|
|
- if(bChanged)
|
|
|
|
- vt.DumpPubEnabled();
|
|
|
|
-#endif // _DUMP_ENABLED_VARS
|
|
|
|
- }
|
|
|
|
- else
|
|
|
|
- {
|
|
|
|
- g_lf.Error("%s: JSON error: %s!\n", pMsg->GetTopic().c_str(), err.c_str());
|
|
|
|
- }
|
|
|
|
- }
|
|
|
|
- }
|
|
|
|
- else
|
|
|
|
- {
|
|
|
|
- int nLocks = 0;
|
|
|
|
- static const uint32_t nFormats[] =
|
|
|
|
- {
|
|
|
|
- 0,
|
|
|
|
- MQTT_VALUE_BINLE,
|
|
|
|
- MQTT_VALUE_BINBE,
|
|
|
|
- MQTT_VALUE_JSON,
|
|
|
|
- MQTT_VALUE_PBUF
|
|
|
|
- };
|
|
|
|
-
|
|
|
|
- for(int i = TCC_SetBinLe; i <= TCC_SetPBuf; i++)
|
|
|
|
- {
|
|
|
|
- if(pMsg->TopicMatchesSub(pCtrlMap[i].sTopic.c_str()))
|
|
|
|
- {
|
|
|
|
- sVarPath = pMsg->GetTopic(pCtrlMap[i].nVarOffset);
|
|
|
|
-
|
|
|
|
- if((pVar = vt.Find(sVarPath.c_str())))
|
|
|
|
- {
|
|
|
|
- if(pVar->PublishEnabled())
|
|
|
|
- {
|
|
|
|
- if(!pVar->SetShmValue(nFormats[i], pMsg, nLocks))
|
|
|
|
- {
|
|
|
|
- // !!!
|
|
|
|
- }
|
|
|
|
- }
|
|
|
|
- else
|
|
|
|
- {
|
|
|
|
- // !!!
|
|
|
|
- }
|
|
|
|
-
|
|
|
|
- break;
|
|
|
|
- }
|
|
|
|
- else
|
|
|
|
- {
|
|
|
|
- // !!!
|
|
|
|
- break;
|
|
|
|
- }
|
|
|
|
- }
|
|
|
|
- }
|
|
|
|
- }
|
|
|
|
-
|
|
|
|
- pMsg->Release();
|
|
|
|
- }
|
|
|
|
-}
|
|
|
|
-
|
|
|
|
-////////////////////////////////////////////////////////////////////////////////////////////////////
|
|
|
|
-//
|
|
|
|
-
|
|
|
|
-static int _ProcessOutgoing(CMqttClient &rcl)
|
|
|
|
-{
|
|
|
|
- int nRet = 0;
|
|
|
|
- CMqttMessage *pMsg;
|
|
|
|
-
|
|
|
|
- while((pMsg = rcl.PopSndMsg()))
|
|
|
|
- {
|
|
|
|
- rcl.publish(pMsg);
|
|
|
|
- pMsg->Release();
|
|
|
|
- ++nRet;
|
|
|
|
- }
|
|
|
|
-
|
|
|
|
- return nRet;
|
|
|
|
-}
|
|
|
|
-
|
|
|
|
-////////////////////////////////////////////////////////////////////////////////////////////////////
|
|
|
|
-//
|
|
|
|
-
|
|
|
|
-static int _Subscribe(CMqttClient &rcl, int nDefaultQOS, LPCSUB_CTRL_TOPIC pCtrlMap, size_t nLenMap)
|
|
|
|
-{
|
|
|
|
- int nRet;
|
|
|
|
-
|
|
|
|
- for(size_t i = 0; i < nLenMap; i++)
|
|
|
|
- {
|
|
|
|
- if((nRet = rcl.subscribe(NULL, pCtrlMap[i].sTopic.c_str(), nDefaultQOS)) != MOSQ_ERR_SUCCESS)
|
|
|
|
- break;
|
|
|
|
- TRACE("Subscribed: '%s' - QOS: %d\n", pCtrlMap[i].sTopic.c_str(), nDefaultQOS);
|
|
|
|
- }
|
|
|
|
-
|
|
|
|
- return nRet;
|
|
|
|
-}
|
|
|
|
-
|
|
|
|
-static void _Unsubscribe(CMqttClient &rcl, LPCSUB_CTRL_TOPIC pCtrlMap, size_t nLenMap)
|
|
|
|
-{
|
|
|
|
- for(size_t i = 0; i < nLenMap; i++)
|
|
|
|
- {
|
|
|
|
- rcl.unsubscribe(NULL, pCtrlMap[i].sTopic.c_str());
|
|
|
|
- }
|
|
|
|
-}
|
|
|
|
-
|
|
|
|
-////////////////////////////////////////////////////////////////////////////////////////////////////
|
|
|
|
-//
|
|
|
|
-
|
|
|
|
-static void _OnClientEvents(LPCMQTT_GENERIC_NOTIFICATION pntf, void *pctx)
|
|
|
|
-{
|
|
|
|
- switch(pntf->evt)
|
|
|
|
- {
|
|
|
|
- case NEVT_Log:
|
|
|
|
- if(pntf->log.level == MOSQ_LOG_ERR)
|
|
|
|
- {
|
|
|
|
- TRACE("[%d] - %s\n", pntf->log.level, pntf->log.str);
|
|
|
|
- g_lf.Error("%s\n", pntf->log.str);
|
|
|
|
- }
|
|
|
|
- break;
|
|
|
|
-
|
|
|
|
- case NEVT_Connect:
|
|
|
|
- if(pntf->con.rc == MOSQ_ERR_SUCCESS)
|
|
|
|
- g_bConnected = true;
|
|
|
|
- break;
|
|
|
|
-
|
|
|
|
- case NEVT_Disconnect:
|
|
|
|
- break;
|
|
|
|
-
|
|
|
|
- case NEVT_Subscribe:
|
|
|
|
- ++g_nSubcribed;
|
|
|
|
- break;
|
|
|
|
-
|
|
|
|
- case NEVT_Unsubscribe:
|
|
|
|
- break;
|
|
|
|
-
|
|
|
|
- default:
|
|
|
|
- break;
|
|
|
|
- }
|
|
|
|
-}
|
|
|
|
-
|
|
|
|
-////////////////////////////////////////////////////////////////////////////////////////////////////
|
|
|
|
-
|
|
|
|
-static void _ProcessCtrlMessages(HAPPCTRL hAC, HAPPINFO hAI)
|
|
|
|
-{
|
|
|
|
- ctrlmsg_t nCtrlMsg;
|
|
|
|
-
|
|
|
|
- while(!g_bIntr && (nCtrlMsg = ::GfaIpcAppCtrlGetNextCtrlMsg(hAI)))
|
|
|
|
- {
|
|
|
|
- switch(nCtrlMsg)
|
|
|
|
- {
|
|
|
|
- case GFA_APPCTRL_CTRLMSG_STOP:
|
|
|
|
- g_bIntr = true;
|
|
|
|
- g_fPauseImp = false;
|
|
|
|
- g_fPauseCmd = false;
|
|
|
|
- g_fZombie = false;
|
|
|
|
- g_lf.Info("Received Control Message 'Stop'\n");
|
|
|
|
- break;
|
|
|
|
- case GFA_APPCTRL_CTRLMSG_PAUSE:
|
|
|
|
- if(!g_fPauseCmd)
|
|
|
|
- {
|
|
|
|
- g_fPauseCmd = true;
|
|
|
|
- if(!g_fPauseImp)
|
|
|
|
- {
|
|
|
|
- ::GfaIpcAppCtrlSetState(hAC, GIAS_Paused);
|
|
|
|
- g_lf.Info("Received Control Message 'Pause'\n");
|
|
|
|
- g_lf.Info("Enter state %s ...\n", ::GfaIpcAppCtrlGetStateText(GIAS_Paused));
|
|
|
|
- TRACE("%-8s: State: %s\n", "Me", ::GfaIpcAppCtrlGetStateText(GIAS_Paused));
|
|
|
|
- }
|
|
|
|
- }
|
|
|
|
- break;
|
|
|
|
- case GFA_APPCTRL_CTRLMSG_RESUME:
|
|
|
|
- if(g_fPauseCmd)
|
|
|
|
- {
|
|
|
|
- g_fPauseCmd = false;
|
|
|
|
- if(!g_fPauseImp)
|
|
|
|
- {
|
|
|
|
- g_lf.Info("Received Control Message 'Resume'\n");
|
|
|
|
- g_lf.Info("Enter state %s ...\n", ::GfaIpcAppCtrlGetStateText(GIAS_Running));
|
|
|
|
- ::GfaIpcAppCtrlSetState(hAC, GIAS_Running);
|
|
|
|
- TRACE("%-8s: State: %s\n", "Me", ::GfaIpcAppCtrlGetStateText(GIAS_Running));
|
|
|
|
- }
|
|
|
|
- }
|
|
|
|
- break;
|
|
|
|
- default:
|
|
|
|
- break;
|
|
|
|
- }
|
|
|
|
- }
|
|
|
|
-}
|
|
|
|
-
|
|
|
|
-////////////////////////////////////////////////////////////////////////////////////////////////////
|
|
|
|
-
|
|
|
|
-static void _ProcessStateEvents(HAPPCTRL hAC, HAPPINFO hAI)
|
|
|
|
-{
|
|
|
|
- appid_t nAppIdSrc;
|
|
|
|
- bool fOldPaused = g_fPauseImp;
|
|
|
|
- char szDispName[128];
|
|
|
|
-
|
|
|
|
- while(!g_bIntr && (nAppIdSrc = ::GfaIpcAppCtrlGetNextStateEvtSrc(hAI)))
|
|
|
|
- {
|
|
|
|
- GfaIpcAppStates state = ::GfaIpcAppCtrlGetState(hAC, nAppIdSrc);
|
|
|
|
- GfaIpcAppCtrlGetDisplayName(hAC, nAppIdSrc, szDispName, sizeof(szDispName));
|
|
|
|
- TRACE("%-8s: State: %s\n", szDispName, ::GfaIpcAppCtrlGetStateText(state));
|
|
|
|
-
|
|
|
|
- if(nAppIdSrc & _DEPENDENCIES)
|
|
|
|
- {
|
|
|
|
- if(state == GIAS_Running)
|
|
|
|
- {
|
|
|
|
- g_lf.Info("%s -> %s.\n", szDispName, ::GfaIpcAppCtrlGetStateText(state));
|
|
|
|
- g_nDepRunning |= nAppIdSrc;
|
|
|
|
- }
|
|
|
|
- else
|
|
|
|
- {
|
|
|
|
- g_lf.Warning("%s -> %s.\n", szDispName, ::GfaIpcAppCtrlGetStateText(state));
|
|
|
|
- g_nDepRunning &= ~nAppIdSrc;
|
|
|
|
- }
|
|
|
|
- }
|
|
|
|
- }
|
|
|
|
-
|
|
|
|
- if(!g_bIntr)
|
|
|
|
- {
|
|
|
|
- g_fPauseImp = (g_nDepRunning != _DEPENDENCIES);
|
|
|
|
-
|
|
|
|
- if(!g_fPauseCmd && (fOldPaused != g_fPauseImp))
|
|
|
|
- {
|
|
|
|
- fOldPaused = g_fPauseImp;
|
|
|
|
- GfaIpcAppStates newState = g_fPauseImp ? GIAS_Paused : GIAS_Running;
|
|
|
|
- ::GfaIpcAppCtrlSetState(hAC, newState);
|
|
|
|
- if(g_fPauseImp)
|
|
|
|
- g_lf.Warning("Enter state %s ...\n", ::GfaIpcAppCtrlGetStateText(newState));
|
|
|
|
- else
|
|
|
|
- g_lf.Info("Enter state %s ...\n", ::GfaIpcAppCtrlGetStateText(newState));
|
|
|
|
- }
|
|
|
|
- }
|
|
|
|
-}
|
|
|
|
-
|
|
|
|
-////////////////////////////////////////////////////////////////////////////////////////////////////
|
|
|
|
-////////////////////////////////////////////////////////////////////////////////////////////////////
|
|
|
|
-////////////////////////////////////////////////////////////////////////////////////////////////////
|
|
|
|
-//
|
|
|
|
-
|
|
|
|
-
|
|
|
|
-int main(int /*argc*/, char **/*argv*/)
|
|
|
|
-{
|
|
|
|
- int nRet = 0;
|
|
|
|
- CProcessInstance pi;
|
|
|
|
- std::string sDevID;
|
|
|
|
- char szLogFile[PATH_MAX];
|
|
|
|
- std::string strBaseDir;
|
|
|
|
- const char *pszBaseDir = NULL;
|
|
|
|
- HAPPCTRL hAC = NULL;
|
|
|
|
- HAPPINFO hAI;
|
|
|
|
- HSHM hShm = NULL;
|
|
|
|
- void *pShm = NULL;
|
|
|
|
- unsigned long long nUsecWorkTime = 0;
|
|
|
|
- int nTlsMode;
|
|
|
|
- CProcessClock pcWork;
|
|
|
|
-
|
|
|
|
- ////////////////////////////////////////////////////////////////////////////////////////////////
|
|
|
|
- // check for multiple instances
|
|
|
|
-
|
|
|
|
- if(!pi.LockInstance(UUID_SHM))
|
|
|
|
- {
|
|
|
|
- CLogfile::StdErr("Failed to start instance!\n");
|
|
|
|
- return -1;
|
|
|
|
- }
|
|
|
|
-
|
|
|
|
- ////////////////////////////////////////////////////////////////////////////////////////////////
|
|
|
|
- // configure signal handling
|
|
|
|
-
|
|
|
|
- struct sigaction sa;
|
|
|
|
- ::sigfillset(&g_set);
|
|
|
|
- sigaddset(&g_set, SIGUSR1);
|
|
|
|
- memset(&sa, 0, sizeof(sa));
|
|
|
|
-
|
|
|
|
- sa.sa_handler = _SigHandler;
|
|
|
|
- sigaction(SIGHUP, &sa, NULL); // handles user's terminal disconnect
|
|
|
|
- sigaction(SIGQUIT, &sa, NULL); // handles Ctrl + '\'
|
|
|
|
- sigaction(SIGTERM, &sa, NULL); // handles normal termination
|
|
|
|
- sigaction(SIGABRT, &sa, NULL); // handles abnormal termination (i.e. abort())
|
|
|
|
- sigaction(SIGINT, &sa, NULL); // handles Ctrl + 'C'
|
|
|
|
-
|
|
|
|
- sa.sa_handler = SIG_IGN;
|
|
|
|
- sigaction(SIGTSTP, &sa, NULL); // ignores Ctrl + 'Z'
|
|
|
|
- sigaction(SIGSTOP, &sa, NULL); // ignores Stop
|
|
|
|
- sigaction(SIGCONT, &sa, NULL); // ignores Continue
|
|
|
|
- sigaction(SIGCHLD, &sa, NULL); // ignores child process termination
|
|
|
|
- sigaction(0, &sa, NULL); // ignores shell termination
|
|
|
|
-
|
|
|
|
- do
|
|
|
|
- {
|
|
|
|
- g_fZombie = true;
|
|
|
|
-
|
|
|
|
- ////////////////////////////////////////////////////////////////////////////////////////////
|
|
|
|
- // get the base directory for output files
|
|
|
|
-
|
|
|
|
- if(!pszBaseDir)
|
|
|
|
- pszBaseDir = _GetBaseDir(strBaseDir);
|
|
|
|
-
|
|
|
|
- CLogfile::StdOut("Using base directory \"%s\".\n", pszBaseDir);
|
|
|
|
-
|
|
|
|
- ////////////////////////////////////////////////////////////////////////////////////////////
|
|
|
|
- // initialize log file
|
|
|
|
-
|
|
|
|
- sprintf(szLogFile, "%s/%s", pszBaseDir, _LOGFILE_NAME);
|
|
|
|
-
|
|
|
|
- if(!g_lf.Open(szLogFile))
|
|
|
|
- {
|
|
|
|
- CLogfile::StdErr("Failed to create/open log file!\n");
|
|
|
|
- nRet = -1;
|
|
|
|
- break;
|
|
|
|
- }
|
|
|
|
-
|
|
|
|
- g_lf.Info("Process started.\n");
|
|
|
|
-
|
|
|
|
- ////////////////////////////////////////////////////////////////////////////////////////////
|
|
|
|
- // initialize app control
|
|
|
|
-
|
|
|
|
- g_lf.Info("Acquire AppCtrl-Handle.\n");
|
|
|
|
-
|
|
|
|
- if(!(hAC = ::GfaIpcAppCtrlAcquire(_APPID, _APPNAME, _USEC_FROM_MSEC(_UPDATE_INTERVAL_MS), _USEC_FROM_MSEC(_RECONN_INTERVAL_MS) * 3)))
|
|
|
|
- {
|
|
|
|
- g_lf.Error("Failed to acquire AppCtrl-Handle!\n");
|
|
|
|
- break;
|
|
|
|
- }
|
|
|
|
-
|
|
|
|
- ::GfaIpcAppCtrlSetState(hAC, GIAS_Initializing);
|
|
|
|
- g_lf.Info("Enter state %s ...\n", ::GfaIpcAppCtrlGetStateText(GIAS_Initializing));
|
|
|
|
-
|
|
|
|
- if(!::GfaIpcAppCtrlSubscribeStateEvents(hAC, _DEPENDENCIES))
|
|
|
|
- {
|
|
|
|
- g_lf.Error("Failed to subscribe state event notifications!\n");
|
|
|
|
- break;
|
|
|
|
- }
|
|
|
|
-
|
|
|
|
- ////////////////////////////////////////////////////////////////////////////////////////////
|
|
|
|
- // parse the config file
|
|
|
|
-
|
|
|
|
- CMqttClConfig cfg(UUID_SHM);
|
|
|
|
-
|
|
|
|
- if(!cfg.LoadCfg(MQTTCL_CONFIG_FILE_PATH, g_lf))
|
|
|
|
- {
|
|
|
|
- nRet = -1;
|
|
|
|
- break;
|
|
|
|
- }
|
|
|
|
-
|
|
|
|
- nTlsMode = cfg.GetTLSMode();
|
|
|
|
-// TRACE("%s/%s\n", cfg.GetDeviceID(), cfg.GetShmID());
|
|
|
|
-
|
|
|
|
- ////////////////////////////////////////////////////////////////////////////////////////////
|
|
|
|
- // client control topic map
|
|
|
|
-
|
|
|
|
- const SUB_CTRL_TOPIC subCtrlMap[] =
|
|
|
|
- {
|
|
|
|
- formatString("%s/%s/%s/#", cfg.GetDeviceID(), cfg.GetShmID(), _TOPIC_CMD_CTRL),
|
|
|
|
- formatString("%s/%s/%s/%s/#", cfg.GetDeviceID(), cfg.GetShmID(), MQTT_TOPIC_VALUE_BINLE, _TOPIC_CMD_SET),
|
|
|
|
- formatString("%s/%s/%s/%s/#", cfg.GetDeviceID(), cfg.GetShmID(), MQTT_TOPIC_VALUE_BINBE, _TOPIC_CMD_SET),
|
|
|
|
- formatString("%s/%s/%s/%s/#", cfg.GetDeviceID(), cfg.GetShmID(), MQTT_TOPIC_VALUE_JSON, _TOPIC_CMD_SET),
|
|
|
|
- formatString("%s/%s/%s/%s/#", cfg.GetDeviceID(), cfg.GetShmID(), MQTT_TOPIC_VALUE_PBUF, _TOPIC_CMD_SET),
|
|
|
|
-// formatString("%s/%s/%s/#", cfg.GetDeviceID(), cfg.GetShmID(), _TOPIC_CMD_STATUS)
|
|
|
|
- };
|
|
|
|
-
|
|
|
|
- ////////////////////////////////////////////////////////////////////////////////////////////
|
|
|
|
-
|
|
|
|
- if(!(hShm = ::acquire_shm(sizeof(shm_t), 1)))
|
|
|
|
- {
|
|
|
|
- g_lf.Error("GfaIpcAcquireSHM failed!\n");
|
|
|
|
- break;
|
|
|
|
- }
|
|
|
|
-
|
|
|
|
- g_lf.Info("Acquired SHM Handle.\n");
|
|
|
|
-
|
|
|
|
- if(!(pShm = ::GfaIpcAcquirePointer(hShm)))
|
|
|
|
- {
|
|
|
|
- g_lf.Error("GfaIpcAcquirePointer failed!\n");
|
|
|
|
- break;
|
|
|
|
- }
|
|
|
|
-
|
|
|
|
- g_lf.Info("Acquired SHM Pointer.\n");
|
|
|
|
- ::GfaIpcDumpSHMROT();
|
|
|
|
-
|
|
|
|
- memcpy(&g_shmShadow, (const shm_t*)pShm, sizeof(shm_t));
|
|
|
|
-
|
|
|
|
- ////////////////////////////////////////////////////////////////////////////////////////////
|
|
|
|
-
|
|
|
|
- int nErr, nLocked = 0, nNumConn = 0;
|
|
|
|
- bool bReconnect, bConnPending;
|
|
|
|
- std::string strErr;
|
|
|
|
- CMqttVarTable vtbl;
|
|
|
|
- CShm_t shm(pShm, &g_shmShadow, hShm, NULL, -1, 0, cfg.GetDefaultQOS(), cfg.GetDefaultRetain());
|
|
|
|
- shm.InitPath(NULL, NULL);
|
|
|
|
- shm.CreateMembersTable(vtbl);
|
|
|
|
-
|
|
|
|
-#if _DUMP_ENABLED_VARS
|
|
|
|
- vtbl.DumpPubEnabled();
|
|
|
|
-#endif // _DUMP_ENABLED_VARS
|
|
|
|
-
|
|
|
|
- ////////////////////////////////////////////////////////////////////////////////////////////
|
|
|
|
-
|
|
|
|
- CMqttClient mqttCl(cfg.GetDeviceID());
|
|
|
|
- mqttCl.SetClientEventCallback(_OnClientEvents, NEVT_Connect | NEVT_Disconnect | NEVT_Subscribe | NEVT_Unsubscribe | NEVT_Message | NEVT_Log, NULL);
|
|
|
|
-
|
|
|
|
- g_fZombie = false;
|
|
|
|
- g_fRun = true;
|
|
|
|
- ::GfaIpcAppCtrlSetState(hAC, GIAS_Running);
|
|
|
|
- g_lf.Info("Enter state %s ...\n", ::GfaIpcAppCtrlGetStateText(GIAS_Running));
|
|
|
|
-
|
|
|
|
- while(g_fRun)
|
|
|
|
- {
|
|
|
|
- ////////////////////////////////////////////////////////////////////////////////////////
|
|
|
|
- // update app control info
|
|
|
|
-
|
|
|
|
- if((hAI = ::GfaIpcAppCtrlInfoUpdate(hAC, nUsecWorkTime)))
|
|
|
|
- {
|
|
|
|
- _ProcessCtrlMessages(hAC, hAI);
|
|
|
|
- _ProcessStateEvents(hAC, hAI);
|
|
|
|
- }
|
|
|
|
-
|
|
|
|
- if(g_fPauseImp || g_fPauseCmd)
|
|
|
|
- {
|
|
|
|
- if(g_cs < CLS_ShutDown)
|
|
|
|
- {
|
|
|
|
- g_csLast = g_cs;
|
|
|
|
- g_cs = CLS_ShutDown;
|
|
|
|
- }
|
|
|
|
- else
|
|
|
|
- {
|
|
|
|
- nUsecWorkTime = 0;
|
|
|
|
- }
|
|
|
|
- }
|
|
|
|
-
|
|
|
|
- pcWork.ClockTrigger();
|
|
|
|
-
|
|
|
|
- switch(g_cs)
|
|
|
|
- {
|
|
|
|
- case CLS_NotInit:
|
|
|
|
- if((nErr = CMqttClient::Init()) == MOSQ_ERR_SUCCESS)
|
|
|
|
- {
|
|
|
|
- g_cs = CLS_SetTLS;
|
|
|
|
- }
|
|
|
|
- else if(!CMqttClient::EvalError(nErr, bReconnect, bConnPending, g_bIntr, strErr))
|
|
|
|
- {
|
|
|
|
- g_csLast = g_cs;
|
|
|
|
- g_cs = CLS_Err;
|
|
|
|
- }
|
|
|
|
- break;
|
|
|
|
-
|
|
|
|
- case CLS_SetTLS:
|
|
|
|
- if(nTlsMode == MQTTCL_TLS_MODE_CRT)
|
|
|
|
- {
|
|
|
|
- g_lf.Info("Using TLS with certificates.\n");
|
|
|
|
-
|
|
|
|
- if((nErr = mqttCl.tls_set(cfg.GetTlsCaCrtFile(), NULL, cfg.GetTlsClCrtFile(), cfg.GetTlsClKeyFile(), NULL)) != MOSQ_ERR_SUCCESS)
|
|
|
|
- {
|
|
|
|
- CMqttClient::EvalError(nErr, bReconnect, bConnPending, g_bIntr, strErr);
|
|
|
|
-
|
|
|
|
- if(g_bIntr)
|
|
|
|
- {
|
|
|
|
- g_csLast = g_cs;
|
|
|
|
- g_cs = CLS_ShutDown;
|
|
|
|
- }
|
|
|
|
- else
|
|
|
|
- {
|
|
|
|
- g_csLast = g_cs;
|
|
|
|
- g_cs = CLS_Err;
|
|
|
|
- }
|
|
|
|
- }
|
|
|
|
- else
|
|
|
|
- {
|
|
|
|
- g_cs = CLS_Unconnected;
|
|
|
|
- g_lf.Info("Connecting to broker @ %s:%u ...\n", cfg.GetBrokerAddr(), cfg.GetBrokerPort());
|
|
|
|
- }
|
|
|
|
- }
|
|
|
|
- else if(nTlsMode == MQTTCL_TLS_MODE_PSK)
|
|
|
|
- {
|
|
|
|
- g_lf.Info("Using TLS with PSK.\n");
|
|
|
|
-
|
|
|
|
- if((nErr = mqttCl.tls_psk_set(cfg.GetTlsPSK(), cfg.GetDeviceID(), NULL)) != MOSQ_ERR_SUCCESS)
|
|
|
|
- {
|
|
|
|
- CMqttClient::EvalError(nErr, bReconnect, bConnPending, g_bIntr, strErr);
|
|
|
|
-
|
|
|
|
- if(g_bIntr)
|
|
|
|
- {
|
|
|
|
- g_csLast = g_cs;
|
|
|
|
- g_cs = CLS_ShutDown;
|
|
|
|
- }
|
|
|
|
- else
|
|
|
|
- {
|
|
|
|
- g_csLast = g_cs;
|
|
|
|
- g_cs = CLS_Err;
|
|
|
|
- }
|
|
|
|
- }
|
|
|
|
- else
|
|
|
|
- {
|
|
|
|
- g_cs = CLS_Unconnected;
|
|
|
|
- g_lf.Info("Connecting to broker @ %s:%u ...\n", cfg.GetBrokerAddr(), cfg.GetBrokerPort());
|
|
|
|
- }
|
|
|
|
- }
|
|
|
|
- else
|
|
|
|
- {
|
|
|
|
- g_cs = CLS_Unconnected;
|
|
|
|
- g_lf.Info("Connecting to broker @ %s:%u ...\n", cfg.GetBrokerAddr(), cfg.GetBrokerPort());
|
|
|
|
- }
|
|
|
|
- break;
|
|
|
|
-
|
|
|
|
- case CLS_Unconnected:
|
|
|
|
- if(g_bConnected)
|
|
|
|
- {
|
|
|
|
- g_bConnected = false;
|
|
|
|
- g_lf.Warning("Lost connection to broker @ %s:%u. Tying to reconnect ...\n", cfg.GetBrokerAddr(), cfg.GetBrokerPort());
|
|
|
|
- }
|
|
|
|
- if(!nNumConn)
|
|
|
|
- g_cs = CLS_Connect;
|
|
|
|
- else
|
|
|
|
- g_cs = CLS_Reconnect;
|
|
|
|
- break;
|
|
|
|
-
|
|
|
|
- case CLS_Connect:
|
|
|
|
- if((nErr = mqttCl.connect(cfg.GetBrokerAddr(), cfg.GetBrokerPort())) == MOSQ_ERR_SUCCESS)
|
|
|
|
- g_cs = CLS_Connecting;
|
|
|
|
- else if(!CMqttClient::EvalError(nErr, bReconnect, bConnPending, g_bIntr, strErr))
|
|
|
|
- {
|
|
|
|
- if(bConnPending)
|
|
|
|
- g_cs = CLS_Connecting;
|
|
|
|
- else if(bReconnect)
|
|
|
|
- {
|
|
|
|
- g_csLast = g_cs;
|
|
|
|
- g_cs = _cl_usleep(_USEC_FROM_MSEC(_RECONN_INTERVAL_MS), g_cs);
|
|
|
|
- }
|
|
|
|
- else if(g_bIntr)
|
|
|
|
- {
|
|
|
|
- g_csLast = g_cs;
|
|
|
|
- g_cs = CLS_ShutDown;
|
|
|
|
- }
|
|
|
|
- else
|
|
|
|
- {
|
|
|
|
- g_csLast = g_cs;
|
|
|
|
- g_cs = CLS_Err;
|
|
|
|
- }
|
|
|
|
- }
|
|
|
|
- break;
|
|
|
|
-
|
|
|
|
- case CLS_Reconnect:
|
|
|
|
- if((nErr = mqttCl.reconnect()) == MOSQ_ERR_SUCCESS)
|
|
|
|
- g_cs = CLS_Connecting;
|
|
|
|
- else if(!CMqttClient::EvalError(nErr, bReconnect, bConnPending, g_bIntr, strErr))
|
|
|
|
- {
|
|
|
|
- if(bConnPending)
|
|
|
|
- g_cs = CLS_Connecting;
|
|
|
|
- else if(bReconnect)
|
|
|
|
- {
|
|
|
|
- g_csLast = g_cs;
|
|
|
|
- g_cs = _cl_usleep(_USEC_FROM_MSEC(_RECONN_INTERVAL_MS), g_cs);
|
|
|
|
- }
|
|
|
|
- else if(g_bIntr)
|
|
|
|
- {
|
|
|
|
- g_csLast = g_cs;
|
|
|
|
- g_cs = CLS_ShutDown;
|
|
|
|
- }
|
|
|
|
- else
|
|
|
|
- {
|
|
|
|
- g_csLast = g_cs;
|
|
|
|
- g_cs = CLS_Err;
|
|
|
|
- }
|
|
|
|
- }
|
|
|
|
- break;
|
|
|
|
-
|
|
|
|
- case CLS_Connecting:
|
|
|
|
- if(!mqttCl.TimedLoop(_NSEC_FROM_MSEC(_UPDATE_INTERVAL_MS), nErr, bReconnect, bConnPending, g_bIntr, strErr))
|
|
|
|
- {
|
|
|
|
- if(bReconnect)
|
|
|
|
- {
|
|
|
|
- g_csLast = g_cs;
|
|
|
|
- g_cs = _cl_usleep(_USEC_FROM_MSEC(_RECONN_INTERVAL_MS), CLS_Unconnected);
|
|
|
|
- }
|
|
|
|
- else if(g_bIntr)
|
|
|
|
- {
|
|
|
|
- g_csLast = g_cs;
|
|
|
|
- g_cs = CLS_ShutDown;
|
|
|
|
- }
|
|
|
|
- else if(!bConnPending)
|
|
|
|
- {
|
|
|
|
- g_csLast = g_cs;
|
|
|
|
- g_cs = CLS_Err;
|
|
|
|
- }
|
|
|
|
- }
|
|
|
|
- else if(g_bConnected)
|
|
|
|
- {
|
|
|
|
- g_cs = CLS_Connected;
|
|
|
|
- }
|
|
|
|
- break;
|
|
|
|
-
|
|
|
|
- case CLS_Connected:
|
|
|
|
- g_lf.Info("Connected to broker @ %s:%u.\n", cfg.GetBrokerAddr(), cfg.GetBrokerPort());
|
|
|
|
- ++nNumConn;
|
|
|
|
- g_nSubcribed = 0;
|
|
|
|
- g_cs = CLS_Subscribe;
|
|
|
|
- break;
|
|
|
|
-
|
|
|
|
- case CLS_Subscribe:
|
|
|
|
- g_lf.Info("Subscribing control-topics ...\n");
|
|
|
|
- if((nErr = _Subscribe(mqttCl, cfg.GetDefaultQOS(), subCtrlMap, _COUNTOF(subCtrlMap))) == MOSQ_ERR_SUCCESS)
|
|
|
|
- g_cs = CLS_Subscribing;
|
|
|
|
- else if(!CMqttClient::EvalError(nErr, bReconnect, bConnPending, g_bIntr, strErr))
|
|
|
|
- {
|
|
|
|
- if(bConnPending)
|
|
|
|
- g_cs = CLS_Connecting;
|
|
|
|
- else if(bReconnect)
|
|
|
|
- g_cs = CLS_Unconnected;
|
|
|
|
- else if(g_bIntr)
|
|
|
|
- {
|
|
|
|
- g_csLast = g_cs;
|
|
|
|
- g_cs = CLS_ShutDown;
|
|
|
|
- }
|
|
|
|
- else
|
|
|
|
- {
|
|
|
|
- g_csLast = g_cs;
|
|
|
|
- g_cs = CLS_Err;
|
|
|
|
- }
|
|
|
|
- }
|
|
|
|
-
|
|
|
|
- break;
|
|
|
|
-
|
|
|
|
- case CLS_Subscribing:
|
|
|
|
- if(!mqttCl.TimedLoop(_NSEC_FROM_MSEC(_UPDATE_INTERVAL_MS), nErr, bReconnect, bConnPending, g_bIntr, strErr))
|
|
|
|
- {
|
|
|
|
- if(bReconnect)
|
|
|
|
- g_cs = CLS_Unconnected;
|
|
|
|
- else if(bConnPending)
|
|
|
|
- g_cs = CLS_Connecting;
|
|
|
|
- else if(g_bIntr)
|
|
|
|
- {
|
|
|
|
- g_csLast = g_cs;
|
|
|
|
- g_cs = CLS_ShutDown;
|
|
|
|
- }
|
|
|
|
- else
|
|
|
|
- {
|
|
|
|
- g_csLast = g_cs;
|
|
|
|
- g_cs = CLS_Err;
|
|
|
|
- }
|
|
|
|
- }
|
|
|
|
- else if(g_nSubcribed == _COUNTOF(subCtrlMap))
|
|
|
|
- {
|
|
|
|
- g_cs = CLS_Subscribed;
|
|
|
|
- }
|
|
|
|
- break;
|
|
|
|
-
|
|
|
|
- case CLS_Subscribed:
|
|
|
|
- g_lf.Info("Subscriptions acknowledged.\n");
|
|
|
|
- g_lf.Info("Enter SHM processing loop ...\n");
|
|
|
|
- g_cs = CLS_ProcMsg;
|
|
|
|
- break;
|
|
|
|
-
|
|
|
|
- case CLS_ProcMsg:
|
|
|
|
- if(mqttCl.TimedLoop(_NSEC_FROM_MSEC(_UPDATE_INTERVAL_MS), nErr, bReconnect, bConnPending, g_bIntr, strErr))
|
|
|
|
- {
|
|
|
|
-#if _TRACK_TIMES
|
|
|
|
- std::string s1, s2;
|
|
|
|
- pc_time64_t elapsed;
|
|
|
|
- g_nDbgCounter1 = g_nDbgCounter2 = g_nDbgCounter3 = 0;
|
|
|
|
- g_pc.ClockTrigger();
|
|
|
|
-#endif // _TRACK_TIMES
|
|
|
|
-
|
|
|
|
- _ProcessIncoming(mqttCl, vtbl, cfg, subCtrlMap);
|
|
|
|
-
|
|
|
|
-#if _TRACK_TIMES
|
|
|
|
- if(g_nDbgCounter1)
|
|
|
|
- {
|
|
|
|
- elapsed = g_pc.ClockGetElapsed();
|
|
|
|
- s1 = CProcessClock::Interval2String(elapsed);
|
|
|
|
- s2 = CProcessClock::Interval2String(elapsed / g_nDbgCounter1);
|
|
|
|
- TRACE("_ProcessIncoming (%lu variables): %s (%s per var)\n", g_nDbgCounter1, s1.c_str(), s2.c_str());
|
|
|
|
- }
|
|
|
|
- g_pc.ClockTrigger();
|
|
|
|
-#endif // _TRACK_TIMES
|
|
|
|
-
|
|
|
|
- _SIG_BLOCK(&g_set);
|
|
|
|
- vtbl.CheckShmAndPublish(cfg.GetDeviceID(), cfg.GetShmID(), mqttCl.GetMsgQueueSnd(), nLocked);
|
|
|
|
- _SIG_UNBLOCK(&g_set);
|
|
|
|
-#if _TRACK_TIMES
|
|
|
|
- g_nDbgCounter2 = mqttCl.GetMsgQueueSnd().Size();
|
|
|
|
- if(g_nDbgCounter2)
|
|
|
|
- {
|
|
|
|
- elapsed = g_pc.ClockGetElapsed();
|
|
|
|
- s1 = CProcessClock::Interval2String(elapsed);
|
|
|
|
- s2 = CProcessClock::Interval2String(elapsed / g_nDbgCounter2);
|
|
|
|
- TRACE("CheckShmAndPublish (%lu variables): %s (%s per var)\n", g_nDbgCounter2, s1.c_str(), s2.c_str());
|
|
|
|
- g_pc.ClockTrigger();
|
|
|
|
- }
|
|
|
|
- g_nDbgCounter3 =
|
|
|
|
-#endif // _TRACK_TIMES
|
|
|
|
-
|
|
|
|
- _ProcessOutgoing(mqttCl);
|
|
|
|
-
|
|
|
|
-#if _TRACK_TIMES
|
|
|
|
- if(g_nDbgCounter3)
|
|
|
|
- {
|
|
|
|
- elapsed = g_pc.ClockGetElapsed();
|
|
|
|
- s1 = CProcessClock::Interval2String(elapsed);
|
|
|
|
- s2 = CProcessClock::Interval2String(elapsed / g_nDbgCounter3);
|
|
|
|
- TRACE("_ProcessOutgoing (%lu variables): %s (%s per var)\n", g_nDbgCounter3, s1.c_str(), s2.c_str());
|
|
|
|
- }
|
|
|
|
-#endif // _TRACK_TIMES
|
|
|
|
- }
|
|
|
|
- else
|
|
|
|
- {
|
|
|
|
- if(bReconnect)
|
|
|
|
- g_cs = CLS_Unconnected;
|
|
|
|
- else if(bConnPending)
|
|
|
|
- g_cs = CLS_Connecting;
|
|
|
|
- else if(g_bIntr)
|
|
|
|
- {
|
|
|
|
- g_csLast = g_cs;
|
|
|
|
- g_cs = CLS_ShutDown;
|
|
|
|
- }
|
|
|
|
- else
|
|
|
|
- {
|
|
|
|
- g_csLast = g_cs;
|
|
|
|
- g_cs = CLS_Err;
|
|
|
|
- }
|
|
|
|
- }
|
|
|
|
- break;
|
|
|
|
-
|
|
|
|
- case CLS_Err:
|
|
|
|
- g_lf.Error("[%s] - %s\n", _GetClientStateString(g_csLast), strErr.c_str());
|
|
|
|
- TRACE("Error: %s\n", strErr.c_str());
|
|
|
|
- g_fZombie = true;
|
|
|
|
- g_cs = CLS_ShutDown;
|
|
|
|
- break;
|
|
|
|
-
|
|
|
|
- case CLS_ShutDown:
|
|
|
|
- if(g_bIntr && g_nLastSig >= 0)
|
|
|
|
- {
|
|
|
|
- g_lf.Info("Received signal '%s'.\n", strsignal(g_nLastSig));
|
|
|
|
- g_nLastSig = -1;
|
|
|
|
- }
|
|
|
|
- g_lf.Info("Process shutting down ...\n");
|
|
|
|
- if(g_csLast >= CLS_Subscribed)
|
|
|
|
- g_cs = CLS_Unsubscribe;
|
|
|
|
- else if(g_csLast >= CLS_Connected)
|
|
|
|
- g_cs = CLS_Disconnect;
|
|
|
|
- else if(g_csLast > CLS_NotInit)
|
|
|
|
- g_cs = CLS_Cleanup;
|
|
|
|
- else if((g_fPauseImp || g_fPauseCmd) && !g_bIntr)
|
|
|
|
- g_cs = CLS_Paused;
|
|
|
|
- else
|
|
|
|
- g_cs = CLS_Exit;
|
|
|
|
- break;
|
|
|
|
-
|
|
|
|
- case CLS_Unsubscribe:
|
|
|
|
- g_lf.Info("Unsubscribe control-topics.\n");
|
|
|
|
- _Unsubscribe(mqttCl, subCtrlMap, _COUNTOF(subCtrlMap));
|
|
|
|
- g_nSubcribed = 0;
|
|
|
|
- g_cs = CLS_Disconnect;
|
|
|
|
- break;
|
|
|
|
-
|
|
|
|
- case CLS_Disconnect:
|
|
|
|
- g_lf.Info("Disconnect from broker @ %s:%u.\n", cfg.GetBrokerAddr(), cfg.GetBrokerPort());
|
|
|
|
- mqttCl.disconnect();
|
|
|
|
- g_bConnected = false;
|
|
|
|
- g_cs = CLS_Cleanup;
|
|
|
|
- break;
|
|
|
|
-
|
|
|
|
- case CLS_Cleanup:
|
|
|
|
- g_lf.Info("Clean up.\n");
|
|
|
|
- CMqttClient::Cleanup();
|
|
|
|
- if((g_fPauseImp || g_fPauseCmd) && !g_bIntr)
|
|
|
|
- g_cs = CLS_Paused;
|
|
|
|
- else
|
|
|
|
- g_cs = CLS_Exit;
|
|
|
|
- break;
|
|
|
|
-
|
|
|
|
- case CLS_Paused:
|
|
|
|
- if(!g_fPauseImp && !g_fPauseCmd)
|
|
|
|
- {
|
|
|
|
- if(!g_bIntr)
|
|
|
|
- g_cs = CLS_NotInit;
|
|
|
|
- else
|
|
|
|
- g_cs = CLS_Exit;
|
|
|
|
- }
|
|
|
|
- else
|
|
|
|
- {
|
|
|
|
- usleep(_USEC_FROM_MSEC(_UPDATE_INTERVAL_MS));
|
|
|
|
- continue;
|
|
|
|
- }
|
|
|
|
- break;
|
|
|
|
-
|
|
|
|
- case CLS_Exit:
|
|
|
|
- g_fRun = false;
|
|
|
|
- break;
|
|
|
|
- }
|
|
|
|
-
|
|
|
|
- nUsecWorkTime = pcWork.ClockGetElapsed() / 1000;
|
|
|
|
- }
|
|
|
|
- }
|
|
|
|
- while(false);
|
|
|
|
-
|
|
|
|
- ////////////////////////////////////////////////////////////////////////////////////////////////
|
|
|
|
-
|
|
|
|
- if(hShm)
|
|
|
|
- {
|
|
|
|
- if(pShm)
|
|
|
|
- {
|
|
|
|
- g_lf.Info("Release SHM Pointer.\n");
|
|
|
|
- ::GfaIpcReleasePointer(hShm, pShm);
|
|
|
|
- }
|
|
|
|
-
|
|
|
|
- g_lf.Info("Release SHM Handle.\n");
|
|
|
|
- ::GfaIpcReleaseSHM(hShm);
|
|
|
|
- }
|
|
|
|
-
|
|
|
|
- if(g_fZombie)
|
|
|
|
- {
|
|
|
|
- if(hAC)
|
|
|
|
- ::GfaIpcAppCtrlSetState(hAC, GIAS_Zombie);
|
|
|
|
- TRACE("Enter Zombie state ...\n");
|
|
|
|
- g_lf.Warning("Enter Zombie state ...\n");
|
|
|
|
- g_lf.Flush();
|
|
|
|
- pause();
|
|
|
|
-
|
|
|
|
- if(g_nLastSig >= 0)
|
|
|
|
- g_lf.Info("Received signal '%s'.\n", strsignal(g_nLastSig));
|
|
|
|
- }
|
|
|
|
-
|
|
|
|
- if(hAC)
|
|
|
|
- {
|
|
|
|
- g_lf.Info("Enter state %s ...\n", ::GfaIpcAppCtrlGetStateText(GIAS_Terminating));
|
|
|
|
- ::GfaIpcAppCtrlSetState(hAC, GIAS_Terminating);
|
|
|
|
- g_lf.Info("Releasing App Control ...\n");
|
|
|
|
- ::GfaIpcAppCtrlRelease(hAC);
|
|
|
|
- }
|
|
|
|
-
|
|
|
|
- g_lf.Info("Process exit.\n\n");
|
|
|
|
- g_lf.Close();
|
|
|
|
- CLogfile::StdErr("MqttCl exit.\n");
|
|
|
|
- return nRet;
|
|
|
|
-}
|
|
|