//////////////////////////////////////////////////////////////////////////////////////////////////// // #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include "projal.h" //////////////////////////////////////////////////////////////////////////////////////////////////// #if _ENABLE_MEM_TRACE #include class CMtrace { public: CMtrace(void) { putenv("MALLOC_TRACE=/home/wrk/share/config/services/Debug/Desktop_Qt_5_7_0_GCC_64bit/mqttcl/mtrace.log"); mtrace(); } ~CMtrace(void){ // muntrace(); } }; #endif // _ENABLE_MEM_TRACE #if _TRACK_TIMES static CProcessClock g_pc; unsigned long g_nDbgCounter1 = 0; unsigned long g_nDbgCounter2 = 0; unsigned long g_nDbgCounter3 = 0; #endif // _TRACK_TIMES //////////////////////////////////////////////////////////////////////////////////////////////////// // app control #define _APPID GFA_APPCTRL_APPID_MQTTCL #define _APPNAME "MqttCl" #define _DEPENDENCIES ((appid_t)(GFA_APPCTRL_APPID_REMANENT)) //////////////////////////////////////////////////////////////////////////////////////////////////// // #define _UPDATE_INTERVAL_MS 100 #define _RECONN_INTERVAL_MS 1000 #define _LOGFILE_NAME "mqttcl.log" //////////////////////////////////////////////////////////////////////////////////////////////////// #define _SIG_BLOCK(s) sigprocmask(SIG_BLOCK, (s), NULL) #define _SIG_UNBLOCK(s) sigprocmask(SIG_UNBLOCK, (s), NULL) #define _NSEC_FROM_MSEC(ms) ((ms) * _PC_NS_PER_MS) #define _USEC_FROM_MSEC(ms) ((ms) * _PC_NS_PER_US) #define _TOPIC_CTRL_KEY_BINLE "binLe" #define _TOPIC_CTRL_KEY_BINBE "binBe" #define _TOPIC_CTRL_KEY_JSON "json" #define _TOPIC_CTRL_KEY_PBUF "pBuf" #define _TOPIC_CTRL_KEY_QOS "qos" #define _TOPIC_CTRL_KEY_RETAIN "retained" #define _TOPIC_CTRL_KEY_REM_RETAINED "delRetained" #define _TOPIC_CMD_CTRL "CONTROL" #define _TOPIC_CMD_SET "SET" #define _TOPIC_CMD_STATUS "STATUS" typedef enum _TOPIC_CTRL_CMD { TCC_Control, TCC_SetBinLe, TCC_SetBinBe, TCC_SetJson, TCC_SetPBuf, TCC_Status }TOPIC_CTRL_CMD, *LPTOPIC_CTRL_CMD; typedef const TOPIC_CTRL_CMD *LPCTOPIC_CTRL_CMD; //////////////////////////////////////////////////////////////////////////////////////////////////// // typedef enum _MQTT_CLIENT_STATES // when modifying, don't forget to adjust g_pszStateNames!!! { CLS_NotInit, CLS_SetTLS, CLS_Unconnected, CLS_Connect, CLS_Reconnect, CLS_Connecting, CLS_Connected, CLS_Subscribe, CLS_Subscribing, CLS_Subscribed, CLS_ProcMsg, CLS_ShutDown, CLS_Err, CLS_Unsubscribe, CLS_Disconnect, CLS_Cleanup, CLS_Paused, CLS_Exit }MQTT_CLIENT_STATES, *LPMQTT_CLIENT_STATES; typedef const MQTT_CLIENT_STATES *LPCMQTT_CLIENT_STATES; static const char *g_pszStateNames[] = { "Not Init", "Set TLS", "Unconnected", "Connect", "Reconnect", "Connecting", "Connected", "Subscribe", "Subscribing", "Subscribed", "ProcMsg", "ShutDown", "Error", "Unsubscribe", "Disconnect", "Cleanup", "Paused", "Exit" }; static const char * _GetClientStateString(MQTT_CLIENT_STATES cs) { if(cs >= CLS_NotInit && cs <= CLS_Exit) return g_pszStateNames[cs]; return ""; } //////////////////////////////////////////////////////////////////////////////////////////////////// // typedef struct _SUB_CTRL_TOPIC { _SUB_CTRL_TOPIC(const std::string &&s) : sTopic(s) { this->nVarOffset = s.length() - 2; } std::string sTopic; size_t nVarOffset; }SUB_CTRL_TOPIC, *LPSUB_CTRL_TOPIC; typedef const SUB_CTRL_TOPIC *LPCSUB_CTRL_TOPIC; //////////////////////////////////////////////////////////////////////////////////////////////////// // //static std::string _CreateDeviceID(void); static volatile bool g_fRun = false; static volatile bool g_fPauseImp = false; static volatile bool g_fPauseCmd = false; static volatile bool g_fZombie = false; static appid_t g_nDepRunning = 0; static sigset_t g_set; static CLogfile g_lf; static int g_nLastSig = -1; static shm_t g_shmShadow; static MQTT_CLIENT_STATES g_cs = CLS_NotInit; static MQTT_CLIENT_STATES g_csLast = CLS_NotInit; static bool g_bConnected = false; static int g_nSubcribed = 0; static bool g_bIntr = false; //////////////////////////////////////////////////////////////////////////////////////////////////// // static const char* _GetBaseDir(std::string &rstrBaseDir) { char szBaseDir[PATH_MAX]; rstrBaseDir = ::GetAppDirectory(szBaseDir, sizeof(szBaseDir)); rtrim(rstrBaseDir, "/"); return rstrBaseDir.c_str(); } //////////////////////////////////////////////////////////////////////////////////////////////////// // static void _SigHandler(int sig) { g_nLastSig = sig; g_bIntr = true; g_fPauseImp = g_fPauseCmd = g_fZombie = false; } //////////////////////////////////////////////////////////////////////////////////////////////////// // static MQTT_CLIENT_STATES _cl_usleep(unsigned int t, MQTT_CLIENT_STATES csNext) { if(usleep(t) < 0 && errno == EINTR) return CLS_ShutDown; return csNext; } //////////////////////////////////////////////////////////////////////////////////////////////////// // static void _ProcessIncoming(CMqttClient &rcl, CMqttVarTable &vt, CMqttClConfig &cfg, LPCSUB_CTRL_TOPIC pCtrlMap) { CMqttVar *pVar; CMqttMessage *pMsg; std::string sVarPath; while((pMsg = rcl.PopRcvMsg())) { if(pMsg->TopicMatchesSub(pCtrlMap[TCC_Control].sTopic.c_str())) { sVarPath = pMsg->GetTopic(pCtrlMap[TCC_Control].nVarOffset); if((pVar = vt.Find(sVarPath.c_str()))) { bool bChanged = false; int nType, nQos; CJson_t jtRoot, jtVal; std::string err; uint32_t nMaskOn = 0, nMaskOff = 0; if(pMsg->GetPayloadAsJSON(jtRoot, err)) { if(jtRoot.GetValue(_TOPIC_CTRL_KEY_RETAIN, jtVal)) { switch((nType = jtVal.Type())) { case JSON_TRUE: bChanged = pVar->SetRetained(true) || bChanged; break; case JSON_FALSE: bChanged = pVar->SetRetained(false) || bChanged; break; default: g_lf.Error("%s: Invalid data type for JSON key '%s': %d\n", pMsg->GetTopic().c_str(), _TOPIC_CTRL_KEY_RETAIN, nType); break; } } if(jtRoot.GetValue(_TOPIC_CTRL_KEY_REM_RETAINED, jtVal)) { switch((nType = jtVal.Type())) { case JSON_TRUE: pVar->RemoveRetained(cfg.GetDeviceID(), cfg.GetShmID(), rcl.GetMsgQueueSnd(), true); break; case JSON_FALSE: g_lf.Warning("%s: command \"%s\":false has no effect!\n", pMsg->GetTopic().c_str(), _TOPIC_CTRL_KEY_REM_RETAINED); break; default: g_lf.Error("%s: Invalid data type for JSON key '%s': %d\n", pMsg->GetTopic().c_str(), _TOPIC_CTRL_KEY_REM_RETAINED, nType); break; } } if(jtRoot.GetValue(_TOPIC_CTRL_KEY_QOS, jtVal)) { switch((nType = jtVal.Type())) { case JSON_INTEGER: nQos = (int)json_integer_value(jtVal); if(nQos < MQTTCL_MIN_QOS) g_lf.Warning("%s: Invalid value for QOS: %d! Value adjusted to %d\n", pMsg->GetTopic().c_str(), nQos, MQTTCL_MIN_QOS); else if(nQos > MQTTCL_MAX_QOS) g_lf.Warning("%s: Invalid value for QOS: %d! Value adjusted to %d\n", pMsg->GetTopic().c_str(), nQos, MQTTCL_MAX_QOS); bChanged = pVar->SetQoS(nQos) || bChanged; break; default: g_lf.Error("%s: Invalid data type for JSON key '%s': %d\n", pMsg->GetTopic().c_str(), _TOPIC_CTRL_KEY_QOS, nType); break; } } if(jtRoot.GetValue(_TOPIC_CTRL_KEY_BINLE, jtVal)) { switch((nType = jtVal.Type())) { case JSON_TRUE: nMaskOn |= MQTT_VALUE_BINLE; break; case JSON_FALSE: nMaskOff |= MQTT_VALUE_BINLE; break; default: g_lf.Error("%s: Invalid data type for JSON key '%s': %d\n", pMsg->GetTopic().c_str(), _TOPIC_CTRL_KEY_BINLE, nType); break; } } if(jtRoot.GetValue(_TOPIC_CTRL_KEY_BINBE, jtVal)) { switch((nType = jtVal.Type())) { case JSON_TRUE: nMaskOn |= MQTT_VALUE_BINBE; break; case JSON_FALSE: nMaskOff |= MQTT_VALUE_BINBE; break; default: g_lf.Error("%s: Invalid data type for JSON key '%s': %d\n", pMsg->GetTopic().c_str(), _TOPIC_CTRL_KEY_BINBE, nType); break; } } if(jtRoot.GetValue(_TOPIC_CTRL_KEY_JSON, jtVal)) { switch((nType = jtVal.Type())) { case JSON_TRUE: nMaskOn |= MQTT_VALUE_JSON; break; case JSON_FALSE: nMaskOff |= MQTT_VALUE_JSON; break; default: g_lf.Error("%s: Invalid data type for JSON key '%s': %d\n", pMsg->GetTopic().c_str(), _TOPIC_CTRL_KEY_JSON, nType); break; } } if(jtRoot.GetValue(_TOPIC_CTRL_KEY_PBUF, jtVal)) { switch((nType = jtVal.Type())) { case JSON_TRUE: nMaskOn |= MQTT_VALUE_PBUF; break; case JSON_FALSE: nMaskOff |= MQTT_VALUE_PBUF; break; default: g_lf.Error("%s: Invalid data type for JSON key '%s': %d\n", pMsg->GetTopic().c_str(), _TOPIC_CTRL_KEY_PBUF, nType); break; } } if(nMaskOff) { if(pVar->DisablePublish(nMaskOff, &vt)) { bChanged = true; } } if(nMaskOn) { if(pVar->EnablePublish(nMaskOn, &vt)) { bChanged = true; } } #if _DUMP_ENABLED_VARS if(bChanged) vt.DumpPubEnabled(); #endif // _DUMP_ENABLED_VARS } else { g_lf.Error("%s: JSON error: %s!\n", pMsg->GetTopic().c_str(), err.c_str()); } } } else { int nLocks = 0; static const uint32_t nFormats[] = { 0, MQTT_VALUE_BINLE, MQTT_VALUE_BINBE, MQTT_VALUE_JSON, MQTT_VALUE_PBUF }; for(int i = TCC_SetBinLe; i <= TCC_SetPBuf; i++) { if(pMsg->TopicMatchesSub(pCtrlMap[i].sTopic.c_str())) { sVarPath = pMsg->GetTopic(pCtrlMap[i].nVarOffset); if((pVar = vt.Find(sVarPath.c_str()))) { if(pVar->PublishEnabled()) { if(!pVar->SetShmValue(nFormats[i], pMsg, nLocks)) { // !!! } } else { // !!! } break; } else { // !!! break; } } } } pMsg->Release(); } } //////////////////////////////////////////////////////////////////////////////////////////////////// // static int _ProcessOutgoing(CMqttClient &rcl) { int nRet = 0; CMqttMessage *pMsg; while((pMsg = rcl.PopSndMsg())) { rcl.publish(pMsg); pMsg->Release(); ++nRet; } return nRet; } //////////////////////////////////////////////////////////////////////////////////////////////////// // static int _Subscribe(CMqttClient &rcl, int nDefaultQOS, LPCSUB_CTRL_TOPIC pCtrlMap, size_t nLenMap) { int nRet; for(size_t i = 0; i < nLenMap; i++) { if((nRet = rcl.subscribe(NULL, pCtrlMap[i].sTopic.c_str(), nDefaultQOS)) != MOSQ_ERR_SUCCESS) break; TRACE("Subscribed: '%s' - QOS: %d\n", pCtrlMap[i].sTopic.c_str(), nDefaultQOS); } return nRet; } static void _Unsubscribe(CMqttClient &rcl, LPCSUB_CTRL_TOPIC pCtrlMap, size_t nLenMap) { for(size_t i = 0; i < nLenMap; i++) { rcl.unsubscribe(NULL, pCtrlMap[i].sTopic.c_str()); } } //////////////////////////////////////////////////////////////////////////////////////////////////// // static void _OnClientEvents(LPCMQTT_GENERIC_NOTIFICATION pntf, void *pctx) { switch(pntf->evt) { case NEVT_Log: if(pntf->log.level == MOSQ_LOG_ERR) { TRACE("[%d] - %s\n", pntf->log.level, pntf->log.str); g_lf.Error("%s\n", pntf->log.str); } break; case NEVT_Connect: if(pntf->con.rc == MOSQ_ERR_SUCCESS) g_bConnected = true; break; case NEVT_Disconnect: break; case NEVT_Subscribe: ++g_nSubcribed; break; case NEVT_Unsubscribe: break; default: break; } } //////////////////////////////////////////////////////////////////////////////////////////////////// static void _ProcessCtrlMessages(HAPPCTRL hAC, HAPPINFO hAI) { ctrlmsg_t nCtrlMsg; while(!g_bIntr && (nCtrlMsg = ::GfaIpcAppCtrlGetNextCtrlMsg(hAI))) { switch(nCtrlMsg) { case GFA_APPCTRL_CTRLMSG_STOP: g_bIntr = true; g_fPauseImp = false; g_fPauseCmd = false; g_fZombie = false; g_lf.Info("Received Control Message 'Stop'\n"); break; case GFA_APPCTRL_CTRLMSG_PAUSE: if(!g_fPauseCmd) { g_fPauseCmd = true; if(!g_fPauseImp) { ::GfaIpcAppCtrlSetState(hAC, GIAS_Paused); g_lf.Info("Received Control Message 'Pause'\n"); g_lf.Info("Enter state %s ...\n", ::GfaIpcAppCtrlGetStateText(GIAS_Paused)); TRACE("%-8s: State: %s\n", "Me", ::GfaIpcAppCtrlGetStateText(GIAS_Paused)); } } break; case GFA_APPCTRL_CTRLMSG_RESUME: if(g_fPauseCmd) { g_fPauseCmd = false; if(!g_fPauseImp) { g_lf.Info("Received Control Message 'Resume'\n"); g_lf.Info("Enter state %s ...\n", ::GfaIpcAppCtrlGetStateText(GIAS_Running)); ::GfaIpcAppCtrlSetState(hAC, GIAS_Running); TRACE("%-8s: State: %s\n", "Me", ::GfaIpcAppCtrlGetStateText(GIAS_Running)); } } break; default: break; } } } //////////////////////////////////////////////////////////////////////////////////////////////////// static void _ProcessStateEvents(HAPPCTRL hAC, HAPPINFO hAI) { appid_t nAppIdSrc; bool fOldPaused = g_fPauseImp; char szDispName[128]; while(!g_bIntr && (nAppIdSrc = ::GfaIpcAppCtrlGetNextStateEvtSrc(hAI))) { GfaIpcAppStates state = ::GfaIpcAppCtrlGetState(hAC, nAppIdSrc); GfaIpcAppCtrlGetDisplayName(hAC, nAppIdSrc, szDispName, sizeof(szDispName)); TRACE("%-8s: State: %s\n", szDispName, ::GfaIpcAppCtrlGetStateText(state)); if(nAppIdSrc & _DEPENDENCIES) { if(state == GIAS_Running) { g_lf.Info("%s -> %s.\n", szDispName, ::GfaIpcAppCtrlGetStateText(state)); g_nDepRunning |= nAppIdSrc; } else { g_lf.Warning("%s -> %s.\n", szDispName, ::GfaIpcAppCtrlGetStateText(state)); g_nDepRunning &= ~nAppIdSrc; } } } if(!g_bIntr) { g_fPauseImp = (g_nDepRunning != _DEPENDENCIES); if(!g_fPauseCmd && (fOldPaused != g_fPauseImp)) { fOldPaused = g_fPauseImp; GfaIpcAppStates newState = g_fPauseImp ? GIAS_Paused : GIAS_Running; ::GfaIpcAppCtrlSetState(hAC, newState); if(g_fPauseImp) g_lf.Warning("Enter state %s ...\n", ::GfaIpcAppCtrlGetStateText(newState)); else g_lf.Info("Enter state %s ...\n", ::GfaIpcAppCtrlGetStateText(newState)); } } } //////////////////////////////////////////////////////////////////////////////////////////////////// //////////////////////////////////////////////////////////////////////////////////////////////////// //////////////////////////////////////////////////////////////////////////////////////////////////// // int main(int /*argc*/, char **/*argv*/) { int nRet = 0; CProcessInstance pi; std::string sDevID; char szLogFile[PATH_MAX]; std::string strBaseDir; const char *pszBaseDir = NULL; HAPPCTRL hAC = NULL; HAPPINFO hAI; HSHM hShm = NULL; void *pShm = NULL; unsigned long long nUsecWorkTime = 0; int nTlsMode; CProcessClock pcWork; //////////////////////////////////////////////////////////////////////////////////////////////// // check for multiple instances if(!pi.LockInstance(UUID_SHM)) { CLogfile::StdErr("Failed to start instance!\n"); return -1; } //////////////////////////////////////////////////////////////////////////////////////////////// // configure signal handling struct sigaction sa; ::sigfillset(&g_set); sigaddset(&g_set, SIGUSR1); memset(&sa, 0, sizeof(sa)); sa.sa_handler = _SigHandler; sigaction(SIGHUP, &sa, NULL); // handles user's terminal disconnect sigaction(SIGQUIT, &sa, NULL); // handles Ctrl + '\' sigaction(SIGTERM, &sa, NULL); // handles normal termination sigaction(SIGABRT, &sa, NULL); // handles abnormal termination (i.e. abort()) sigaction(SIGINT, &sa, NULL); // handles Ctrl + 'C' sa.sa_handler = SIG_IGN; sigaction(SIGTSTP, &sa, NULL); // ignores Ctrl + 'Z' sigaction(SIGSTOP, &sa, NULL); // ignores Stop sigaction(SIGCONT, &sa, NULL); // ignores Continue sigaction(SIGCHLD, &sa, NULL); // ignores child process termination sigaction(0, &sa, NULL); // ignores shell termination do { g_fZombie = true; //////////////////////////////////////////////////////////////////////////////////////////// // get the base directory for output files if(!pszBaseDir) pszBaseDir = _GetBaseDir(strBaseDir); CLogfile::StdOut("Using base directory \"%s\".\n", pszBaseDir); //////////////////////////////////////////////////////////////////////////////////////////// // initialize log file sprintf(szLogFile, "%s/%s", pszBaseDir, _LOGFILE_NAME); if(!g_lf.Open(szLogFile)) { CLogfile::StdErr("Failed to create/open log file!\n"); nRet = -1; break; } g_lf.Info("Process started.\n"); //////////////////////////////////////////////////////////////////////////////////////////// // initialize app control g_lf.Info("Acquire AppCtrl-Handle.\n"); if(!(hAC = ::GfaIpcAppCtrlAcquire(_APPID, _APPNAME, _USEC_FROM_MSEC(_UPDATE_INTERVAL_MS), _USEC_FROM_MSEC(_RECONN_INTERVAL_MS) * 3))) { g_lf.Error("Failed to acquire AppCtrl-Handle!\n"); break; } ::GfaIpcAppCtrlSetState(hAC, GIAS_Initializing); g_lf.Info("Enter state %s ...\n", ::GfaIpcAppCtrlGetStateText(GIAS_Initializing)); if(!::GfaIpcAppCtrlSubscribeStateEvents(hAC, _DEPENDENCIES)) { g_lf.Error("Failed to subscribe state event notifications!\n"); break; } //////////////////////////////////////////////////////////////////////////////////////////// // parse the config file CMqttClConfig cfg(UUID_SHM); if(!cfg.LoadCfg(MQTTCL_CONFIG_FILE_PATH, g_lf)) { nRet = -1; break; } nTlsMode = cfg.GetTLSMode(); // TRACE("%s/%s\n", cfg.GetDeviceID(), cfg.GetShmID()); //////////////////////////////////////////////////////////////////////////////////////////// // client control topic map const SUB_CTRL_TOPIC subCtrlMap[] = { formatString("%s/%s/%s/#", cfg.GetDeviceID(), cfg.GetShmID(), _TOPIC_CMD_CTRL), formatString("%s/%s/%s/%s/#", cfg.GetDeviceID(), cfg.GetShmID(), MQTT_TOPIC_VALUE_BINLE, _TOPIC_CMD_SET), formatString("%s/%s/%s/%s/#", cfg.GetDeviceID(), cfg.GetShmID(), MQTT_TOPIC_VALUE_BINBE, _TOPIC_CMD_SET), formatString("%s/%s/%s/%s/#", cfg.GetDeviceID(), cfg.GetShmID(), MQTT_TOPIC_VALUE_JSON, _TOPIC_CMD_SET), formatString("%s/%s/%s/%s/#", cfg.GetDeviceID(), cfg.GetShmID(), MQTT_TOPIC_VALUE_PBUF, _TOPIC_CMD_SET), // formatString("%s/%s/%s/#", cfg.GetDeviceID(), cfg.GetShmID(), _TOPIC_CMD_STATUS) }; //////////////////////////////////////////////////////////////////////////////////////////// if(!(hShm = ::acquire_shm(sizeof(shm_t), 1))) { g_lf.Error("GfaIpcAcquireSHM failed!\n"); break; } g_lf.Info("Acquired SHM Handle.\n"); if(!(pShm = ::GfaIpcAcquirePointer(hShm))) { g_lf.Error("GfaIpcAcquirePointer failed!\n"); break; } g_lf.Info("Acquired SHM Pointer.\n"); ::GfaIpcDumpSHMROT(); memcpy(&g_shmShadow, (const shm_t*)pShm, sizeof(shm_t)); //////////////////////////////////////////////////////////////////////////////////////////// int nErr, nLocked = 0, nNumConn = 0; bool bReconnect, bConnPending; std::string strErr; CMqttVarTable vtbl; CShm_t shm(pShm, &g_shmShadow, hShm, NULL, -1, 0, cfg.GetDefaultQOS(), cfg.GetDefaultRetain()); shm.InitPath(NULL, NULL); shm.CreateMembersTable(vtbl); #if _DUMP_ENABLED_VARS vtbl.DumpPubEnabled(); #endif // _DUMP_ENABLED_VARS //////////////////////////////////////////////////////////////////////////////////////////// CMqttClient mqttCl(cfg.GetDeviceID()); mqttCl.SetClientEventCallback(_OnClientEvents, NEVT_Connect | NEVT_Disconnect | NEVT_Subscribe | NEVT_Unsubscribe | NEVT_Message | NEVT_Log, NULL); g_fZombie = false; g_fRun = true; ::GfaIpcAppCtrlSetState(hAC, GIAS_Running); g_lf.Info("Enter state %s ...\n", ::GfaIpcAppCtrlGetStateText(GIAS_Running)); while(g_fRun) { //////////////////////////////////////////////////////////////////////////////////////// // update app control info if((hAI = ::GfaIpcAppCtrlInfoUpdate(hAC, nUsecWorkTime))) { _ProcessCtrlMessages(hAC, hAI); _ProcessStateEvents(hAC, hAI); } if(g_fPauseImp || g_fPauseCmd) { if(g_cs < CLS_ShutDown) { g_csLast = g_cs; g_cs = CLS_ShutDown; } else { nUsecWorkTime = 0; } } pcWork.ClockTrigger(); switch(g_cs) { case CLS_NotInit: if((nErr = CMqttClient::Init()) == MOSQ_ERR_SUCCESS) { g_cs = CLS_SetTLS; } else if(!CMqttClient::EvalError(nErr, bReconnect, bConnPending, g_bIntr, strErr)) { g_csLast = g_cs; g_cs = CLS_Err; } break; case CLS_SetTLS: if(nTlsMode == MQTTCL_TLS_MODE_CRT) { g_lf.Info("Using TLS with certificates.\n"); if((nErr = mqttCl.tls_set(cfg.GetTlsCaCrtFile(), NULL, cfg.GetTlsClCrtFile(), cfg.GetTlsClKeyFile(), NULL)) != MOSQ_ERR_SUCCESS) { CMqttClient::EvalError(nErr, bReconnect, bConnPending, g_bIntr, strErr); if(g_bIntr) { g_csLast = g_cs; g_cs = CLS_ShutDown; } else { g_csLast = g_cs; g_cs = CLS_Err; } } else { g_cs = CLS_Unconnected; g_lf.Info("Connecting to broker @ %s:%u ...\n", cfg.GetBrokerAddr(), cfg.GetBrokerPort()); } } else if(nTlsMode == MQTTCL_TLS_MODE_PSK) { g_lf.Info("Using TLS with PSK.\n"); if((nErr = mqttCl.tls_psk_set(cfg.GetTlsPSK(), cfg.GetDeviceID(), NULL)) != MOSQ_ERR_SUCCESS) { CMqttClient::EvalError(nErr, bReconnect, bConnPending, g_bIntr, strErr); if(g_bIntr) { g_csLast = g_cs; g_cs = CLS_ShutDown; } else { g_csLast = g_cs; g_cs = CLS_Err; } } else { g_cs = CLS_Unconnected; g_lf.Info("Connecting to broker @ %s:%u ...\n", cfg.GetBrokerAddr(), cfg.GetBrokerPort()); } } else { g_cs = CLS_Unconnected; g_lf.Info("Connecting to broker @ %s:%u ...\n", cfg.GetBrokerAddr(), cfg.GetBrokerPort()); } break; case CLS_Unconnected: if(g_bConnected) { g_bConnected = false; g_lf.Warning("Lost connection to broker @ %s:%u. Tying to reconnect ...\n", cfg.GetBrokerAddr(), cfg.GetBrokerPort()); } if(!nNumConn) g_cs = CLS_Connect; else g_cs = CLS_Reconnect; break; case CLS_Connect: if((nErr = mqttCl.connect(cfg.GetBrokerAddr(), cfg.GetBrokerPort())) == MOSQ_ERR_SUCCESS) g_cs = CLS_Connecting; else if(!CMqttClient::EvalError(nErr, bReconnect, bConnPending, g_bIntr, strErr)) { if(bConnPending) g_cs = CLS_Connecting; else if(bReconnect) { g_csLast = g_cs; g_cs = _cl_usleep(_USEC_FROM_MSEC(_RECONN_INTERVAL_MS), g_cs); } else if(g_bIntr) { g_csLast = g_cs; g_cs = CLS_ShutDown; } else { g_csLast = g_cs; g_cs = CLS_Err; } } break; case CLS_Reconnect: if((nErr = mqttCl.reconnect()) == MOSQ_ERR_SUCCESS) g_cs = CLS_Connecting; else if(!CMqttClient::EvalError(nErr, bReconnect, bConnPending, g_bIntr, strErr)) { if(bConnPending) g_cs = CLS_Connecting; else if(bReconnect) { g_csLast = g_cs; g_cs = _cl_usleep(_USEC_FROM_MSEC(_RECONN_INTERVAL_MS), g_cs); } else if(g_bIntr) { g_csLast = g_cs; g_cs = CLS_ShutDown; } else { g_csLast = g_cs; g_cs = CLS_Err; } } break; case CLS_Connecting: if(!mqttCl.TimedLoop(_NSEC_FROM_MSEC(_UPDATE_INTERVAL_MS), nErr, bReconnect, bConnPending, g_bIntr, strErr)) { if(bReconnect) { g_csLast = g_cs; g_cs = _cl_usleep(_USEC_FROM_MSEC(_RECONN_INTERVAL_MS), CLS_Unconnected); } else if(g_bIntr) { g_csLast = g_cs; g_cs = CLS_ShutDown; } else if(!bConnPending) { g_csLast = g_cs; g_cs = CLS_Err; } } else if(g_bConnected) { g_cs = CLS_Connected; } break; case CLS_Connected: g_lf.Info("Connected to broker @ %s:%u.\n", cfg.GetBrokerAddr(), cfg.GetBrokerPort()); ++nNumConn; g_nSubcribed = 0; g_cs = CLS_Subscribe; break; case CLS_Subscribe: g_lf.Info("Subscribing control-topics ...\n"); if((nErr = _Subscribe(mqttCl, cfg.GetDefaultQOS(), subCtrlMap, _COUNTOF(subCtrlMap))) == MOSQ_ERR_SUCCESS) g_cs = CLS_Subscribing; else if(!CMqttClient::EvalError(nErr, bReconnect, bConnPending, g_bIntr, strErr)) { if(bConnPending) g_cs = CLS_Connecting; else if(bReconnect) g_cs = CLS_Unconnected; else if(g_bIntr) { g_csLast = g_cs; g_cs = CLS_ShutDown; } else { g_csLast = g_cs; g_cs = CLS_Err; } } break; case CLS_Subscribing: if(!mqttCl.TimedLoop(_NSEC_FROM_MSEC(_UPDATE_INTERVAL_MS), nErr, bReconnect, bConnPending, g_bIntr, strErr)) { if(bReconnect) g_cs = CLS_Unconnected; else if(bConnPending) g_cs = CLS_Connecting; else if(g_bIntr) { g_csLast = g_cs; g_cs = CLS_ShutDown; } else { g_csLast = g_cs; g_cs = CLS_Err; } } else if(g_nSubcribed == _COUNTOF(subCtrlMap)) { g_cs = CLS_Subscribed; } break; case CLS_Subscribed: g_lf.Info("Subscriptions acknowledged.\n"); g_lf.Info("Enter SHM processing loop ...\n"); g_cs = CLS_ProcMsg; break; case CLS_ProcMsg: if(mqttCl.TimedLoop(_NSEC_FROM_MSEC(_UPDATE_INTERVAL_MS), nErr, bReconnect, bConnPending, g_bIntr, strErr)) { #if _TRACK_TIMES std::string s1, s2; pc_time64_t elapsed; g_nDbgCounter1 = g_nDbgCounter2 = g_nDbgCounter3 = 0; g_pc.ClockTrigger(); #endif // _TRACK_TIMES _ProcessIncoming(mqttCl, vtbl, cfg, subCtrlMap); #if _TRACK_TIMES if(g_nDbgCounter1) { elapsed = g_pc.ClockGetElapsed(); s1 = CProcessClock::Interval2String(elapsed); s2 = CProcessClock::Interval2String(elapsed / g_nDbgCounter1); TRACE("_ProcessIncoming (%lu variables): %s (%s per var)\n", g_nDbgCounter1, s1.c_str(), s2.c_str()); } g_pc.ClockTrigger(); #endif // _TRACK_TIMES _SIG_BLOCK(&g_set); vtbl.CheckShmAndPublish(cfg.GetDeviceID(), cfg.GetShmID(), mqttCl.GetMsgQueueSnd(), nLocked); _SIG_UNBLOCK(&g_set); #if _TRACK_TIMES g_nDbgCounter2 = mqttCl.GetMsgQueueSnd().Size(); if(g_nDbgCounter2) { elapsed = g_pc.ClockGetElapsed(); s1 = CProcessClock::Interval2String(elapsed); s2 = CProcessClock::Interval2String(elapsed / g_nDbgCounter2); TRACE("CheckShmAndPublish (%lu variables): %s (%s per var)\n", g_nDbgCounter2, s1.c_str(), s2.c_str()); g_pc.ClockTrigger(); } g_nDbgCounter3 = #endif // _TRACK_TIMES _ProcessOutgoing(mqttCl); #if _TRACK_TIMES if(g_nDbgCounter3) { elapsed = g_pc.ClockGetElapsed(); s1 = CProcessClock::Interval2String(elapsed); s2 = CProcessClock::Interval2String(elapsed / g_nDbgCounter3); TRACE("_ProcessOutgoing (%lu variables): %s (%s per var)\n", g_nDbgCounter3, s1.c_str(), s2.c_str()); } #endif // _TRACK_TIMES } else { if(bReconnect) g_cs = CLS_Unconnected; else if(bConnPending) g_cs = CLS_Connecting; else if(g_bIntr) { g_csLast = g_cs; g_cs = CLS_ShutDown; } else { g_csLast = g_cs; g_cs = CLS_Err; } } break; case CLS_Err: g_lf.Error("[%s] - %s\n", _GetClientStateString(g_csLast), strErr.c_str()); TRACE("Error: %s\n", strErr.c_str()); g_fZombie = true; g_cs = CLS_ShutDown; break; case CLS_ShutDown: if(g_bIntr && g_nLastSig >= 0) { g_lf.Info("Received signal '%s'.\n", strsignal(g_nLastSig)); g_nLastSig = -1; } g_lf.Info("Process shutting down ...\n"); if(g_csLast >= CLS_Subscribed) g_cs = CLS_Unsubscribe; else if(g_csLast >= CLS_Connected) g_cs = CLS_Disconnect; else if(g_csLast > CLS_NotInit) g_cs = CLS_Cleanup; else if((g_fPauseImp || g_fPauseCmd) && !g_bIntr) g_cs = CLS_Paused; else g_cs = CLS_Exit; break; case CLS_Unsubscribe: g_lf.Info("Unsubscribe control-topics.\n"); _Unsubscribe(mqttCl, subCtrlMap, _COUNTOF(subCtrlMap)); g_nSubcribed = 0; g_cs = CLS_Disconnect; break; case CLS_Disconnect: g_lf.Info("Disconnect from broker @ %s:%u.\n", cfg.GetBrokerAddr(), cfg.GetBrokerPort()); mqttCl.disconnect(); g_bConnected = false; g_cs = CLS_Cleanup; break; case CLS_Cleanup: g_lf.Info("Clean up.\n"); CMqttClient::Cleanup(); if((g_fPauseImp || g_fPauseCmd) && !g_bIntr) g_cs = CLS_Paused; else g_cs = CLS_Exit; break; case CLS_Paused: if(!g_fPauseImp && !g_fPauseCmd) { if(!g_bIntr) g_cs = CLS_NotInit; else g_cs = CLS_Exit; } else { usleep(_USEC_FROM_MSEC(_UPDATE_INTERVAL_MS)); continue; } break; case CLS_Exit: g_fRun = false; break; } nUsecWorkTime = pcWork.ClockGetElapsed() / 1000; } } while(false); //////////////////////////////////////////////////////////////////////////////////////////////// if(hShm) { if(pShm) { g_lf.Info("Release SHM Pointer.\n"); ::GfaIpcReleasePointer(hShm, pShm); } g_lf.Info("Release SHM Handle.\n"); ::GfaIpcReleaseSHM(hShm); } if(g_fZombie) { if(hAC) ::GfaIpcAppCtrlSetState(hAC, GIAS_Zombie); TRACE("Enter Zombie state ...\n"); g_lf.Warning("Enter Zombie state ...\n"); g_lf.Flush(); pause(); if(g_nLastSig >= 0) g_lf.Info("Received signal '%s'.\n", strsignal(g_nLastSig)); } if(hAC) { g_lf.Info("Enter state %s ...\n", ::GfaIpcAppCtrlGetStateText(GIAS_Terminating)); ::GfaIpcAppCtrlSetState(hAC, GIAS_Terminating); g_lf.Info("Releasing App Control ...\n"); ::GfaIpcAppCtrlRelease(hAC); } g_lf.Info("Process exit.\n\n"); g_lf.Close(); CLogfile::StdErr("MqttCl exit.\n"); return nRet; }