|
- ////////////////////////////////////////////////////////////////////////////////////////////////////
- //
- #include <stdio.h>
- #include <signal.h>
- #include <linux/limits.h>
- #include <sys/socket.h>
- #include <arpa/inet.h>
- #include <linux/if_arp.h>
- #include <netinet/in.h>
- #include <ifaddrs.h>
- #include <sys/ioctl.h>
- #include <fcntl.h>
- #include <unistd.h>
- #include <getopt.h>
- #include <gfa/svc/common/strutil.h>
- #include <gfa/svc/common/fileutil.h>
- #include <gfa/svc/common/instance.h>
- #include <gfa/svc/common/processclock.h>
- #include <gfa/svc/common/debug.h>
- #include <gfa/svc/mqttcl/mqttclient.h>
- #include <gfa/svc/mqttcl/mqttcfg.h>
- #include <gfa/svc/mqttcl/mqttdbg.h>
- #include "projal.h"
- ////////////////////////////////////////////////////////////////////////////////////////////////////
- #if _ENABLE_MEM_TRACE
- #include <mcheck.h>
/// Switches on glibc allocation tracing when an instance is constructed.
///
/// The constructor points MALLOC_TRACE at a fixed log file and calls mtrace().
/// The destructor does nothing: the muntrace() call is commented out, so tracing
/// is not stopped by this class.
class CMtrace
{
public:
	CMtrace(void) {
		// setenv() copies its arguments. The former putenv() call was given a string
		// literal, which requires a const char* -> char* conversion that is ill-formed
		// in C++11 and later.
		setenv("MALLOC_TRACE", "/home/wrk/share/config/services/Debug/Desktop_Qt_5_7_0_GCC_64bit/mqttcl/mtrace.log", 1);
		mtrace();
	}
	~CMtrace(void){
		// muntrace();
	}
};
- #endif // _ENABLE_MEM_TRACE
#if _TRACK_TIMES
// Timing instrumentation, only compiled when _TRACK_TIMES is enabled.
// main() restarts g_pc before each processing step and divides the elapsed time by the
// matching counter to trace a per-variable duration.
static CProcessClock g_pc;
unsigned long g_nDbgCounter1{0};	// checked after _ProcessIncoming(); not incremented in this file
unsigned long g_nDbgCounter2{0};	// set to the send queue size after CheckShmAndPublish()
unsigned long g_nDbgCounter3{0};	// set to the return value of _ProcessOutgoing()
#endif // _TRACK_TIMES
- ////////////////////////////////////////////////////////////////////////////////////////////////////
// app control: identity of this process and the applications it depends on

#define _APPID							GFA_APPCTRL_APPID_MQTTCL
#define _APPNAME						"MqttCl"
#define _DEPENDENCIES					((appid_t)(GFA_APPCTRL_APPID_REMANENT))

////////////////////////////////////////////////////////////////////////////////////////////////////
// timing and log file

#define _UPDATE_INTERVAL_MS				100				// period of the main loop / network loop
#define _RECONN_INTERVAL_MS				1000			// delay between connection attempts
#define _LOGFILE_NAME					"mqttcl.log"	// created inside the base directory

////////////////////////////////////////////////////////////////////////////////////////////////////
// helpers

// block / unblock the signals contained in the set s for the calling thread
#define _SIG_BLOCK(s)					sigprocmask(SIG_BLOCK, (s), NULL)
#define _SIG_UNBLOCK(s)					sigprocmask(SIG_UNBLOCK, (s), NULL)

// unit conversions
// NOTE(review): _USEC_FROM_MSEC multiplies by _PC_NS_PER_US. Going by its name that is a
// nanoseconds-per-microsecond constant; the result is only in microseconds if it equals 1000 - confirm.
#define _NSEC_FROM_MSEC(ms)				((ms) * _PC_NS_PER_MS)
#define _USEC_FROM_MSEC(ms)				((ms) * _PC_NS_PER_US)

// JSON keys accepted in the payload of a CONTROL message
#define _TOPIC_CTRL_KEY_BINLE			"binLe"
#define _TOPIC_CTRL_KEY_BINBE			"binBe"
#define _TOPIC_CTRL_KEY_JSON			"json"
#define _TOPIC_CTRL_KEY_PBUF			"pBuf"
#define _TOPIC_CTRL_KEY_QOS				"qos"
#define _TOPIC_CTRL_KEY_RETAIN			"retained"
#define _TOPIC_CTRL_KEY_REM_RETAINED	"delRetained"

// command segments used to build the subscribed control topics
#define _TOPIC_CMD_CTRL					"CONTROL"
#define _TOPIC_CMD_SET					"SET"
#define _TOPIC_CMD_STATUS				"STATUS"

// number of connect retries tolerated for the given error code: 30 for MOSQ_ERR_EAI, 3 otherwise
#define _CONNECT_MAX_RETRIES(err)		(((err) == MOSQ_ERR_EAI) ? 30 : 3)
- ////////////////////////////////////////////////////////////////////////////////////////////////////
- //
// Kinds of control topics. The values are used as indices: into the subscription map that
// main() builds (subCtrlMap) and into the format table inside _ProcessIncoming(), so the
// order must stay in step with both.
enum _TOPIC_CTRL_CMD
{
	TCC_Control,	// "<prefix>CONTROL/#"
	TCC_SetBinLe,	// little-endian binary SET topic
	TCC_SetBinBe,	// big-endian binary SET topic
	TCC_SetJson,	// JSON SET topic
	TCC_SetPBuf,	// pBuf SET topic
	TCC_Status		// STATUS topic; its entry in the subscription map is commented out
};

using TOPIC_CTRL_CMD	= _TOPIC_CTRL_CMD;
using LPTOPIC_CTRL_CMD	= TOPIC_CTRL_CMD *;
using LPCTOPIC_CTRL_CMD	= const TOPIC_CTRL_CMD *;
- ////////////////////////////////////////////////////////////////////////////////////////////////////
- //
// States of the client state machine that main() runs.
//
// The numeric order is significant: main() compares states relationally
// (e.g. "g_cs < CLS_ShutDown", "g_csLast >= CLS_Subscribed"), and the values index
// g_pszStateNames. When modifying, adjust g_pszStateNames as well; the static_assert
// below fails the build if the two get out of step.
typedef enum _MQTT_CLIENT_STATES
{
	CLS_NotInit,
	CLS_SetLastWill,
	CLS_SetTLS,
	CLS_Unconnected,
	CLS_Connect,
	CLS_Reconnect,
	CLS_Connecting,
	CLS_Connected,
	CLS_Subscribe,
	CLS_Subscribing,
	CLS_Subscribed,
	CLS_PublishConnect,
	CLS_ProcMsg,
	CLS_PublishDisconnect,
	CLS_Err,
	CLS_ShutDown,
	CLS_Unsubscribe,
	CLS_Disconnect,
	CLS_Cleanup,
	CLS_Paused,
	CLS_Exit
}MQTT_CLIENT_STATES, *LPMQTT_CLIENT_STATES;
typedef const MQTT_CLIENT_STATES *LPCMQTT_CLIENT_STATES;

// Display names, one per MQTT_CLIENT_STATES value, in enum order.
static const char *g_pszStateNames[] =
{
	"Not Init",
	"Set Last Will",
	"Set TLS",
	"Unconnected",
	"Connect",
	"Reconnect",
	"Connecting",
	"Connected",
	"Subscribe",
	"Subscribing",
	"Subscribed",
	"Publish Connect",
	"ProcMsg",
	"Publish Disconnect",
	"Error",
	"ShutDown",
	"Unsubscribe",
	"Disconnect",
	"Cleanup",
	"Paused",
	"Exit"
};

static_assert((sizeof(g_pszStateNames) / sizeof(g_pszStateNames[0])) == ((size_t)CLS_Exit + 1),
	"g_pszStateNames must contain exactly one entry per MQTT_CLIENT_STATES value");

// Returns the display name of a client state, or "" if cs is not a known state.
static const char * _GetClientStateString(MQTT_CLIENT_STATES cs)
{
	// A negative value becomes a huge index here and is rejected by the same test.
	const size_t nIndex = (size_t)cs;

	if(nIndex < (sizeof(g_pszStateNames) / sizeof(g_pszStateNames[0])))
		return g_pszStateNames[nIndex];
	return "";
}
- ////////////////////////////////////////////////////////////////////////////////////////////////////
- //
// One subscribed control topic.
//
// sTopic     - the subscription pattern; main() builds these so that they end in "/#".
// nVarOffset - length of sTopic minus its last two characters. _ProcessIncoming() passes
//              it to CMqttMessage::GetTopic() to obtain the variable path of a received message.
//
// The converting constructor is intentionally not explicit: main() initialises an array of
// SUB_CTRL_TOPIC directly from temporary strings.
typedef struct _SUB_CTRL_TOPIC
{
	// s binds to temporaries only; because it is const it is copied, not moved.
	// Strings shorter than two characters yield an offset of 0 instead of wrapping
	// around the unsigned range.
	_SUB_CTRL_TOPIC(const std::string &&s) : sTopic(s),
											 nVarOffset((s.length() >= 2) ? (s.length() - 2) : 0)
	{
	}
	std::string sTopic;
	size_t nVarOffset;
}SUB_CTRL_TOPIC, *LPSUB_CTRL_TOPIC;
typedef const SUB_CTRL_TOPIC *LPCSUB_CTRL_TOPIC;
- ////////////////////////////////////////////////////////////////////////////////////////////////////
- //
- //static std::string _CreateDeviceID(void);
- static volatile bool g_fRun = false;
- static volatile bool g_fPauseImp = false;
- static volatile bool g_fPauseCmd = false;
- static volatile bool g_fZombie = false;
- static appid_t g_nDepRunning = 0;
- static sigset_t g_set;
- static CLogfile g_lf;
- static int g_nLastSig = -1;
- static shm_t g_shmShadow;
- static MQTT_CLIENT_STATES g_cs = CLS_NotInit;
- static MQTT_CLIENT_STATES g_csLast = CLS_NotInit;
- static bool g_bConnected = false;
- static int g_nSubcribed = 0;
- static bool g_bIntr = false;
- static int g_nErrRetries = 0;
- ////////////////////////////////////////////////////////////////////////////////////////////////////
- //
- static const char* _GetBaseDir(std::string &rstrBaseDir)
- {
- char szBaseDir[PATH_MAX];
- rstrBaseDir = ::GetAppDirectory(szBaseDir, sizeof(szBaseDir));
- rtrim(rstrBaseDir, "/");
- return rstrBaseDir.c_str();
- }
- ////////////////////////////////////////////////////////////////////////////////////////////////////
- //
- static void _SigHandler(int sig)
- {
- g_nLastSig = sig;
- g_bIntr = true;
- g_fPauseImp = g_fPauseCmd = g_fZombie = false;
- }
- ////////////////////////////////////////////////////////////////////////////////////////////////////
- //
- static MQTT_CLIENT_STATES _cl_usleep(unsigned int t, MQTT_CLIENT_STATES csNext)
- {
- if(usleep(t) < 0 && errno == EINTR)
- return CLS_ShutDown;
- return csNext;
- }
- ////////////////////////////////////////////////////////////////////////////////////////////////////
- //
- static void _ProcessIncoming(CMqttClient &rcl, CMqttVarTable &vt, CMqttClConfig &cfg, LPCSUB_CTRL_TOPIC pCtrlMap)
- {
- CMqttVar *pVar;
- CMqttMessage *pMsg;
- std::string sVarPath;
- while((pMsg = rcl.PopRcvMsg()))
- {
- if(pMsg->TopicMatchesSub(pCtrlMap[TCC_Control].sTopic.c_str()))
- {
- sVarPath = pMsg->GetTopic(pCtrlMap[TCC_Control].nVarOffset);
- if((pVar = vt.Find(sVarPath.c_str())))
- {
- bool bChanged = false;
- int nType, nQos;
- CJson_t jtRoot, jtVal;
- std::string err;
- uint32_t nMaskOn = 0, nMaskOff = 0;
- if(pMsg->GetPayloadAsJSON(jtRoot, err))
- {
- if(jtRoot.GetValue(_TOPIC_CTRL_KEY_RETAIN, jtVal))
- {
- switch((nType = jtVal.Type()))
- {
- case JSON_TRUE:
- bChanged = pVar->SetRetained(true) || bChanged;
- break;
- case JSON_FALSE:
- bChanged = pVar->SetRetained(false) || bChanged;
- break;
- default:
- g_lf.Error("%s: Invalid data type for JSON key '%s': %d\n", pMsg->GetTopic().c_str(), _TOPIC_CTRL_KEY_RETAIN, nType);
- break;
- }
- }
- if(jtRoot.GetValue(_TOPIC_CTRL_KEY_REM_RETAINED, jtVal))
- {
- switch((nType = jtVal.Type()))
- {
- case JSON_TRUE:
- pVar->RemoveRetained(cfg.GetTopicPrefix(), nullptr, rcl.GetMsgQueueSnd(), true);
- break;
- case JSON_FALSE:
- g_lf.Warning("%s: command \"%s\":false has no effect!\n", pMsg->GetTopic().c_str(), _TOPIC_CTRL_KEY_REM_RETAINED);
- break;
- default:
- g_lf.Error("%s: Invalid data type for JSON key '%s': %d\n", pMsg->GetTopic().c_str(), _TOPIC_CTRL_KEY_REM_RETAINED, nType);
- break;
- }
- }
- if(jtRoot.GetValue(_TOPIC_CTRL_KEY_QOS, jtVal))
- {
- switch((nType = jtVal.Type()))
- {
- case JSON_INTEGER:
- nQos = (int)json_integer_value(jtVal);
- if(nQos < MQTTCL_MIN_QOS)
- g_lf.Warning("%s: Invalid value for QOS: %d! Value adjusted to %d\n", pMsg->GetTopic().c_str(), nQos, MQTTCL_MIN_QOS);
- else if(nQos > MQTTCL_MAX_QOS)
- g_lf.Warning("%s: Invalid value for QOS: %d! Value adjusted to %d\n", pMsg->GetTopic().c_str(), nQos, MQTTCL_MAX_QOS);
- bChanged = pVar->SetQoS(nQos) || bChanged;
- break;
- default:
- g_lf.Error("%s: Invalid data type for JSON key '%s': %d\n", pMsg->GetTopic().c_str(), _TOPIC_CTRL_KEY_QOS, nType);
- break;
- }
- }
- if(jtRoot.GetValue(_TOPIC_CTRL_KEY_BINLE, jtVal))
- {
- switch((nType = jtVal.Type()))
- {
- case JSON_TRUE:
- nMaskOn |= MQTT_VALUE_BINLE;
- break;
- case JSON_FALSE:
- nMaskOff |= MQTT_VALUE_BINLE;
- break;
- default:
- g_lf.Error("%s: Invalid data type for JSON key '%s': %d\n", pMsg->GetTopic().c_str(), _TOPIC_CTRL_KEY_BINLE, nType);
- break;
- }
- }
- if(jtRoot.GetValue(_TOPIC_CTRL_KEY_BINBE, jtVal))
- {
- switch((nType = jtVal.Type()))
- {
- case JSON_TRUE:
- nMaskOn |= MQTT_VALUE_BINBE;
- break;
- case JSON_FALSE:
- nMaskOff |= MQTT_VALUE_BINBE;
- break;
- default:
- g_lf.Error("%s: Invalid data type for JSON key '%s': %d\n", pMsg->GetTopic().c_str(), _TOPIC_CTRL_KEY_BINBE, nType);
- break;
- }
- }
- if(jtRoot.GetValue(_TOPIC_CTRL_KEY_JSON, jtVal))
- {
- switch((nType = jtVal.Type()))
- {
- case JSON_TRUE:
- nMaskOn |= MQTT_VALUE_JSON;
- break;
- case JSON_FALSE:
- nMaskOff |= MQTT_VALUE_JSON;
- break;
- default:
- g_lf.Error("%s: Invalid data type for JSON key '%s': %d\n", pMsg->GetTopic().c_str(), _TOPIC_CTRL_KEY_JSON, nType);
- break;
- }
- }
- if(jtRoot.GetValue(_TOPIC_CTRL_KEY_PBUF, jtVal))
- {
- switch((nType = jtVal.Type()))
- {
- case JSON_TRUE:
- nMaskOn |= MQTT_VALUE_PBUF;
- break;
- case JSON_FALSE:
- nMaskOff |= MQTT_VALUE_PBUF;
- break;
- default:
- g_lf.Error("%s: Invalid data type for JSON key '%s': %d\n", pMsg->GetTopic().c_str(), _TOPIC_CTRL_KEY_PBUF, nType);
- break;
- }
- }
- if(nMaskOff)
- {
- if(pVar->DisablePublish(nMaskOff, &vt))
- {
- bChanged = true;
- }
- }
- if(nMaskOn)
- {
- if(pVar->EnablePublish(nMaskOn, &vt))
- {
- bChanged = true;
- }
- }
- #if _DUMP_ENABLED_VARS
- if(bChanged)
- vt.DumpPubEnabled();
- #endif // _DUMP_ENABLED_VARS
- }
- else
- {
- g_lf.Error("%s: JSON error: %s!\n", pMsg->GetTopic().c_str(), err.c_str());
- }
- }
- }
- else
- {
- int nLocks = 0;
- static const uint32_t nFormats[] =
- {
- 0,
- MQTT_VALUE_BINLE,
- MQTT_VALUE_BINBE,
- MQTT_VALUE_JSON,
- MQTT_VALUE_PBUF
- };
- for(int i = TCC_SetBinLe; i <= TCC_SetPBuf; i++)
- {
- if(pMsg->TopicMatchesSub(pCtrlMap[i].sTopic.c_str()))
- {
- sVarPath = pMsg->GetTopic(pCtrlMap[i].nVarOffset);
- if((pVar = vt.Find(sVarPath.c_str())))
- {
- if(pVar->PublishEnabled())
- {
- if(!pVar->SetShmValue(nFormats[i], pMsg, nLocks))
- {
- // !!!
- }
- }
- else
- {
- // !!!
- }
- break;
- }
- else
- {
- // !!!
- break;
- }
- }
- }
- }
- pMsg->Release();
- }
- }
- ////////////////////////////////////////////////////////////////////////////////////////////////////
- //
- static int _ProcessOutgoing(CMqttClient &rcl)
- {
- int nRet = 0;
- CMqttMessage *pMsg;
- while((pMsg = rcl.PopSndMsg()))
- {
- rcl.publish(pMsg);
- pMsg->Release();
- ++nRet;
- }
- return nRet;
- }
- ////////////////////////////////////////////////////////////////////////////////////////////////////
- //
- static int _Subscribe(CMqttClient &rcl, int nDefaultQOS, LPCSUB_CTRL_TOPIC pCtrlMap, size_t nLenMap)
- {
- int nRet;
- for(size_t i = 0; i < nLenMap; i++)
- {
- if((nRet = rcl.subscribe(NULL, pCtrlMap[i].sTopic.c_str(), nDefaultQOS)) != MOSQ_ERR_SUCCESS)
- break;
- TRACE("Subscribed: '%s' - QOS: %d\n", pCtrlMap[i].sTopic.c_str(), nDefaultQOS);
- }
- return nRet;
- }
- static void _Unsubscribe(CMqttClient &rcl, LPCSUB_CTRL_TOPIC pCtrlMap, size_t nLenMap)
- {
- for(size_t i = 0; i < nLenMap; i++)
- {
- rcl.unsubscribe(NULL, pCtrlMap[i].sTopic.c_str());
- }
- }
- ////////////////////////////////////////////////////////////////////////////////////////////////////
- //
- static void _OnClientEvents(LPCMQTT_GENERIC_NOTIFICATION pntf, void *pctx)
- {
- switch(pntf->evt)
- {
- case NEVT_Log:
- if(pntf->log.level == MOSQ_LOG_ERR)
- {
- TRACE("[%d] - %s\n", pntf->log.level, pntf->log.str);
- g_lf.Error("%s\n", pntf->log.str);
- }
- break;
- case NEVT_Connect:
- if(pntf->con.rc == MOSQ_ERR_SUCCESS)
- g_bConnected = true;
- break;
- case NEVT_Disconnect:
- break;
- case NEVT_Subscribe:
- ++g_nSubcribed;
- break;
- case NEVT_Unsubscribe:
- break;
- default:
- break;
- }
- }
- ////////////////////////////////////////////////////////////////////////////////////////////////////
- static void _ProcessCtrlMessages(HAPPCTRL hAC, HAPPINFO hAI)
- {
- ctrlmsg_t nCtrlMsg;
- while(!g_bIntr && (nCtrlMsg = ::GfaIpcAppCtrlGetNextCtrlMsg(hAI)))
- {
- switch(nCtrlMsg)
- {
- case GFA_APPCTRL_CTRLMSG_STOP:
- g_bIntr = true;
- g_fPauseImp = false;
- g_fPauseCmd = false;
- g_fZombie = false;
- g_lf.Info("Received Control Message 'Stop'\n");
- break;
- case GFA_APPCTRL_CTRLMSG_PAUSE:
- if(!g_fPauseCmd)
- {
- g_fPauseCmd = true;
- if(!g_fPauseImp)
- {
- ::GfaIpcAppCtrlSetState(hAC, GIAS_Paused);
- g_lf.Info("Received Control Message 'Pause'\n");
- g_lf.Info("Enter state %s ...\n", ::GfaIpcAppCtrlGetStateText(GIAS_Paused));
- TRACE("%-8s: State: %s\n", "Me", ::GfaIpcAppCtrlGetStateText(GIAS_Paused));
- }
- }
- break;
- case GFA_APPCTRL_CTRLMSG_RESUME:
- if(g_fPauseCmd)
- {
- g_fPauseCmd = false;
- if(!g_fPauseImp)
- {
- g_lf.Info("Received Control Message 'Resume'\n");
- g_lf.Info("Enter state %s ...\n", ::GfaIpcAppCtrlGetStateText(GIAS_Running));
- ::GfaIpcAppCtrlSetState(hAC, GIAS_Running);
- TRACE("%-8s: State: %s\n", "Me", ::GfaIpcAppCtrlGetStateText(GIAS_Running));
- }
- }
- break;
- default:
- break;
- }
- }
- }
- ////////////////////////////////////////////////////////////////////////////////////////////////////
- static void _ProcessStateEvents(HAPPCTRL hAC, HAPPINFO hAI)
- {
- appid_t nAppIdSrc;
- bool fOldPaused = g_fPauseImp;
- char szDispName[128];
- while(!g_bIntr && (nAppIdSrc = ::GfaIpcAppCtrlGetNextStateEvtSrc(hAI)))
- {
- GfaIpcAppStates state = ::GfaIpcAppCtrlGetState(hAC, nAppIdSrc);
- GfaIpcAppCtrlGetDisplayName(hAC, nAppIdSrc, szDispName, sizeof(szDispName));
- TRACE("%-8s: State: %s\n", szDispName, ::GfaIpcAppCtrlGetStateText(state));
- if(nAppIdSrc & _DEPENDENCIES)
- {
- if(state == GIAS_Running)
- {
- g_lf.Info("%s -> %s.\n", szDispName, ::GfaIpcAppCtrlGetStateText(state));
- g_nDepRunning |= nAppIdSrc;
- }
- else
- {
- g_lf.Warning("%s -> %s.\n", szDispName, ::GfaIpcAppCtrlGetStateText(state));
- g_nDepRunning &= ~nAppIdSrc;
- }
- }
- }
- if(!g_bIntr)
- {
- g_fPauseImp = (g_nDepRunning != _DEPENDENCIES);
- if(!g_fPauseCmd && (fOldPaused != g_fPauseImp))
- {
- fOldPaused = g_fPauseImp;
- GfaIpcAppStates newState = g_fPauseImp ? GIAS_Paused : GIAS_Running;
- ::GfaIpcAppCtrlSetState(hAC, newState);
- if(g_fPauseImp)
- g_lf.Warning("Enter state %s ...\n", ::GfaIpcAppCtrlGetStateText(newState));
- else
- g_lf.Info("Enter state %s ...\n", ::GfaIpcAppCtrlGetStateText(newState));
- }
- }
- }
- static std::string _GetTopicPrefixString(const CMqttClConfig &cfg)
- {
- if(cfg.TopicPrefixDisabled())
- return "";
- return formatString("%s/", cfg.GetTopicPrefix());
- }
- ////////////////////////////////////////////////////////////////////////////////////////////////////
- ////////////////////////////////////////////////////////////////////////////////////////////////////
- ////////////////////////////////////////////////////////////////////////////////////////////////////
- //
- int main(int /*argc*/, char **/*argv*/)
- {
- int nRet = 0;
- CProcessInstance pi;
- std::string sDevID;
- char szLogFile[PATH_MAX];
- std::string strBaseDir;
- const char *pszBaseDir = NULL;
- HAPPCTRL hAC = NULL;
- HAPPINFO hAI;
- HSHM hShm = NULL;
- void *pShm = NULL;
- unsigned long long nUsecWorkTime = 0;
- int nTlsMode;
- CProcessClock pcWork;
- ////////////////////////////////////////////////////////////////////////////////////////////////
- // check for multiple instances
- if(!pi.LockInstance(UUID_SHM))
- {
- CLogfile::StdErr("Failed to start instance!\n");
- return -1;
- }
- ////////////////////////////////////////////////////////////////////////////////////////////////
- // configure signal handling
- struct sigaction sa;
- ::sigfillset(&g_set);
- sigaddset(&g_set, SIGUSR1);
- memset(&sa, 0, sizeof(sa));
- sa.sa_handler = _SigHandler;
- sigaction(SIGHUP, &sa, NULL); // handles user's terminal disconnect
- sigaction(SIGQUIT, &sa, NULL); // handles Ctrl + '\'
- sigaction(SIGTERM, &sa, NULL); // handles normal termination
- sigaction(SIGABRT, &sa, NULL); // handles abnormal termination (i.e. abort())
- sigaction(SIGINT, &sa, NULL); // handles Ctrl + 'C'
- sa.sa_handler = SIG_IGN;
- sigaction(SIGTSTP, &sa, NULL); // ignores Ctrl + 'Z'
- sigaction(SIGSTOP, &sa, NULL); // ignores Stop
- sigaction(SIGCONT, &sa, NULL); // ignores Continue
- sigaction(SIGCHLD, &sa, NULL); // ignores child process termination
- sigaction(0, &sa, NULL); // ignores shell termination
- do
- {
- g_fZombie = true;
- ////////////////////////////////////////////////////////////////////////////////////////////
- // get the base directory for output files
- if(!pszBaseDir)
- pszBaseDir = _GetBaseDir(strBaseDir);
- CLogfile::StdOut("Using base directory \"%s\".\n", pszBaseDir);
- ////////////////////////////////////////////////////////////////////////////////////////////
- // initialize log file
- sprintf(szLogFile, "%s/%s", pszBaseDir, _LOGFILE_NAME);
- if(!g_lf.Open(szLogFile))
- {
- CLogfile::StdErr("Failed to create/open log file!\n");
- nRet = -1;
- break;
- }
- g_lf.Info("Process started.\n");
- ////////////////////////////////////////////////////////////////////////////////////////////
- // initialize app control
- g_lf.Info("Acquire AppCtrl-Handle.\n");
- if(!(hAC = ::GfaIpcAppCtrlAcquire(_APPID, _APPNAME, _USEC_FROM_MSEC(_UPDATE_INTERVAL_MS), _USEC_FROM_MSEC(_RECONN_INTERVAL_MS) * 3)))
- {
- g_lf.Error("Failed to acquire AppCtrl-Handle!\n");
- break;
- }
- ::GfaIpcAppCtrlSetState(hAC, GIAS_Initializing);
- g_lf.Info("Enter state %s ...\n", ::GfaIpcAppCtrlGetStateText(GIAS_Initializing));
- if(!::GfaIpcAppCtrlSubscribeStateEvents(hAC, _DEPENDENCIES))
- {
- g_lf.Error("Failed to subscribe state event notifications!\n");
- break;
- }
- ////////////////////////////////////////////////////////////////////////////////////////////
- // parse the config file
- CMqttClConfig cfg(UUID_SHM);
- #ifdef MQTTCL_CONFIG_FILE_PATH
- std::string strMqttCfg = MQTTCL_CONFIG_FILE_PATH;
- #else // MQTTCL_CONFIG_FILE_PATH
- char szBaseDir[PATH_MAX];
- ::GetAppDirectory(szBaseDir, sizeof(szBaseDir));
- std::string strMqttCfg = formatString("%s/cfg/mqttcl.cfg.json", szBaseDir);
- #endif // MQTTCL_CONFIG_FILE_PATH
- if(!cfg.LoadCfg(strMqttCfg.c_str(), g_lf))
- {
- nRet = -1;
- break;
- }
- nTlsMode = cfg.GetTLSMode();
- // TRACE("%s/%s\n", cfg.GetDeviceID(), cfg.GetShmID());
- std::string strTopicPrefix = _GetTopicPrefixString(cfg);
- ////////////////////////////////////////////////////////////////////////////////////////////
- // client control topic map
- const SUB_CTRL_TOPIC subCtrlMap[] =
- {
- formatString("%s%s/#", strTopicPrefix.c_str(), _TOPIC_CMD_CTRL),
- formatString("%s%s/%s/#", strTopicPrefix.c_str(), MQTT_TOPIC_VALUE_BINLE, _TOPIC_CMD_SET),
- formatString("%s%s/%s/#", strTopicPrefix.c_str(), MQTT_TOPIC_VALUE_BINBE, _TOPIC_CMD_SET),
- formatString("%s%s/%s/#", strTopicPrefix.c_str(), MQTT_TOPIC_VALUE_JSON, _TOPIC_CMD_SET),
- formatString("%s%s/%s/#", strTopicPrefix.c_str(), MQTT_TOPIC_VALUE_PBUF, _TOPIC_CMD_SET),
- // formatString("%s%s/#", strTopicPrefix.c_str(), _TOPIC_CMD_STATUS)
- };
- ////////////////////////////////////////////////////////////////////////////////////////////
- if(!(hShm = ::acquire_shm(sizeof(shm_t), 1)))
- {
- g_lf.Error("GfaIpcAcquireSHM failed!\n");
- break;
- }
- g_lf.Info("Acquired SHM Handle.\n");
- if(!(pShm = ::GfaIpcAcquirePointer(hShm)))
- {
- g_lf.Error("GfaIpcAcquirePointer failed!\n");
- break;
- }
- g_lf.Info("Acquired SHM Pointer.\n");
- ::GfaIpcDumpSHMROT();
- memcpy(&g_shmShadow, (const shm_t*)pShm, sizeof(shm_t));
- ////////////////////////////////////////////////////////////////////////////////////////////
- int nErr, nLocked = 0, nNumConn = 0;
- bool bReconnect, bConnPending;
- std::string strErr;
- CMqttVarTable vtbl;
- CShm_t shm(pShm, &g_shmShadow, hShm, NULL, -1, 0, cfg.GetDefaultQOS(), cfg.GetDefaultRetain());
- shm.InitPath(NULL, NULL);
- shm.CreateMembersTable(vtbl);
- #if _DUMP_ENABLED_VARS
- vtbl.DumpPubEnabled();
- #endif // _DUMP_ENABLED_VARS
- ////////////////////////////////////////////////////////////////////////////////////////////
- CMqttClient mqttCl(cfg.GetDeviceID());
- mqttCl.SetClientEventCallback(_OnClientEvents, NEVT_Connect | NEVT_Disconnect | NEVT_Subscribe | NEVT_Unsubscribe | NEVT_Message | NEVT_Log, NULL);
- g_fZombie = false;
- g_fRun = true;
- ::GfaIpcAppCtrlSetState(hAC, GIAS_Running);
- g_lf.Info("Enter state %s ...\n", ::GfaIpcAppCtrlGetStateText(GIAS_Running));
- while(g_fRun)
- {
- ////////////////////////////////////////////////////////////////////////////////////////
- // update app control info
- if((hAI = ::GfaIpcAppCtrlInfoUpdate(hAC, nUsecWorkTime)))
- {
- _ProcessCtrlMessages(hAC, hAI);
- _ProcessStateEvents(hAC, hAI);
- }
- if(g_fPauseImp || g_fPauseCmd)
- {
- if(g_cs < CLS_ShutDown)
- {
- g_csLast = g_cs;
- g_cs = CLS_ShutDown;
- }
- else
- {
- nUsecWorkTime = 0;
- }
- }
- pcWork.ClockTrigger();
- switch(g_cs)
- {
- case CLS_NotInit:
- if((nErr = CMqttClient::Init()) == MOSQ_ERR_SUCCESS)
- {
- g_cs = CLS_SetTLS;
- }
- else if(!CMqttClient::EvalError(nErr, bReconnect, bConnPending, g_bIntr, strErr))
- {
- g_csLast = g_cs;
- g_cs = CLS_Err;
- }
- break;
- case CLS_SetTLS:
- if(nTlsMode == MQTTCL_TLS_MODE_CRT)
- {
- g_lf.Info("Using TLS with certificates.\n");
- if((nErr = mqttCl.tls_set(cfg.GetTlsCaCrtFile(), NULL, cfg.GetTlsClCrtFile(), cfg.GetTlsClKeyFile(), NULL)) != MOSQ_ERR_SUCCESS)
- {
- CMqttClient::EvalError(nErr, bReconnect, bConnPending, g_bIntr, strErr);
- if(g_bIntr)
- {
- g_csLast = g_cs;
- g_cs = CLS_ShutDown;
- }
- else
- {
- g_csLast = g_cs;
- g_cs = CLS_Err;
- }
- }
- else
- {
- g_cs = CLS_SetLastWill;
- g_lf.Info("Connecting to broker @ %s:%u ...\n", cfg.GetBrokerAddr(), cfg.GetBrokerPort());
- }
- }
- else if(nTlsMode == MQTTCL_TLS_MODE_PSK)
- {
- g_lf.Info("Using TLS with PSK.\n");
- if((nErr = mqttCl.tls_psk_set(cfg.GetTlsPSK(), cfg.GetDeviceID(), NULL)) != MOSQ_ERR_SUCCESS)
- {
- CMqttClient::EvalError(nErr, bReconnect, bConnPending, g_bIntr, strErr);
- if(g_bIntr)
- {
- g_csLast = g_cs;
- g_cs = CLS_ShutDown;
- }
- else
- {
- g_csLast = g_cs;
- g_cs = CLS_Err;
- }
- }
- else
- {
- g_cs = CLS_SetLastWill;
- g_lf.Info("Connecting to broker @ %s:%u ...\n", cfg.GetBrokerAddr(), cfg.GetBrokerPort());
- }
- }
- else
- {
- g_cs = CLS_SetLastWill;
- g_lf.Info("Connecting to broker @ %s:%u ...\n", cfg.GetBrokerAddr(), cfg.GetBrokerPort());
- }
- break;
- case CLS_SetLastWill:
- if(cfg.HasLastWill())
- {
- std::string strTopic = formatString("%s%s", strTopicPrefix.c_str(), cfg.GetLastWillTopic());
- if((nErr = mqttCl.will_set(strTopic.c_str(), cfg.GetLastWillMessageLength(), cfg.GetLastWillMessage(), cfg.GetLastWillQOS(), cfg.GetLastWillRetain())) != MOSQ_ERR_SUCCESS)
- {
- CMqttClient::EvalError(nErr, bReconnect, bConnPending, g_bIntr, strErr);
- }
- }
- g_cs = CLS_Unconnected;
- break;
- case CLS_Unconnected:
- if(g_bConnected)
- {
- g_bConnected = false;
- g_lf.Warning("Lost connection to broker @ %s:%u. Tying to reconnect ...\n", cfg.GetBrokerAddr(), cfg.GetBrokerPort());
- }
- if(!nNumConn)
- g_cs = CLS_Connect;
- else
- g_cs = CLS_Reconnect;
- break;
- case CLS_Connect:
- if((nErr = mqttCl.connect(cfg.GetBrokerAddr(), cfg.GetBrokerPort(), cfg.GetKeepAliveTime())) == MOSQ_ERR_SUCCESS)
- g_cs = CLS_Connecting;
- else if(!CMqttClient::EvalError(nErr, bReconnect, bConnPending, g_bIntr, strErr))
- {
- TRACE("CLS_Connect: %s\n", strErr.c_str());
- if(bConnPending)
- g_cs = CLS_Connecting;
- else if(bReconnect)
- {
- g_csLast = g_cs;
- g_cs = _cl_usleep(_USEC_FROM_MSEC(_RECONN_INTERVAL_MS), g_cs);
- }
- else if(g_bIntr)
- {
- g_csLast = g_cs;
- g_cs = CLS_ShutDown;
- }
- else
- {
- if(++g_nErrRetries > _CONNECT_MAX_RETRIES(nErr))
- {
- g_csLast = g_cs;
- g_cs = CLS_Err;
- }
- else
- {
- g_cs = _cl_usleep(_USEC_FROM_MSEC(_RECONN_INTERVAL_MS), g_cs);
- }
- }
- }
- break;
- case CLS_Reconnect:
- if((nErr = mqttCl.reconnect()) == MOSQ_ERR_SUCCESS)
- g_cs = CLS_Connecting;
- else if(!CMqttClient::EvalError(nErr, bReconnect, bConnPending, g_bIntr, strErr))
- {
- TRACE("CLS_Reconnect: %s\n", strErr.c_str());
- if(bConnPending)
- g_cs = CLS_Connecting;
- else if(bReconnect)
- {
- g_csLast = g_cs;
- g_cs = _cl_usleep(_USEC_FROM_MSEC(_RECONN_INTERVAL_MS), g_cs);
- }
- else if(g_bIntr)
- {
- g_csLast = g_cs;
- g_cs = CLS_ShutDown;
- }
- else
- {
- if(++g_nErrRetries > _CONNECT_MAX_RETRIES(nErr))
- {
- g_csLast = g_cs;
- g_cs = CLS_Err;
- }
- else
- {
- g_cs = _cl_usleep(_USEC_FROM_MSEC(_RECONN_INTERVAL_MS), g_cs);
- }
- }
- }
- break;
- case CLS_Connecting:
- g_nErrRetries = 0;
- if(!mqttCl.TimedLoop(_NSEC_FROM_MSEC(_UPDATE_INTERVAL_MS), nErr, bReconnect, bConnPending, g_bIntr, strErr))
- {
- TRACE("CLS_Connecting: %s\n", strErr.c_str());
- if(bReconnect)
- {
- g_csLast = g_cs;
- g_cs = _cl_usleep(_USEC_FROM_MSEC(_RECONN_INTERVAL_MS), CLS_Unconnected);
- }
- else if(g_bIntr)
- {
- g_csLast = g_cs;
- g_cs = CLS_ShutDown;
- }
- else if(!bConnPending)
- {
- g_csLast = g_cs;
- g_cs = CLS_Err;
- }
- }
- else if(g_bConnected)
- {
- g_cs = CLS_Connected;
- }
- break;
- case CLS_Connected:
- TRACE("Connected to broker @ %s:%u.\n", cfg.GetBrokerAddr(), cfg.GetBrokerPort());
- g_lf.Info("Connected to broker @ %s:%u.\n", cfg.GetBrokerAddr(), cfg.GetBrokerPort());
- ++nNumConn;
- g_nSubcribed = 0;
- g_cs = CLS_Subscribe;
- break;
- case CLS_Subscribe:
- g_lf.Info("Subscribing control-topics ...\n");
- if((nErr = _Subscribe(mqttCl, cfg.GetDefaultQOS(), subCtrlMap, _COUNTOF(subCtrlMap))) == MOSQ_ERR_SUCCESS)
- g_cs = CLS_Subscribing;
- else if(!CMqttClient::EvalError(nErr, bReconnect, bConnPending, g_bIntr, strErr))
- {
- if(bConnPending)
- g_cs = CLS_Connecting;
- else if(bReconnect)
- g_cs = CLS_Unconnected;
- else if(g_bIntr)
- {
- g_csLast = g_cs;
- g_cs = CLS_ShutDown;
- }
- else
- {
- g_csLast = g_cs;
- g_cs = CLS_Err;
- }
- }
- break;
- case CLS_Subscribing:
- if(!mqttCl.TimedLoop(_NSEC_FROM_MSEC(_UPDATE_INTERVAL_MS), nErr, bReconnect, bConnPending, g_bIntr, strErr))
- {
- if(bReconnect)
- g_cs = CLS_Unconnected;
- else if(bConnPending)
- g_cs = CLS_Connecting;
- else if(g_bIntr)
- {
- g_csLast = g_cs;
- g_cs = CLS_ShutDown;
- }
- else
- {
- g_csLast = g_cs;
- g_cs = CLS_Err;
- }
- }
- else if(g_nSubcribed == _COUNTOF(subCtrlMap))
- {
- g_cs = CLS_Subscribed;
- }
- break;
- case CLS_Subscribed:
- g_lf.Info("Subscriptions acknowledged.\n");
- g_lf.Info("Enter SHM processing loop ...\n");
- g_cs = CLS_PublishConnect;
- break;
- case CLS_PublishConnect:
- if(cfg.HasConnectMsg())
- {
- std::string strTopic = formatString("%s%s", strTopicPrefix.c_str(), cfg.GetConnectTopic());
- CMqttMessage *pMsg = CMqttMessage::CreateMessage(strTopic.c_str(), cfg.GetConnectMessage(), cfg.GetConnectMessageLength(), cfg.GetConnectQOS(), cfg.GetConnectRetain());
- mqttCl.publish(pMsg);
- pMsg->Release();
- }
- g_cs = CLS_ProcMsg;
- break;
- case CLS_ProcMsg:
- if(mqttCl.TimedLoop(_NSEC_FROM_MSEC(_UPDATE_INTERVAL_MS), nErr, bReconnect, bConnPending, g_bIntr, strErr))
- {
- #if _TRACK_TIMES
- std::string s1, s2;
- pc_time64_t elapsed;
- g_nDbgCounter1 = g_nDbgCounter2 = g_nDbgCounter3 = 0;
- g_pc.ClockTrigger();
- #endif // _TRACK_TIMES
- _ProcessIncoming(mqttCl, vtbl, cfg, subCtrlMap);
- #if _TRACK_TIMES
- if(g_nDbgCounter1)
- {
- elapsed = g_pc.ClockGetElapsed();
- s1 = CProcessClock::Interval2String(elapsed);
- s2 = CProcessClock::Interval2String(elapsed / g_nDbgCounter1);
- TRACE("_ProcessIncoming (%lu variables): %s (%s per var)\n", g_nDbgCounter1, s1.c_str(), s2.c_str());
- }
- g_pc.ClockTrigger();
- #endif // _TRACK_TIMES
- _SIG_BLOCK(&g_set);
- vtbl.CheckShmAndPublish(cfg.GetTopicPrefix(), nullptr, mqttCl.GetMsgQueueSnd(), nLocked);
- _SIG_UNBLOCK(&g_set);
- #if _TRACK_TIMES
- g_nDbgCounter2 = mqttCl.GetMsgQueueSnd().Size();
- if(g_nDbgCounter2)
- {
- elapsed = g_pc.ClockGetElapsed();
- s1 = CProcessClock::Interval2String(elapsed);
- s2 = CProcessClock::Interval2String(elapsed / g_nDbgCounter2);
- TRACE("CheckShmAndPublish (%lu variables): %s (%s per var)\n", g_nDbgCounter2, s1.c_str(), s2.c_str());
- g_pc.ClockTrigger();
- }
- g_nDbgCounter3 =
- #endif // _TRACK_TIMES
- _ProcessOutgoing(mqttCl);
- #if _TRACK_TIMES
- if(g_nDbgCounter3)
- {
- elapsed = g_pc.ClockGetElapsed();
- s1 = CProcessClock::Interval2String(elapsed);
- s2 = CProcessClock::Interval2String(elapsed / g_nDbgCounter3);
- TRACE("_ProcessOutgoing (%lu variables): %s (%s per var)\n", g_nDbgCounter3, s1.c_str(), s2.c_str());
- }
- #endif // _TRACK_TIMES
- }
- else
- {
- if(bReconnect)
- g_cs = CLS_Unconnected;
- else if(bConnPending)
- g_cs = CLS_Connecting;
- else if(g_bIntr)
- {
- g_csLast = g_cs;
- g_cs = CLS_PublishDisconnect;
- }
- else
- {
- g_csLast = g_cs;
- g_cs = CLS_Err;
- }
- }
- break;
- case CLS_Err:
- g_lf.Error("[%s] - %s\n", _GetClientStateString(g_csLast), strErr.c_str());
- TRACE("[%s] - %s\n", _GetClientStateString(g_csLast), strErr.c_str());
- g_fZombie = true;
- g_cs = CLS_PublishDisconnect;
- break;
-
- case CLS_PublishDisconnect:
- if(cfg.HasLastWillOnExit())
- {
- std::string strTopic = formatString("%s%s", strTopicPrefix.c_str(), cfg.GetLastWillTopic());
- CMqttMessage *pMsg = CMqttMessage::CreateMessage(strTopic.c_str(), cfg.GetLastWillOnExitMessage(), cfg.GetLastWillOnExitMessageLength(), cfg.GetLastWillQOS(), cfg.GetLastWillRetain());
- mqttCl.publish(pMsg);
- pMsg->Release();
- bool bDummy = false;
- mqttCl.TimedLoop(_NSEC_FROM_MSEC(_UPDATE_INTERVAL_MS), nErr, bDummy, bDummy, bDummy, strErr);
- }
- g_cs = CLS_Disconnect;
- break;
- case CLS_ShutDown:
- if(g_bIntr && g_nLastSig >= 0)
- {
- g_lf.Info("Received signal '%s'.\n", strsignal(g_nLastSig));
- g_nLastSig = -1;
- }
- g_lf.Info("Process shutting down ...\n");
- if(g_csLast >= CLS_Subscribed)
- g_cs = CLS_Unsubscribe;
- else if(g_csLast >= CLS_Connected)
- g_cs = CLS_Disconnect;
- else if(g_csLast > CLS_NotInit)
- g_cs = CLS_Cleanup;
- else if((g_fPauseImp || g_fPauseCmd) && !g_bIntr)
- g_cs = CLS_Paused;
- else
- g_cs = CLS_Exit;
- break;
- case CLS_Unsubscribe:
- g_lf.Info("Unsubscribe control-topics.\n");
- _Unsubscribe(mqttCl, subCtrlMap, _COUNTOF(subCtrlMap));
- g_nSubcribed = 0;
- g_cs = CLS_Disconnect;
- break;
- case CLS_Disconnect:
- g_lf.Info("Disconnect from broker @ %s:%u.\n", cfg.GetBrokerAddr(), cfg.GetBrokerPort());
- mqttCl.disconnect();
- g_bConnected = false;
- g_cs = CLS_Cleanup;
- break;
- case CLS_Cleanup:
- g_lf.Info("Mqtt clean up.\n");
- CMqttClient::Cleanup();
- if((g_fPauseImp || g_fPauseCmd) && !g_bIntr)
- g_cs = CLS_Paused;
- else
- g_cs = CLS_Exit;
- break;
- case CLS_Paused:
- if(!g_fPauseImp && !g_fPauseCmd)
- {
- if(!g_bIntr)
- g_cs = CLS_NotInit;
- else
- g_cs = CLS_Exit;
- }
- else
- {
- usleep(_USEC_FROM_MSEC(_UPDATE_INTERVAL_MS));
- continue;
- }
- break;
- case CLS_Exit:
- g_fRun = false;
- break;
- }
- nUsecWorkTime = pcWork.ClockGetElapsed() / 1000;
- }
- }
- while(false);
- ////////////////////////////////////////////////////////////////////////////////////////////////
- if(hShm)
- {
- if(pShm)
- {
- g_lf.Info("Release SHM Pointer.\n");
- ::GfaIpcReleasePointer(hShm, pShm);
- }
- g_lf.Info("Release SHM Handle.\n");
- ::GfaIpcReleaseSHM(hShm);
- }
- if(g_fZombie)
- {
- if(hAC)
- ::GfaIpcAppCtrlSetState(hAC, GIAS_Zombie);
- TRACE("Enter Zombie state ...\n");
- g_lf.Warning("Enter Zombie state ...\n");
- g_lf.Flush();
- pause();
- if(g_nLastSig >= 0)
- g_lf.Info("Received signal '%s'.\n", strsignal(g_nLastSig));
- }
- if(hAC)
- {
- g_lf.Info("Enter state %s ...\n", ::GfaIpcAppCtrlGetStateText(GIAS_Terminating));
- ::GfaIpcAppCtrlSetState(hAC, GIAS_Terminating);
- g_lf.Info("Releasing App Control ...\n");
- ::GfaIpcAppCtrlRelease(hAC);
- }
- g_lf.Info("Process exit.\n\n");
- g_lf.Close();
- CLogfile::StdErr("MqttCl exit.\n");
- return nRet;
- }
|