|
@@ -0,0 +1,1202 @@
|
|
|
+////////////////////////////////////////////////////////////////////////////////////////////////////
|
|
|
+//
|
|
|
+
|
|
|
+#include <stdio.h>
|
|
|
+#include <signal.h>
|
|
|
+#include <linux/limits.h>
|
|
|
+#include <sys/socket.h>
|
|
|
+#include <arpa/inet.h>
|
|
|
+#include <linux/if_arp.h>
|
|
|
+#include <netinet/in.h>
|
|
|
+#include <ifaddrs.h>
|
|
|
+#include <sys/ioctl.h>
|
|
|
+#include <fcntl.h>
|
|
|
+#include <unistd.h>
|
|
|
+#include <getopt.h>
|
|
|
+#include <gfa/svc/common/strutil.h>
|
|
|
+#include <gfa/svc/common/fileutil.h>
|
|
|
+#include <gfa/svc/common/instance.h>
|
|
|
+#include <gfa/svc/common/processclock.h>
|
|
|
+#include <gfa/svc/common/debug.h>
|
|
|
+#include <gfa/svc/mqttcl/mqttclient.h>
|
|
|
+#include <gfa/svc/mqttcl/mqttcfg.h>
|
|
|
+#include <gfa/svc/mqttcl/mqttdbg.h>
|
|
|
+#include "projal.h"
|
|
|
+
|
|
|
+////////////////////////////////////////////////////////////////////////////////////////////////////
|
|
|
+
|
|
|
+#if _ENABLE_MEM_TRACE
|
|
|
+#include <mcheck.h>
|
|
|
+class CMtrace
|
|
|
+{
|
|
|
+public:
|
|
|
+ CMtrace(void) {
|
|
|
+ putenv("MALLOC_TRACE=/home/wrk/share/config/services/Debug/Desktop_Qt_5_7_0_GCC_64bit/mqttcl/mtrace.log");
|
|
|
+ mtrace();
|
|
|
+ }
|
|
|
+ ~CMtrace(void){
|
|
|
+// muntrace();
|
|
|
+ }
|
|
|
+};
|
|
|
+#endif // _ENABLE_MEM_TRACE
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+#if _TRACK_TIMES
|
|
|
+static CProcessClock g_pc;
|
|
|
+unsigned long g_nDbgCounter1 = 0;
|
|
|
+unsigned long g_nDbgCounter2 = 0;
|
|
|
+unsigned long g_nDbgCounter3 = 0;
|
|
|
+#endif // _TRACK_TIMES
|
|
|
+
|
|
|
+////////////////////////////////////////////////////////////////////////////////////////////////////
|
|
|
+// app control
|
|
|
+
|
|
|
+#define _APPID GFA_APPCTRL_APPID_MQTTCL
|
|
|
+#define _APPNAME "MqttCl"
|
|
|
+#define _DEPENDENCIES ((appid_t)(GFA_APPCTRL_APPID_REMANENT))
|
|
|
+
|
|
|
+////////////////////////////////////////////////////////////////////////////////////////////////////
|
|
|
+//
|
|
|
+
|
|
|
+#define _UPDATE_INTERVAL_MS 100
|
|
|
+#define _RECONN_INTERVAL_MS 1000
|
|
|
+#define _LOGFILE_NAME "mqttcl.log"
|
|
|
+
|
|
|
+////////////////////////////////////////////////////////////////////////////////////////////////////
|
|
|
+
|
|
|
+#define _SIG_BLOCK(s) sigprocmask(SIG_BLOCK, (s), NULL)
|
|
|
+#define _SIG_UNBLOCK(s) sigprocmask(SIG_UNBLOCK, (s), NULL)
|
|
|
+
|
|
|
+#define _NSEC_FROM_MSEC(ms) ((ms) * _PC_NS_PER_MS)
|
|
|
+#define _USEC_FROM_MSEC(ms) ((ms) * _PC_NS_PER_US)
|
|
|
+
|
|
|
+#define _TOPIC_CTRL_KEY_BINLE "binLe"
|
|
|
+#define _TOPIC_CTRL_KEY_BINBE "binBe"
|
|
|
+#define _TOPIC_CTRL_KEY_JSON "json"
|
|
|
+#define _TOPIC_CTRL_KEY_PBUF "pBuf"
|
|
|
+#define _TOPIC_CTRL_KEY_QOS "qos"
|
|
|
+#define _TOPIC_CTRL_KEY_RETAIN "retained"
|
|
|
+#define _TOPIC_CTRL_KEY_REM_RETAINED "delRetained"
|
|
|
+#define _TOPIC_CMD_CTRL "CONTROL"
|
|
|
+#define _TOPIC_CMD_SET "SET"
|
|
|
+#define _TOPIC_CMD_STATUS "STATUS"
|
|
|
+
|
|
|
+typedef enum _TOPIC_CTRL_CMD
|
|
|
+{
|
|
|
+ TCC_Control,
|
|
|
+ TCC_SetBinLe,
|
|
|
+ TCC_SetBinBe,
|
|
|
+ TCC_SetJson,
|
|
|
+ TCC_SetPBuf,
|
|
|
+ TCC_Status
|
|
|
+}TOPIC_CTRL_CMD, *LPTOPIC_CTRL_CMD;
|
|
|
+typedef const TOPIC_CTRL_CMD *LPCTOPIC_CTRL_CMD;
|
|
|
+
|
|
|
+////////////////////////////////////////////////////////////////////////////////////////////////////
|
|
|
+//
|
|
|
+
|
|
|
+typedef enum _MQTT_CLIENT_STATES // when modifying, don't forget to adjust g_pszStateNames!!!
|
|
|
+{
|
|
|
+ CLS_NotInit,
|
|
|
+ CLS_SetTLS,
|
|
|
+ CLS_Unconnected,
|
|
|
+ CLS_Connect,
|
|
|
+ CLS_Reconnect,
|
|
|
+ CLS_Connecting,
|
|
|
+ CLS_Connected,
|
|
|
+ CLS_Subscribe,
|
|
|
+ CLS_Subscribing,
|
|
|
+ CLS_Subscribed,
|
|
|
+ CLS_ProcMsg,
|
|
|
+ CLS_ShutDown,
|
|
|
+ CLS_Err,
|
|
|
+ CLS_Unsubscribe,
|
|
|
+ CLS_Disconnect,
|
|
|
+ CLS_Cleanup,
|
|
|
+ CLS_Paused,
|
|
|
+ CLS_Exit
|
|
|
+}MQTT_CLIENT_STATES, *LPMQTT_CLIENT_STATES;
|
|
|
+typedef const MQTT_CLIENT_STATES *LPCMQTT_CLIENT_STATES;
|
|
|
+
|
|
|
+static const char *g_pszStateNames[] =
|
|
|
+{
|
|
|
+ "Not Init",
|
|
|
+ "Set TLS",
|
|
|
+ "Unconnected",
|
|
|
+ "Connect",
|
|
|
+ "Reconnect",
|
|
|
+ "Connecting",
|
|
|
+ "Connected",
|
|
|
+ "Subscribe",
|
|
|
+ "Subscribing",
|
|
|
+ "Subscribed",
|
|
|
+ "ProcMsg",
|
|
|
+ "ShutDown",
|
|
|
+ "Error",
|
|
|
+ "Unsubscribe",
|
|
|
+ "Disconnect",
|
|
|
+ "Cleanup",
|
|
|
+ "Paused",
|
|
|
+ "Exit"
|
|
|
+};
|
|
|
+
|
|
|
+static const char * _GetClientStateString(MQTT_CLIENT_STATES cs)
|
|
|
+{
|
|
|
+ if(cs >= CLS_NotInit && cs <= CLS_Exit)
|
|
|
+ return g_pszStateNames[cs];
|
|
|
+ return "";
|
|
|
+}
|
|
|
+
|
|
|
+////////////////////////////////////////////////////////////////////////////////////////////////////
|
|
|
+//
|
|
|
+
|
|
|
+typedef struct _SUB_CTRL_TOPIC
|
|
|
+{
|
|
|
+ _SUB_CTRL_TOPIC(const std::string &&s) : sTopic(s)
|
|
|
+ {
|
|
|
+ this->nVarOffset = s.length() - 2;
|
|
|
+ }
|
|
|
+ std::string sTopic;
|
|
|
+ size_t nVarOffset;
|
|
|
+}SUB_CTRL_TOPIC, *LPSUB_CTRL_TOPIC;
|
|
|
+typedef const SUB_CTRL_TOPIC *LPCSUB_CTRL_TOPIC;
|
|
|
+
|
|
|
+////////////////////////////////////////////////////////////////////////////////////////////////////
|
|
|
+//
|
|
|
+
|
|
|
+//static std::string _CreateDeviceID(void);
|
|
|
+
|
|
|
+static volatile bool g_fRun = false;
|
|
|
+static volatile bool g_fPauseImp = false;
|
|
|
+static volatile bool g_fPauseCmd = false;
|
|
|
+static volatile bool g_fZombie = false;
|
|
|
+static appid_t g_nDepRunning = 0;
|
|
|
+static sigset_t g_set;
|
|
|
+static CLogfile g_lf;
|
|
|
+static int g_nLastSig = -1;
|
|
|
+static shm_t g_shmShadow;
|
|
|
+static MQTT_CLIENT_STATES g_cs = CLS_NotInit;
|
|
|
+static MQTT_CLIENT_STATES g_csLast = CLS_NotInit;
|
|
|
+static bool g_bConnected = false;
|
|
|
+static int g_nSubcribed = 0;
|
|
|
+static bool g_bIntr = false;
|
|
|
+
|
|
|
+////////////////////////////////////////////////////////////////////////////////////////////////////
|
|
|
+//
|
|
|
+
|
|
|
+static const char* _GetBaseDir(std::string &rstrBaseDir)
|
|
|
+{
|
|
|
+ char szBaseDir[PATH_MAX];
|
|
|
+ rstrBaseDir = ::GetAppDirectory(szBaseDir, sizeof(szBaseDir));
|
|
|
+ rtrim(rstrBaseDir, "/");
|
|
|
+ return rstrBaseDir.c_str();
|
|
|
+}
|
|
|
+
|
|
|
+////////////////////////////////////////////////////////////////////////////////////////////////////
|
|
|
+//
|
|
|
+
|
|
|
+static void _SigHandler(int sig)
|
|
|
+{
|
|
|
+ g_nLastSig = sig;
|
|
|
+ g_bIntr = true;
|
|
|
+ g_fPauseImp = g_fPauseCmd = g_fZombie = false;
|
|
|
+}
|
|
|
+
|
|
|
+////////////////////////////////////////////////////////////////////////////////////////////////////
|
|
|
+//
|
|
|
+
|
|
|
+static MQTT_CLIENT_STATES _cl_usleep(unsigned int t, MQTT_CLIENT_STATES csNext)
|
|
|
+{
|
|
|
+ if(usleep(t) < 0 && errno == EINTR)
|
|
|
+ return CLS_ShutDown;
|
|
|
+ return csNext;
|
|
|
+}
|
|
|
+
|
|
|
+////////////////////////////////////////////////////////////////////////////////////////////////////
|
|
|
+//
|
|
|
+
|
|
|
+static void _ProcessIncoming(CMqttClient &rcl, CMqttVarTable &vt, CMqttClConfig &cfg, LPCSUB_CTRL_TOPIC pCtrlMap)
|
|
|
+{
|
|
|
+ CMqttVar *pVar;
|
|
|
+ CMqttMessage *pMsg;
|
|
|
+ std::string sVarPath;
|
|
|
+
|
|
|
+ while((pMsg = rcl.PopRcvMsg()))
|
|
|
+ {
|
|
|
+ if(pMsg->TopicMatchesSub(pCtrlMap[TCC_Control].sTopic.c_str()))
|
|
|
+ {
|
|
|
+ sVarPath = pMsg->GetTopic(pCtrlMap[TCC_Control].nVarOffset);
|
|
|
+
|
|
|
+ if((pVar = vt.Find(sVarPath.c_str())))
|
|
|
+ {
|
|
|
+ bool bChanged = false;
|
|
|
+ int nType, nQos;
|
|
|
+ CJson_t jtRoot, jtVal;
|
|
|
+ std::string err;
|
|
|
+ uint32_t nMaskOn = 0, nMaskOff = 0;
|
|
|
+
|
|
|
+ if(pMsg->GetPayloadAsJSON(jtRoot, err))
|
|
|
+ {
|
|
|
+ if(jtRoot.GetValue(_TOPIC_CTRL_KEY_RETAIN, jtVal))
|
|
|
+ {
|
|
|
+ switch((nType = jtVal.Type()))
|
|
|
+ {
|
|
|
+ case JSON_TRUE:
|
|
|
+ bChanged = pVar->SetRetained(true) || bChanged;
|
|
|
+ break;
|
|
|
+ case JSON_FALSE:
|
|
|
+ bChanged = pVar->SetRetained(false) || bChanged;
|
|
|
+ break;
|
|
|
+ default:
|
|
|
+ g_lf.Error("%s: Invalid data type for JSON key '%s': %d\n", pMsg->GetTopic().c_str(), _TOPIC_CTRL_KEY_RETAIN, nType);
|
|
|
+ break;
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ if(jtRoot.GetValue(_TOPIC_CTRL_KEY_REM_RETAINED, jtVal))
|
|
|
+ {
|
|
|
+ switch((nType = jtVal.Type()))
|
|
|
+ {
|
|
|
+ case JSON_TRUE:
|
|
|
+ pVar->RemoveRetained(cfg.GetDeviceID(), cfg.GetShmID(), rcl.GetMsgQueueSnd(), true);
|
|
|
+ break;
|
|
|
+ case JSON_FALSE:
|
|
|
+ g_lf.Warning("%s: command \"%s\":false has no effect!\n", pMsg->GetTopic().c_str(), _TOPIC_CTRL_KEY_REM_RETAINED);
|
|
|
+ break;
|
|
|
+ default:
|
|
|
+ g_lf.Error("%s: Invalid data type for JSON key '%s': %d\n", pMsg->GetTopic().c_str(), _TOPIC_CTRL_KEY_REM_RETAINED, nType);
|
|
|
+ break;
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ if(jtRoot.GetValue(_TOPIC_CTRL_KEY_QOS, jtVal))
|
|
|
+ {
|
|
|
+ switch((nType = jtVal.Type()))
|
|
|
+ {
|
|
|
+ case JSON_INTEGER:
|
|
|
+ nQos = (int)json_integer_value(jtVal);
|
|
|
+ if(nQos < MQTTCL_MIN_QOS)
|
|
|
+ g_lf.Warning("%s: Invalid value for QOS: %d! Value adjusted to %d\n", pMsg->GetTopic().c_str(), nQos, MQTTCL_MIN_QOS);
|
|
|
+ else if(nQos > MQTTCL_MAX_QOS)
|
|
|
+ g_lf.Warning("%s: Invalid value for QOS: %d! Value adjusted to %d\n", pMsg->GetTopic().c_str(), nQos, MQTTCL_MAX_QOS);
|
|
|
+ bChanged = pVar->SetQoS(nQos) || bChanged;
|
|
|
+ break;
|
|
|
+ default:
|
|
|
+ g_lf.Error("%s: Invalid data type for JSON key '%s': %d\n", pMsg->GetTopic().c_str(), _TOPIC_CTRL_KEY_QOS, nType);
|
|
|
+ break;
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ if(jtRoot.GetValue(_TOPIC_CTRL_KEY_BINLE, jtVal))
|
|
|
+ {
|
|
|
+ switch((nType = jtVal.Type()))
|
|
|
+ {
|
|
|
+ case JSON_TRUE:
|
|
|
+ nMaskOn |= MQTT_VALUE_BINLE;
|
|
|
+ break;
|
|
|
+ case JSON_FALSE:
|
|
|
+ nMaskOff |= MQTT_VALUE_BINLE;
|
|
|
+ break;
|
|
|
+ default:
|
|
|
+ g_lf.Error("%s: Invalid data type for JSON key '%s': %d\n", pMsg->GetTopic().c_str(), _TOPIC_CTRL_KEY_BINLE, nType);
|
|
|
+ break;
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ if(jtRoot.GetValue(_TOPIC_CTRL_KEY_BINBE, jtVal))
|
|
|
+ {
|
|
|
+ switch((nType = jtVal.Type()))
|
|
|
+ {
|
|
|
+ case JSON_TRUE:
|
|
|
+ nMaskOn |= MQTT_VALUE_BINBE;
|
|
|
+ break;
|
|
|
+ case JSON_FALSE:
|
|
|
+ nMaskOff |= MQTT_VALUE_BINBE;
|
|
|
+ break;
|
|
|
+ default:
|
|
|
+ g_lf.Error("%s: Invalid data type for JSON key '%s': %d\n", pMsg->GetTopic().c_str(), _TOPIC_CTRL_KEY_BINBE, nType);
|
|
|
+ break;
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ if(jtRoot.GetValue(_TOPIC_CTRL_KEY_JSON, jtVal))
|
|
|
+ {
|
|
|
+ switch((nType = jtVal.Type()))
|
|
|
+ {
|
|
|
+ case JSON_TRUE:
|
|
|
+ nMaskOn |= MQTT_VALUE_JSON;
|
|
|
+ break;
|
|
|
+ case JSON_FALSE:
|
|
|
+ nMaskOff |= MQTT_VALUE_JSON;
|
|
|
+ break;
|
|
|
+ default:
|
|
|
+ g_lf.Error("%s: Invalid data type for JSON key '%s': %d\n", pMsg->GetTopic().c_str(), _TOPIC_CTRL_KEY_JSON, nType);
|
|
|
+ break;
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ if(jtRoot.GetValue(_TOPIC_CTRL_KEY_PBUF, jtVal))
|
|
|
+ {
|
|
|
+ switch((nType = jtVal.Type()))
|
|
|
+ {
|
|
|
+ case JSON_TRUE:
|
|
|
+ nMaskOn |= MQTT_VALUE_PBUF;
|
|
|
+ break;
|
|
|
+ case JSON_FALSE:
|
|
|
+ nMaskOff |= MQTT_VALUE_PBUF;
|
|
|
+ break;
|
|
|
+ default:
|
|
|
+ g_lf.Error("%s: Invalid data type for JSON key '%s': %d\n", pMsg->GetTopic().c_str(), _TOPIC_CTRL_KEY_PBUF, nType);
|
|
|
+ break;
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ if(nMaskOff)
|
|
|
+ {
|
|
|
+ if(pVar->DisablePublish(nMaskOff, &vt))
|
|
|
+ {
|
|
|
+ bChanged = true;
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ if(nMaskOn)
|
|
|
+ {
|
|
|
+ if(pVar->EnablePublish(nMaskOn, &vt))
|
|
|
+ {
|
|
|
+ bChanged = true;
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+#if _DUMP_ENABLED_VARS
|
|
|
+ if(bChanged)
|
|
|
+ vt.DumpPubEnabled();
|
|
|
+#endif // _DUMP_ENABLED_VARS
|
|
|
+ }
|
|
|
+ else
|
|
|
+ {
|
|
|
+ g_lf.Error("%s: JSON error: %s!\n", pMsg->GetTopic().c_str(), err.c_str());
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+ else
|
|
|
+ {
|
|
|
+ int nLocks = 0;
|
|
|
+ static const uint32_t nFormats[] =
|
|
|
+ {
|
|
|
+ 0,
|
|
|
+ MQTT_VALUE_BINLE,
|
|
|
+ MQTT_VALUE_BINBE,
|
|
|
+ MQTT_VALUE_JSON,
|
|
|
+ MQTT_VALUE_PBUF
|
|
|
+ };
|
|
|
+
|
|
|
+ for(int i = TCC_SetBinLe; i <= TCC_SetPBuf; i++)
|
|
|
+ {
|
|
|
+ if(pMsg->TopicMatchesSub(pCtrlMap[i].sTopic.c_str()))
|
|
|
+ {
|
|
|
+ sVarPath = pMsg->GetTopic(pCtrlMap[i].nVarOffset);
|
|
|
+
|
|
|
+ if((pVar = vt.Find(sVarPath.c_str())))
|
|
|
+ {
|
|
|
+ if(pVar->PublishEnabled())
|
|
|
+ {
|
|
|
+ if(!pVar->SetShmValue(nFormats[i], pMsg, nLocks))
|
|
|
+ {
|
|
|
+ // !!!
|
|
|
+ }
|
|
|
+ }
|
|
|
+ else
|
|
|
+ {
|
|
|
+ // !!!
|
|
|
+ }
|
|
|
+
|
|
|
+ break;
|
|
|
+ }
|
|
|
+ else
|
|
|
+ {
|
|
|
+ // !!!
|
|
|
+ break;
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ pMsg->Release();
|
|
|
+ }
|
|
|
+}
|
|
|
+
|
|
|
+////////////////////////////////////////////////////////////////////////////////////////////////////
|
|
|
+//
|
|
|
+
|
|
|
+static int _ProcessOutgoing(CMqttClient &rcl)
|
|
|
+{
|
|
|
+ int nRet = 0;
|
|
|
+ CMqttMessage *pMsg;
|
|
|
+
|
|
|
+ while((pMsg = rcl.PopSndMsg()))
|
|
|
+ {
|
|
|
+ rcl.publish(pMsg);
|
|
|
+ pMsg->Release();
|
|
|
+ ++nRet;
|
|
|
+ }
|
|
|
+
|
|
|
+ return nRet;
|
|
|
+}
|
|
|
+
|
|
|
+////////////////////////////////////////////////////////////////////////////////////////////////////
|
|
|
+//
|
|
|
+
|
|
|
+static int _Subscribe(CMqttClient &rcl, int nDefaultQOS, LPCSUB_CTRL_TOPIC pCtrlMap, size_t nLenMap)
|
|
|
+{
|
|
|
+ int nRet;
|
|
|
+
|
|
|
+ for(size_t i = 0; i < nLenMap; i++)
|
|
|
+ {
|
|
|
+ if((nRet = rcl.subscribe(NULL, pCtrlMap[i].sTopic.c_str(), nDefaultQOS)) != MOSQ_ERR_SUCCESS)
|
|
|
+ break;
|
|
|
+ TRACE("Subscribed: '%s' - QOS: %d\n", pCtrlMap[i].sTopic.c_str(), nDefaultQOS);
|
|
|
+ }
|
|
|
+
|
|
|
+ return nRet;
|
|
|
+}
|
|
|
+
|
|
|
+static void _Unsubscribe(CMqttClient &rcl, LPCSUB_CTRL_TOPIC pCtrlMap, size_t nLenMap)
|
|
|
+{
|
|
|
+ for(size_t i = 0; i < nLenMap; i++)
|
|
|
+ {
|
|
|
+ rcl.unsubscribe(NULL, pCtrlMap[i].sTopic.c_str());
|
|
|
+ }
|
|
|
+}
|
|
|
+
|
|
|
+////////////////////////////////////////////////////////////////////////////////////////////////////
|
|
|
+//
|
|
|
+
|
|
|
+static void _OnClientEvents(LPCMQTT_GENERIC_NOTIFICATION pntf, void *pctx)
|
|
|
+{
|
|
|
+ switch(pntf->evt)
|
|
|
+ {
|
|
|
+ case NEVT_Log:
|
|
|
+ if(pntf->log.level == MOSQ_LOG_ERR)
|
|
|
+ {
|
|
|
+ TRACE("[%d] - %s\n", pntf->log.level, pntf->log.str);
|
|
|
+ g_lf.Error("%s\n", pntf->log.str);
|
|
|
+ }
|
|
|
+ break;
|
|
|
+
|
|
|
+ case NEVT_Connect:
|
|
|
+ if(pntf->con.rc == MOSQ_ERR_SUCCESS)
|
|
|
+ g_bConnected = true;
|
|
|
+ break;
|
|
|
+
|
|
|
+ case NEVT_Disconnect:
|
|
|
+ break;
|
|
|
+
|
|
|
+ case NEVT_Subscribe:
|
|
|
+ ++g_nSubcribed;
|
|
|
+ break;
|
|
|
+
|
|
|
+ case NEVT_Unsubscribe:
|
|
|
+ break;
|
|
|
+
|
|
|
+ default:
|
|
|
+ break;
|
|
|
+ }
|
|
|
+}
|
|
|
+
|
|
|
+////////////////////////////////////////////////////////////////////////////////////////////////////
|
|
|
+
|
|
|
+static void _ProcessCtrlMessages(HAPPCTRL hAC, HAPPINFO hAI)
|
|
|
+{
|
|
|
+ ctrlmsg_t nCtrlMsg;
|
|
|
+
|
|
|
+ while(!g_bIntr && (nCtrlMsg = ::GfaIpcAppCtrlGetNextCtrlMsg(hAI)))
|
|
|
+ {
|
|
|
+ switch(nCtrlMsg)
|
|
|
+ {
|
|
|
+ case GFA_APPCTRL_CTRLMSG_STOP:
|
|
|
+ g_bIntr = true;
|
|
|
+ g_fPauseImp = false;
|
|
|
+ g_fPauseCmd = false;
|
|
|
+ g_fZombie = false;
|
|
|
+ g_lf.Info("Received Control Message 'Stop'\n");
|
|
|
+ break;
|
|
|
+ case GFA_APPCTRL_CTRLMSG_PAUSE:
|
|
|
+ if(!g_fPauseCmd)
|
|
|
+ {
|
|
|
+ g_fPauseCmd = true;
|
|
|
+ if(!g_fPauseImp)
|
|
|
+ {
|
|
|
+ ::GfaIpcAppCtrlSetState(hAC, GIAS_Paused);
|
|
|
+ g_lf.Info("Received Control Message 'Pause'\n");
|
|
|
+ g_lf.Info("Enter state %s ...\n", ::GfaIpcAppCtrlGetStateText(GIAS_Paused));
|
|
|
+ TRACE("%-8s: State: %s\n", "Me", ::GfaIpcAppCtrlGetStateText(GIAS_Paused));
|
|
|
+ }
|
|
|
+ }
|
|
|
+ break;
|
|
|
+ case GFA_APPCTRL_CTRLMSG_RESUME:
|
|
|
+ if(g_fPauseCmd)
|
|
|
+ {
|
|
|
+ g_fPauseCmd = false;
|
|
|
+ if(!g_fPauseImp)
|
|
|
+ {
|
|
|
+ g_lf.Info("Received Control Message 'Resume'\n");
|
|
|
+ g_lf.Info("Enter state %s ...\n", ::GfaIpcAppCtrlGetStateText(GIAS_Running));
|
|
|
+ ::GfaIpcAppCtrlSetState(hAC, GIAS_Running);
|
|
|
+ TRACE("%-8s: State: %s\n", "Me", ::GfaIpcAppCtrlGetStateText(GIAS_Running));
|
|
|
+ }
|
|
|
+ }
|
|
|
+ break;
|
|
|
+ default:
|
|
|
+ break;
|
|
|
+ }
|
|
|
+ }
|
|
|
+}
|
|
|
+
|
|
|
+////////////////////////////////////////////////////////////////////////////////////////////////////
|
|
|
+
|
|
|
+static void _ProcessStateEvents(HAPPCTRL hAC, HAPPINFO hAI)
|
|
|
+{
|
|
|
+ appid_t nAppIdSrc;
|
|
|
+ bool fOldPaused = g_fPauseImp;
|
|
|
+ char szDispName[128];
|
|
|
+
|
|
|
+ while(!g_bIntr && (nAppIdSrc = ::GfaIpcAppCtrlGetNextStateEvtSrc(hAI)))
|
|
|
+ {
|
|
|
+ GfaIpcAppStates state = ::GfaIpcAppCtrlGetState(hAC, nAppIdSrc);
|
|
|
+ GfaIpcAppCtrlGetDisplayName(hAC, nAppIdSrc, szDispName, sizeof(szDispName));
|
|
|
+ TRACE("%-8s: State: %s\n", szDispName, ::GfaIpcAppCtrlGetStateText(state));
|
|
|
+
|
|
|
+ if(nAppIdSrc & _DEPENDENCIES)
|
|
|
+ {
|
|
|
+ if(state == GIAS_Running)
|
|
|
+ {
|
|
|
+ g_lf.Info("%s -> %s.\n", szDispName, ::GfaIpcAppCtrlGetStateText(state));
|
|
|
+ g_nDepRunning |= nAppIdSrc;
|
|
|
+ }
|
|
|
+ else
|
|
|
+ {
|
|
|
+ g_lf.Warning("%s -> %s.\n", szDispName, ::GfaIpcAppCtrlGetStateText(state));
|
|
|
+ g_nDepRunning &= ~nAppIdSrc;
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ if(!g_bIntr)
|
|
|
+ {
|
|
|
+ g_fPauseImp = (g_nDepRunning != _DEPENDENCIES);
|
|
|
+
|
|
|
+ if(!g_fPauseCmd && (fOldPaused != g_fPauseImp))
|
|
|
+ {
|
|
|
+ fOldPaused = g_fPauseImp;
|
|
|
+ GfaIpcAppStates newState = g_fPauseImp ? GIAS_Paused : GIAS_Running;
|
|
|
+ ::GfaIpcAppCtrlSetState(hAC, newState);
|
|
|
+ if(g_fPauseImp)
|
|
|
+ g_lf.Warning("Enter state %s ...\n", ::GfaIpcAppCtrlGetStateText(newState));
|
|
|
+ else
|
|
|
+ g_lf.Info("Enter state %s ...\n", ::GfaIpcAppCtrlGetStateText(newState));
|
|
|
+ }
|
|
|
+ }
|
|
|
+}
|
|
|
+
|
|
|
+////////////////////////////////////////////////////////////////////////////////////////////////////
|
|
|
+////////////////////////////////////////////////////////////////////////////////////////////////////
|
|
|
+////////////////////////////////////////////////////////////////////////////////////////////////////
|
|
|
+//
|
|
|
+
|
|
|
+
|
|
|
+int main(int /*argc*/, char **/*argv*/)
|
|
|
+{
|
|
|
+ int nRet = 0;
|
|
|
+ CProcessInstance pi;
|
|
|
+ std::string sDevID;
|
|
|
+ char szLogFile[PATH_MAX];
|
|
|
+ std::string strBaseDir;
|
|
|
+ const char *pszBaseDir = NULL;
|
|
|
+ HAPPCTRL hAC = NULL;
|
|
|
+ HAPPINFO hAI;
|
|
|
+ HSHM hShm = NULL;
|
|
|
+ void *pShm = NULL;
|
|
|
+ unsigned long long nUsecWorkTime = 0;
|
|
|
+ int nTlsMode;
|
|
|
+ CProcessClock pcWork;
|
|
|
+
|
|
|
+ ////////////////////////////////////////////////////////////////////////////////////////////////
|
|
|
+ // check for multiple instances
|
|
|
+
|
|
|
+ if(!pi.LockInstance(UUID_SHM))
|
|
|
+ {
|
|
|
+ CLogfile::StdErr("Failed to start instance!\n");
|
|
|
+ return -1;
|
|
|
+ }
|
|
|
+
|
|
|
+ ////////////////////////////////////////////////////////////////////////////////////////////////
|
|
|
+ // configure signal handling
|
|
|
+
|
|
|
+ struct sigaction sa;
|
|
|
+ ::sigfillset(&g_set);
|
|
|
+ sigaddset(&g_set, SIGUSR1);
|
|
|
+ memset(&sa, 0, sizeof(sa));
|
|
|
+
|
|
|
+ sa.sa_handler = _SigHandler;
|
|
|
+ sigaction(SIGHUP, &sa, NULL); // handles user's terminal disconnect
|
|
|
+ sigaction(SIGQUIT, &sa, NULL); // handles Ctrl + '\'
|
|
|
+ sigaction(SIGTERM, &sa, NULL); // handles normal termination
|
|
|
+ sigaction(SIGABRT, &sa, NULL); // handles abnormal termination (i.e. abort())
|
|
|
+ sigaction(SIGINT, &sa, NULL); // handles Ctrl + 'C'
|
|
|
+
|
|
|
+ sa.sa_handler = SIG_IGN;
|
|
|
+ sigaction(SIGTSTP, &sa, NULL); // ignores Ctrl + 'Z'
|
|
|
+ sigaction(SIGSTOP, &sa, NULL); // ignores Stop
|
|
|
+ sigaction(SIGCONT, &sa, NULL); // ignores Continue
|
|
|
+ sigaction(SIGCHLD, &sa, NULL); // ignores child process termination
|
|
|
+ sigaction(0, &sa, NULL); // ignores shell termination
|
|
|
+
|
|
|
+ do
|
|
|
+ {
|
|
|
+ g_fZombie = true;
|
|
|
+
|
|
|
+ ////////////////////////////////////////////////////////////////////////////////////////////
|
|
|
+ // get the base directory for output files
|
|
|
+
|
|
|
+ if(!pszBaseDir)
|
|
|
+ pszBaseDir = _GetBaseDir(strBaseDir);
|
|
|
+
|
|
|
+ CLogfile::StdOut("Using base directory \"%s\".\n", pszBaseDir);
|
|
|
+
|
|
|
+ ////////////////////////////////////////////////////////////////////////////////////////////
|
|
|
+ // initialize log file
|
|
|
+
|
|
|
+ sprintf(szLogFile, "%s/%s", pszBaseDir, _LOGFILE_NAME);
|
|
|
+
|
|
|
+ if(!g_lf.Open(szLogFile))
|
|
|
+ {
|
|
|
+ CLogfile::StdErr("Failed to create/open log file!\n");
|
|
|
+ nRet = -1;
|
|
|
+ break;
|
|
|
+ }
|
|
|
+
|
|
|
+ g_lf.Info("Process started.\n");
|
|
|
+
|
|
|
+ ////////////////////////////////////////////////////////////////////////////////////////////
|
|
|
+ // initialize app control
|
|
|
+
|
|
|
+ g_lf.Info("Acquire AppCtrl-Handle.\n");
|
|
|
+
|
|
|
+ if(!(hAC = ::GfaIpcAppCtrlAcquire(_APPID, _APPNAME, _USEC_FROM_MSEC(_UPDATE_INTERVAL_MS), _USEC_FROM_MSEC(_RECONN_INTERVAL_MS) * 3)))
|
|
|
+ {
|
|
|
+ g_lf.Error("Failed to acquire AppCtrl-Handle!\n");
|
|
|
+ break;
|
|
|
+ }
|
|
|
+
|
|
|
+ ::GfaIpcAppCtrlSetState(hAC, GIAS_Initializing);
|
|
|
+ g_lf.Info("Enter state %s ...\n", ::GfaIpcAppCtrlGetStateText(GIAS_Initializing));
|
|
|
+
|
|
|
+ if(!::GfaIpcAppCtrlSubscribeStateEvents(hAC, _DEPENDENCIES))
|
|
|
+ {
|
|
|
+ g_lf.Error("Failed to subscribe state event notifications!\n");
|
|
|
+ break;
|
|
|
+ }
|
|
|
+
|
|
|
+ ////////////////////////////////////////////////////////////////////////////////////////////
|
|
|
+ // parse the config file
|
|
|
+
|
|
|
+ CMqttClConfig cfg(UUID_SHM);
|
|
|
+
|
|
|
+ if(!cfg.LoadCfg(MQTTCL_CONFIG_FILE_PATH, g_lf))
|
|
|
+ {
|
|
|
+ nRet = -1;
|
|
|
+ break;
|
|
|
+ }
|
|
|
+
|
|
|
+ nTlsMode = cfg.GetTLSMode();
|
|
|
+// TRACE("%s/%s\n", cfg.GetDeviceID(), cfg.GetShmID());
|
|
|
+
|
|
|
+ ////////////////////////////////////////////////////////////////////////////////////////////
|
|
|
+ // client control topic map
|
|
|
+
|
|
|
+ const SUB_CTRL_TOPIC subCtrlMap[] =
|
|
|
+ {
|
|
|
+ formatString("%s/%s/%s/#", cfg.GetDeviceID(), cfg.GetShmID(), _TOPIC_CMD_CTRL),
|
|
|
+ formatString("%s/%s/%s/%s/#", cfg.GetDeviceID(), cfg.GetShmID(), MQTT_TOPIC_VALUE_BINLE, _TOPIC_CMD_SET),
|
|
|
+ formatString("%s/%s/%s/%s/#", cfg.GetDeviceID(), cfg.GetShmID(), MQTT_TOPIC_VALUE_BINBE, _TOPIC_CMD_SET),
|
|
|
+ formatString("%s/%s/%s/%s/#", cfg.GetDeviceID(), cfg.GetShmID(), MQTT_TOPIC_VALUE_JSON, _TOPIC_CMD_SET),
|
|
|
+ formatString("%s/%s/%s/%s/#", cfg.GetDeviceID(), cfg.GetShmID(), MQTT_TOPIC_VALUE_PBUF, _TOPIC_CMD_SET),
|
|
|
+// formatString("%s/%s/%s/#", cfg.GetDeviceID(), cfg.GetShmID(), _TOPIC_CMD_STATUS)
|
|
|
+ };
|
|
|
+
|
|
|
+ ////////////////////////////////////////////////////////////////////////////////////////////
|
|
|
+
|
|
|
+ if(!(hShm = ::acquire_shm(sizeof(shm_t), 1)))
|
|
|
+ {
|
|
|
+ g_lf.Error("GfaIpcAcquireSHM failed!\n");
|
|
|
+ break;
|
|
|
+ }
|
|
|
+
|
|
|
+ g_lf.Info("Acquired SHM Handle.\n");
|
|
|
+
|
|
|
+ if(!(pShm = ::GfaIpcAcquirePointer(hShm)))
|
|
|
+ {
|
|
|
+ g_lf.Error("GfaIpcAcquirePointer failed!\n");
|
|
|
+ break;
|
|
|
+ }
|
|
|
+
|
|
|
+ g_lf.Info("Acquired SHM Pointer.\n");
|
|
|
+ ::GfaIpcDumpSHMROT();
|
|
|
+
|
|
|
+ memcpy(&g_shmShadow, (const shm_t*)pShm, sizeof(shm_t));
|
|
|
+
|
|
|
+ ////////////////////////////////////////////////////////////////////////////////////////////
|
|
|
+
|
|
|
+ int nErr, nLocked = 0, nNumConn = 0;
|
|
|
+ bool bReconnect, bConnPending;
|
|
|
+ std::string strErr;
|
|
|
+ CMqttVarTable vtbl;
|
|
|
+ CShm_t shm(pShm, &g_shmShadow, hShm, NULL, -1, 0, cfg.GetDefaultQOS(), cfg.GetDefaultRetain());
|
|
|
+ shm.InitPath(NULL, NULL);
|
|
|
+ shm.CreateMembersTable(vtbl);
|
|
|
+
|
|
|
+#if _DUMP_ENABLED_VARS
|
|
|
+ vtbl.DumpPubEnabled();
|
|
|
+#endif // _DUMP_ENABLED_VARS
|
|
|
+
|
|
|
+ ////////////////////////////////////////////////////////////////////////////////////////////
|
|
|
+
|
|
|
+ CMqttClient mqttCl(cfg.GetDeviceID());
|
|
|
+ mqttCl.SetClientEventCallback(_OnClientEvents, NEVT_Connect | NEVT_Disconnect | NEVT_Subscribe | NEVT_Unsubscribe | NEVT_Message | NEVT_Log, NULL);
|
|
|
+
|
|
|
+ g_fZombie = false;
|
|
|
+ g_fRun = true;
|
|
|
+ ::GfaIpcAppCtrlSetState(hAC, GIAS_Running);
|
|
|
+ g_lf.Info("Enter state %s ...\n", ::GfaIpcAppCtrlGetStateText(GIAS_Running));
|
|
|
+
|
|
|
+ while(g_fRun)
|
|
|
+ {
|
|
|
+ ////////////////////////////////////////////////////////////////////////////////////////
|
|
|
+ // update app control info
|
|
|
+
|
|
|
+ if((hAI = ::GfaIpcAppCtrlInfoUpdate(hAC, nUsecWorkTime)))
|
|
|
+ {
|
|
|
+ _ProcessCtrlMessages(hAC, hAI);
|
|
|
+ _ProcessStateEvents(hAC, hAI);
|
|
|
+ }
|
|
|
+
|
|
|
+ if(g_fPauseImp || g_fPauseCmd)
|
|
|
+ {
|
|
|
+ if(g_cs < CLS_ShutDown)
|
|
|
+ {
|
|
|
+ g_csLast = g_cs;
|
|
|
+ g_cs = CLS_ShutDown;
|
|
|
+ }
|
|
|
+ else
|
|
|
+ {
|
|
|
+ nUsecWorkTime = 0;
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ pcWork.ClockTrigger();
|
|
|
+
|
|
|
+ switch(g_cs)
|
|
|
+ {
|
|
|
+ case CLS_NotInit:
|
|
|
+ if((nErr = CMqttClient::Init()) == MOSQ_ERR_SUCCESS)
|
|
|
+ {
|
|
|
+ g_cs = CLS_SetTLS;
|
|
|
+ }
|
|
|
+ else if(!CMqttClient::EvalError(nErr, bReconnect, bConnPending, g_bIntr, strErr))
|
|
|
+ {
|
|
|
+ g_csLast = g_cs;
|
|
|
+ g_cs = CLS_Err;
|
|
|
+ }
|
|
|
+ break;
|
|
|
+
|
|
|
+ case CLS_SetTLS:
|
|
|
+ if(nTlsMode == MQTTCL_TLS_MODE_CRT)
|
|
|
+ {
|
|
|
+ g_lf.Info("Using TLS with certificates.\n");
|
|
|
+
|
|
|
+ if((nErr = mqttCl.tls_set(cfg.GetTlsCaCrtFile(), NULL, cfg.GetTlsClCrtFile(), cfg.GetTlsClKeyFile(), NULL)) != MOSQ_ERR_SUCCESS)
|
|
|
+ {
|
|
|
+ CMqttClient::EvalError(nErr, bReconnect, bConnPending, g_bIntr, strErr);
|
|
|
+
|
|
|
+ if(g_bIntr)
|
|
|
+ {
|
|
|
+ g_csLast = g_cs;
|
|
|
+ g_cs = CLS_ShutDown;
|
|
|
+ }
|
|
|
+ else
|
|
|
+ {
|
|
|
+ g_csLast = g_cs;
|
|
|
+ g_cs = CLS_Err;
|
|
|
+ }
|
|
|
+ }
|
|
|
+ else
|
|
|
+ {
|
|
|
+ g_cs = CLS_Unconnected;
|
|
|
+ g_lf.Info("Connecting to broker @ %s:%u ...\n", cfg.GetBrokerAddr(), cfg.GetBrokerPort());
|
|
|
+ }
|
|
|
+ }
|
|
|
+ else if(nTlsMode == MQTTCL_TLS_MODE_PSK)
|
|
|
+ {
|
|
|
+ g_lf.Info("Using TLS with PSK.\n");
|
|
|
+
|
|
|
+ if((nErr = mqttCl.tls_psk_set(cfg.GetTlsPSK(), cfg.GetDeviceID(), NULL)) != MOSQ_ERR_SUCCESS)
|
|
|
+ {
|
|
|
+ CMqttClient::EvalError(nErr, bReconnect, bConnPending, g_bIntr, strErr);
|
|
|
+
|
|
|
+ if(g_bIntr)
|
|
|
+ {
|
|
|
+ g_csLast = g_cs;
|
|
|
+ g_cs = CLS_ShutDown;
|
|
|
+ }
|
|
|
+ else
|
|
|
+ {
|
|
|
+ g_csLast = g_cs;
|
|
|
+ g_cs = CLS_Err;
|
|
|
+ }
|
|
|
+ }
|
|
|
+ else
|
|
|
+ {
|
|
|
+ g_cs = CLS_Unconnected;
|
|
|
+ g_lf.Info("Connecting to broker @ %s:%u ...\n", cfg.GetBrokerAddr(), cfg.GetBrokerPort());
|
|
|
+ }
|
|
|
+ }
|
|
|
+ else
|
|
|
+ {
|
|
|
+ g_cs = CLS_Unconnected;
|
|
|
+ g_lf.Info("Connecting to broker @ %s:%u ...\n", cfg.GetBrokerAddr(), cfg.GetBrokerPort());
|
|
|
+ }
|
|
|
+ break;
|
|
|
+
|
|
|
+ case CLS_Unconnected:
|
|
|
+ if(g_bConnected)
|
|
|
+ {
|
|
|
+ g_bConnected = false;
|
|
|
+ g_lf.Warning("Lost connection to broker @ %s:%u. Tying to reconnect ...\n", cfg.GetBrokerAddr(), cfg.GetBrokerPort());
|
|
|
+ }
|
|
|
+ if(!nNumConn)
|
|
|
+ g_cs = CLS_Connect;
|
|
|
+ else
|
|
|
+ g_cs = CLS_Reconnect;
|
|
|
+ break;
|
|
|
+
|
|
|
+ case CLS_Connect:
|
|
|
+ if((nErr = mqttCl.connect(cfg.GetBrokerAddr(), cfg.GetBrokerPort())) == MOSQ_ERR_SUCCESS)
|
|
|
+ g_cs = CLS_Connecting;
|
|
|
+ else if(!CMqttClient::EvalError(nErr, bReconnect, bConnPending, g_bIntr, strErr))
|
|
|
+ {
|
|
|
+ if(bConnPending)
|
|
|
+ g_cs = CLS_Connecting;
|
|
|
+ else if(bReconnect)
|
|
|
+ {
|
|
|
+ g_csLast = g_cs;
|
|
|
+ g_cs = _cl_usleep(_USEC_FROM_MSEC(_RECONN_INTERVAL_MS), g_cs);
|
|
|
+ }
|
|
|
+ else if(g_bIntr)
|
|
|
+ {
|
|
|
+ g_csLast = g_cs;
|
|
|
+ g_cs = CLS_ShutDown;
|
|
|
+ }
|
|
|
+ else
|
|
|
+ {
|
|
|
+ g_csLast = g_cs;
|
|
|
+ g_cs = CLS_Err;
|
|
|
+ }
|
|
|
+ }
|
|
|
+ break;
|
|
|
+
|
|
|
+ case CLS_Reconnect:
|
|
|
+ if((nErr = mqttCl.reconnect()) == MOSQ_ERR_SUCCESS)
|
|
|
+ g_cs = CLS_Connecting;
|
|
|
+ else if(!CMqttClient::EvalError(nErr, bReconnect, bConnPending, g_bIntr, strErr))
|
|
|
+ {
|
|
|
+ if(bConnPending)
|
|
|
+ g_cs = CLS_Connecting;
|
|
|
+ else if(bReconnect)
|
|
|
+ {
|
|
|
+ g_csLast = g_cs;
|
|
|
+ g_cs = _cl_usleep(_USEC_FROM_MSEC(_RECONN_INTERVAL_MS), g_cs);
|
|
|
+ }
|
|
|
+ else if(g_bIntr)
|
|
|
+ {
|
|
|
+ g_csLast = g_cs;
|
|
|
+ g_cs = CLS_ShutDown;
|
|
|
+ }
|
|
|
+ else
|
|
|
+ {
|
|
|
+ g_csLast = g_cs;
|
|
|
+ g_cs = CLS_Err;
|
|
|
+ }
|
|
|
+ }
|
|
|
+ break;
|
|
|
+
|
|
|
+ case CLS_Connecting:
|
|
|
+ if(!mqttCl.TimedLoop(_NSEC_FROM_MSEC(_UPDATE_INTERVAL_MS), nErr, bReconnect, bConnPending, g_bIntr, strErr))
|
|
|
+ {
|
|
|
+ if(bReconnect)
|
|
|
+ {
|
|
|
+ g_csLast = g_cs;
|
|
|
+ g_cs = _cl_usleep(_USEC_FROM_MSEC(_RECONN_INTERVAL_MS), CLS_Unconnected);
|
|
|
+ }
|
|
|
+ else if(g_bIntr)
|
|
|
+ {
|
|
|
+ g_csLast = g_cs;
|
|
|
+ g_cs = CLS_ShutDown;
|
|
|
+ }
|
|
|
+ else if(!bConnPending)
|
|
|
+ {
|
|
|
+ g_csLast = g_cs;
|
|
|
+ g_cs = CLS_Err;
|
|
|
+ }
|
|
|
+ }
|
|
|
+ else if(g_bConnected)
|
|
|
+ {
|
|
|
+ g_cs = CLS_Connected;
|
|
|
+ }
|
|
|
+ break;
|
|
|
+
|
|
|
+ case CLS_Connected:
|
|
|
+ g_lf.Info("Connected to broker @ %s:%u.\n", cfg.GetBrokerAddr(), cfg.GetBrokerPort());
|
|
|
+ ++nNumConn;
|
|
|
+ g_nSubcribed = 0;
|
|
|
+ g_cs = CLS_Subscribe;
|
|
|
+ break;
|
|
|
+
|
|
|
+ case CLS_Subscribe:
|
|
|
+ g_lf.Info("Subscribing control-topics ...\n");
|
|
|
+ if((nErr = _Subscribe(mqttCl, cfg.GetDefaultQOS(), subCtrlMap, _COUNTOF(subCtrlMap))) == MOSQ_ERR_SUCCESS)
|
|
|
+ g_cs = CLS_Subscribing;
|
|
|
+ else if(!CMqttClient::EvalError(nErr, bReconnect, bConnPending, g_bIntr, strErr))
|
|
|
+ {
|
|
|
+ if(bConnPending)
|
|
|
+ g_cs = CLS_Connecting;
|
|
|
+ else if(bReconnect)
|
|
|
+ g_cs = CLS_Unconnected;
|
|
|
+ else if(g_bIntr)
|
|
|
+ {
|
|
|
+ g_csLast = g_cs;
|
|
|
+ g_cs = CLS_ShutDown;
|
|
|
+ }
|
|
|
+ else
|
|
|
+ {
|
|
|
+ g_csLast = g_cs;
|
|
|
+ g_cs = CLS_Err;
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ break;
|
|
|
+
|
|
|
+ case CLS_Subscribing:
|
|
|
+ if(!mqttCl.TimedLoop(_NSEC_FROM_MSEC(_UPDATE_INTERVAL_MS), nErr, bReconnect, bConnPending, g_bIntr, strErr))
|
|
|
+ {
|
|
|
+ if(bReconnect)
|
|
|
+ g_cs = CLS_Unconnected;
|
|
|
+ else if(bConnPending)
|
|
|
+ g_cs = CLS_Connecting;
|
|
|
+ else if(g_bIntr)
|
|
|
+ {
|
|
|
+ g_csLast = g_cs;
|
|
|
+ g_cs = CLS_ShutDown;
|
|
|
+ }
|
|
|
+ else
|
|
|
+ {
|
|
|
+ g_csLast = g_cs;
|
|
|
+ g_cs = CLS_Err;
|
|
|
+ }
|
|
|
+ }
|
|
|
+ else if(g_nSubcribed == _COUNTOF(subCtrlMap))
|
|
|
+ {
|
|
|
+ g_cs = CLS_Subscribed;
|
|
|
+ }
|
|
|
+ break;
|
|
|
+
|
|
|
+ case CLS_Subscribed:
|
|
|
+ g_lf.Info("Subscriptions acknowledged.\n");
|
|
|
+ g_lf.Info("Enter SHM processing loop ...\n");
|
|
|
+ g_cs = CLS_ProcMsg;
|
|
|
+ break;
|
|
|
+
|
|
|
+ case CLS_ProcMsg:
|
|
|
+ if(mqttCl.TimedLoop(_NSEC_FROM_MSEC(_UPDATE_INTERVAL_MS), nErr, bReconnect, bConnPending, g_bIntr, strErr))
|
|
|
+ {
|
|
|
+#if _TRACK_TIMES
|
|
|
+ std::string s1, s2;
|
|
|
+ pc_time64_t elapsed;
|
|
|
+ g_nDbgCounter1 = g_nDbgCounter2 = g_nDbgCounter3 = 0;
|
|
|
+ g_pc.ClockTrigger();
|
|
|
+#endif // _TRACK_TIMES
|
|
|
+
|
|
|
+ _ProcessIncoming(mqttCl, vtbl, cfg, subCtrlMap);
|
|
|
+
|
|
|
+#if _TRACK_TIMES
|
|
|
+ if(g_nDbgCounter1)
|
|
|
+ {
|
|
|
+ elapsed = g_pc.ClockGetElapsed();
|
|
|
+ s1 = CProcessClock::Interval2String(elapsed);
|
|
|
+ s2 = CProcessClock::Interval2String(elapsed / g_nDbgCounter1);
|
|
|
+ TRACE("_ProcessIncoming (%lu variables): %s (%s per var)\n", g_nDbgCounter1, s1.c_str(), s2.c_str());
|
|
|
+ }
|
|
|
+ g_pc.ClockTrigger();
|
|
|
+#endif // _TRACK_TIMES
|
|
|
+
|
|
|
+ _SIG_BLOCK(&g_set);
|
|
|
+ vtbl.CheckShmAndPublish(cfg.GetDeviceID(), cfg.GetShmID(), mqttCl.GetMsgQueueSnd(), nLocked);
|
|
|
+ _SIG_UNBLOCK(&g_set);
|
|
|
+#if _TRACK_TIMES
|
|
|
+ g_nDbgCounter2 = mqttCl.GetMsgQueueSnd().Size();
|
|
|
+ if(g_nDbgCounter2)
|
|
|
+ {
|
|
|
+ elapsed = g_pc.ClockGetElapsed();
|
|
|
+ s1 = CProcessClock::Interval2String(elapsed);
|
|
|
+ s2 = CProcessClock::Interval2String(elapsed / g_nDbgCounter2);
|
|
|
+ TRACE("CheckShmAndPublish (%lu variables): %s (%s per var)\n", g_nDbgCounter2, s1.c_str(), s2.c_str());
|
|
|
+ g_pc.ClockTrigger();
|
|
|
+ }
|
|
|
+ g_nDbgCounter3 =
|
|
|
+#endif // _TRACK_TIMES
|
|
|
+
|
|
|
+ _ProcessOutgoing(mqttCl);
|
|
|
+
|
|
|
+#if _TRACK_TIMES
|
|
|
+ if(g_nDbgCounter3)
|
|
|
+ {
|
|
|
+ elapsed = g_pc.ClockGetElapsed();
|
|
|
+ s1 = CProcessClock::Interval2String(elapsed);
|
|
|
+ s2 = CProcessClock::Interval2String(elapsed / g_nDbgCounter3);
|
|
|
+ TRACE("_ProcessOutgoing (%lu variables): %s (%s per var)\n", g_nDbgCounter3, s1.c_str(), s2.c_str());
|
|
|
+ }
|
|
|
+#endif // _TRACK_TIMES
|
|
|
+ }
|
|
|
+ else
|
|
|
+ {
|
|
|
+ if(bReconnect)
|
|
|
+ g_cs = CLS_Unconnected;
|
|
|
+ else if(bConnPending)
|
|
|
+ g_cs = CLS_Connecting;
|
|
|
+ else if(g_bIntr)
|
|
|
+ {
|
|
|
+ g_csLast = g_cs;
|
|
|
+ g_cs = CLS_ShutDown;
|
|
|
+ }
|
|
|
+ else
|
|
|
+ {
|
|
|
+ g_csLast = g_cs;
|
|
|
+ g_cs = CLS_Err;
|
|
|
+ }
|
|
|
+ }
|
|
|
+ break;
|
|
|
+
|
|
|
+ case CLS_Err:
|
|
|
+ g_lf.Error("[%s] - %s\n", _GetClientStateString(g_csLast), strErr.c_str());
|
|
|
+ TRACE("Error: %s\n", strErr.c_str());
|
|
|
+ g_fZombie = true;
|
|
|
+ g_cs = CLS_ShutDown;
|
|
|
+ break;
|
|
|
+
|
|
|
+ case CLS_ShutDown:
|
|
|
+ if(g_bIntr && g_nLastSig >= 0)
|
|
|
+ {
|
|
|
+ g_lf.Info("Received signal '%s'.\n", strsignal(g_nLastSig));
|
|
|
+ g_nLastSig = -1;
|
|
|
+ }
|
|
|
+ g_lf.Info("Process shutting down ...\n");
|
|
|
+ if(g_csLast >= CLS_Subscribed)
|
|
|
+ g_cs = CLS_Unsubscribe;
|
|
|
+ else if(g_csLast >= CLS_Connected)
|
|
|
+ g_cs = CLS_Disconnect;
|
|
|
+ else if(g_csLast > CLS_NotInit)
|
|
|
+ g_cs = CLS_Cleanup;
|
|
|
+ else if((g_fPauseImp || g_fPauseCmd) && !g_bIntr)
|
|
|
+ g_cs = CLS_Paused;
|
|
|
+ else
|
|
|
+ g_cs = CLS_Exit;
|
|
|
+ break;
|
|
|
+
|
|
|
+ case CLS_Unsubscribe:
|
|
|
+ g_lf.Info("Unsubscribe control-topics.\n");
|
|
|
+ _Unsubscribe(mqttCl, subCtrlMap, _COUNTOF(subCtrlMap));
|
|
|
+ g_nSubcribed = 0;
|
|
|
+ g_cs = CLS_Disconnect;
|
|
|
+ break;
|
|
|
+
|
|
|
+ case CLS_Disconnect:
|
|
|
+ g_lf.Info("Disconnect from broker @ %s:%u.\n", cfg.GetBrokerAddr(), cfg.GetBrokerPort());
|
|
|
+ mqttCl.disconnect();
|
|
|
+ g_bConnected = false;
|
|
|
+ g_cs = CLS_Cleanup;
|
|
|
+ break;
|
|
|
+
|
|
|
+ case CLS_Cleanup:
|
|
|
+ g_lf.Info("Clean up.\n");
|
|
|
+ CMqttClient::Cleanup();
|
|
|
+ if((g_fPauseImp || g_fPauseCmd) && !g_bIntr)
|
|
|
+ g_cs = CLS_Paused;
|
|
|
+ else
|
|
|
+ g_cs = CLS_Exit;
|
|
|
+ break;
|
|
|
+
|
|
|
+ case CLS_Paused:
|
|
|
+ if(!g_fPauseImp && !g_fPauseCmd)
|
|
|
+ {
|
|
|
+ if(!g_bIntr)
|
|
|
+ g_cs = CLS_NotInit;
|
|
|
+ else
|
|
|
+ g_cs = CLS_Exit;
|
|
|
+ }
|
|
|
+ else
|
|
|
+ {
|
|
|
+ usleep(_USEC_FROM_MSEC(_UPDATE_INTERVAL_MS));
|
|
|
+ continue;
|
|
|
+ }
|
|
|
+ break;
|
|
|
+
|
|
|
+ case CLS_Exit:
|
|
|
+ g_fRun = false;
|
|
|
+ break;
|
|
|
+ }
|
|
|
+
|
|
|
+ nUsecWorkTime = pcWork.ClockGetElapsed() / 1000;
|
|
|
+ }
|
|
|
+ }
|
|
|
+ while(false);
|
|
|
+
|
|
|
+ ////////////////////////////////////////////////////////////////////////////////////////////////
|
|
|
+
|
|
|
+ if(hShm)
|
|
|
+ {
|
|
|
+ if(pShm)
|
|
|
+ {
|
|
|
+ g_lf.Info("Release SHM Pointer.\n");
|
|
|
+ ::GfaIpcReleasePointer(hShm, pShm);
|
|
|
+ }
|
|
|
+
|
|
|
+ g_lf.Info("Release SHM Handle.\n");
|
|
|
+ ::GfaIpcReleaseSHM(hShm);
|
|
|
+ }
|
|
|
+
|
|
|
+ if(g_fZombie)
|
|
|
+ {
|
|
|
+ if(hAC)
|
|
|
+ ::GfaIpcAppCtrlSetState(hAC, GIAS_Zombie);
|
|
|
+ TRACE("Enter Zombie state ...\n");
|
|
|
+ g_lf.Warning("Enter Zombie state ...\n");
|
|
|
+ g_lf.Flush();
|
|
|
+ pause();
|
|
|
+
|
|
|
+ if(g_nLastSig >= 0)
|
|
|
+ g_lf.Info("Received signal '%s'.\n", strsignal(g_nLastSig));
|
|
|
+ }
|
|
|
+
|
|
|
+ if(hAC)
|
|
|
+ {
|
|
|
+ g_lf.Info("Enter state %s ...\n", ::GfaIpcAppCtrlGetStateText(GIAS_Terminating));
|
|
|
+ ::GfaIpcAppCtrlSetState(hAC, GIAS_Terminating);
|
|
|
+ g_lf.Info("Releasing App Control ...\n");
|
|
|
+ ::GfaIpcAppCtrlRelease(hAC);
|
|
|
+ }
|
|
|
+
|
|
|
+ g_lf.Info("Process exit.\n\n");
|
|
|
+ g_lf.Close();
|
|
|
+ CLogfile::StdErr("MqttCl exit.\n");
|
|
|
+ return nRet;
|
|
|
+}
|