12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010101110121013101410151016101710181019102010211022102310241025102610271028102910301031103210331034103510361037103810391040104110421043104410451046104710481049105010511052105310541055105610571058105910601061106210631064106510661067106810691070107110721073107410751076107710781079108010811082108310841085108610871088108910901091109210931094109510961097109810991100110111021103110411051106110711081109111011111112111311141115111611171118111911201121112211231124112511261127112811291130113111321133113411351136113711381139114011411142114311441145114611471148114911501151115211531154115511561157115811591160116111621163116411651166116711681169117011711172117311741175117611771178117911801181118211831184118511861187118811891190119111921193119411951196119711981199120012011202 |
- ////////////////////////////////////////////////////////////////////////////////////////////////////
- //
- #include <stdio.h>
- #include <signal.h>
- #include <linux/limits.h>
- #include <sys/socket.h>
- #include <arpa/inet.h>
- #include <linux/if_arp.h>
- #include <netinet/in.h>
- #include <ifaddrs.h>
- #include <sys/ioctl.h>
- #include <fcntl.h>
- #include <unistd.h>
- #include <getopt.h>
- #include "strutil.h"
- #include "fileutil.h"
- #include "instance.h"
- #include "processclock.h"
- #include "debug.h"
- #include "projal.h"
- #include "mqttclient.h"
- #include "mqttcfg.h"
- #include "mqttdbg.h"
- ////////////////////////////////////////////////////////////////////////////////////////////////////
#if _ENABLE_MEM_TRACE
#include <mcheck.h>
// Debug helper, compiled only when _ENABLE_MEM_TRACE is non-zero.
// Constructing an instance points glibc's malloc tracer at a log file (via the
// MALLOC_TRACE environment variable) and switches tracing on with mtrace().
// NOTE(review): the log path is an absolute, machine-specific development path.
class CMtrace
{
public:
	CMtrace(void) {
		// setenv() copies its arguments. The original handed a string literal to putenv(),
		// which takes a writable char* and keeps the pointer as part of the environment.
		setenv("MALLOC_TRACE", "/home/wrk/share/config/services/Debug/Desktop_Qt_5_7_0_GCC_64bit/mqttcl/mtrace.log", 1);
		mtrace();
	}
	~CMtrace(void){
		// tracing is deliberately left on until process exit
		// muntrace();
	}
};
#endif // _ENABLE_MEM_TRACE
- #if _TRACK_TIMES // optional timing instrumentation, compiled only when _TRACK_TIMES is non-zero
- static CProcessClock g_pc; // stopwatch re-triggered around the processing steps of the CLS_ProcMsg state in main()
- unsigned long g_nDbgCounter1 = 0; // divisor for the _ProcessIncoming per-variable time; this file only resets and reads it (not static, presumably incremented in another unit - confirm)
- unsigned long g_nDbgCounter2 = 0; // size of the send queue after CheckShmAndPublish
- unsigned long g_nDbgCounter3 = 0; // number of messages published by _ProcessOutgoing
- #endif // _TRACK_TIMES
- ////////////////////////////////////////////////////////////////////////////////////////////////////
- // app control
- #define _APPID GFA_APPCTRL_APPID_MQTTCL // application id passed to GfaIpcAppCtrlAcquire
- #define _APPNAME "MqttCl" // application name passed to GfaIpcAppCtrlAcquire
- #define _DEPENDENCIES ((appid_t)(GFA_APPCTRL_APPID_REMANENT)) // bitmask of app ids that must be Running; compared against g_nDepRunning
- ////////////////////////////////////////////////////////////////////////////////////////////////////
- // timing and log file settings
- #define _UPDATE_INTERVAL_MS 100 // cycle time: TimedLoop timeout, AppCtrl update interval and sleep time while paused
- #define _RECONN_INTERVAL_MS 1000 // delay before a failed connect/reconnect is retried
- #define _LOGFILE_NAME "mqttcl.log" // log file name, appended to the base directory
- //////////////////////////////////////////////////////////////////////////////////////////////////// helper macros, JSON keys of the CONTROL payload and topic command segments
- #define _SIG_BLOCK(s) sigprocmask(SIG_BLOCK, (s), NULL) // block the signals in set s
- #define _SIG_UNBLOCK(s) sigprocmask(SIG_UNBLOCK, (s), NULL) // unblock the signals in set s
- #define _NSEC_FROM_MSEC(ms) ((ms) * _PC_NS_PER_MS) // milliseconds -> nanoseconds
- #define _USEC_FROM_MSEC(ms) ((ms) * _PC_NS_PER_US) // milliseconds -> microseconds; NOTE(review): uses the ns-per-us constant, correct only if it equals the number of us per ms - confirm
- #define _TOPIC_CTRL_KEY_BINLE "binLe" // CONTROL key: true/false enables/disables the MQTT_VALUE_BINLE format
- #define _TOPIC_CTRL_KEY_BINBE "binBe" // CONTROL key: true/false enables/disables the MQTT_VALUE_BINBE format
- #define _TOPIC_CTRL_KEY_JSON "json" // CONTROL key: true/false enables/disables the MQTT_VALUE_JSON format
- #define _TOPIC_CTRL_KEY_PBUF "pBuf" // CONTROL key: true/false enables/disables the MQTT_VALUE_PBUF format
- #define _TOPIC_CTRL_KEY_QOS "qos" // CONTROL key: integer QoS for the variable
- #define _TOPIC_CTRL_KEY_RETAIN "retained" // CONTROL key: boolean retain flag for the variable
- #define _TOPIC_CTRL_KEY_REM_RETAINED "delRetained" // CONTROL key: true triggers RemoveRetained for the variable
- #define _TOPIC_CMD_CTRL "CONTROL" // topic segment of the control subscription
- #define _TOPIC_CMD_SET "SET" // topic segment of the per-format SET subscriptions
- #define _TOPIC_CMD_STATUS "STATUS" // only referenced by a commented-out subscription in main()
- typedef enum _TOPIC_CTRL_CMD // indices into the control topic map (subCtrlMap in main(), pCtrlMap in _ProcessIncoming)
- {
- TCC_Control, // the CONTROL topic
- TCC_SetBinLe, // first SET topic; _ProcessIncoming iterates TCC_SetBinLe..TCC_SetPBuf
- TCC_SetBinBe,
- TCC_SetJson,
- TCC_SetPBuf, // last SET topic
- TCC_Status // has no entry in the topic map (the STATUS subscription is commented out in main())
- }TOPIC_CTRL_CMD, *LPTOPIC_CTRL_CMD;
- typedef const TOPIC_CTRL_CMD *LPCTOPIC_CTRL_CMD;
- ////////////////////////////////////////////////////////////////////////////////////////////////////
- //
// States of the MQTT client state machine that main() runs.
// The numeric order is significant: main() compares states with relational operators
// (e.g. g_cs < CLS_ShutDown, g_csLast >= CLS_Subscribed), and g_pszStateNames is indexed
// by these values. The static_assert below fails the build if the name table is not
// adjusted together with this enum.
typedef enum _MQTT_CLIENT_STATES
{
	CLS_NotInit,
	CLS_SetTLS,
	CLS_Unconnected,
	CLS_Connect,
	CLS_Reconnect,
	CLS_Connecting,
	CLS_Connected,
	CLS_Subscribe,
	CLS_Subscribing,
	CLS_Subscribed,
	CLS_ProcMsg,
	CLS_ShutDown,
	CLS_Err,
	CLS_Unsubscribe,
	CLS_Disconnect,
	CLS_Cleanup,
	CLS_Paused,
	CLS_Exit
}MQTT_CLIENT_STATES, *LPMQTT_CLIENT_STATES;
typedef const MQTT_CLIENT_STATES *LPCMQTT_CLIENT_STATES;

// Display names, one per MQTT_CLIENT_STATES value, in enum order.
static const char * const g_pszStateNames[] =
{
	"Not Init",
	"Set TLS",
	"Unconnected",
	"Connect",
	"Reconnect",
	"Connecting",
	"Connected",
	"Subscribe",
	"Subscribing",
	"Subscribed",
	"ProcMsg",
	"ShutDown",
	"Error",
	"Unsubscribe",
	"Disconnect",
	"Cleanup",
	"Paused",
	"Exit"
};

static_assert(sizeof(g_pszStateNames) / sizeof(g_pszStateNames[0]) == static_cast<unsigned>(CLS_Exit) + 1u,
	"g_pszStateNames must contain exactly one entry per MQTT_CLIENT_STATES value");

// Returns the display name of a client state.
// cs: state to translate.
// Returns a pointer to a static string; an empty string if cs is outside the name table.
static const char * _GetClientStateString(MQTT_CLIENT_STATES cs)
{
	const long nIdx = static_cast<long>(cs);
	const long nCount = static_cast<long>(sizeof(g_pszStateNames) / sizeof(g_pszStateNames[0]));
	return (nIdx >= 0 && nIdx < nCount) ? g_pszStateNames[nIdx] : "";
}
- ////////////////////////////////////////////////////////////////////////////////////////////////////
- //
// One subscription of the control topic map.
// sTopic     : the subscription filter; main() builds these ending in "/#".
// nVarOffset : length of sTopic minus 2, i.e. the index of the '/' in front of the
//              trailing '#'. _ProcessIncoming passes it to CMqttMessage::GetTopic() to
//              obtain the variable path (presumably the topic tail from that index - confirm).
typedef struct _SUB_CTRL_TOPIC
{
	// Implicit on purpose: main() list-initializes an array of SUB_CTRL_TOPIC directly
	// from temporary strings.
	_SUB_CTRL_TOPIC(const std::string &&s) : sTopic(s)
	{
		// A filter shorter than 2 characters used to wrap around to a huge size_t value;
		// clamp to 0 instead.
		const size_t nLen = this->sTopic.length();
		this->nVarOffset = (nLen >= 2) ? (nLen - 2) : 0;
	}
	std::string sTopic;
	size_t nVarOffset;
}SUB_CTRL_TOPIC, *LPSUB_CTRL_TOPIC;
typedef const SUB_CTRL_TOPIC *LPCSUB_CTRL_TOPIC;
- ////////////////////////////////////////////////////////////////////////////////////////////////////
- //
- //static std::string _CreateDeviceID(void);
- static volatile bool g_fRun = false;
- static volatile bool g_fPauseImp = false;
- static volatile bool g_fPauseCmd = false;
- static volatile bool g_fZombie = false;
- static appid_t g_nDepRunning = 0;
- static sigset_t g_set;
- static CLogfile g_lf;
- static int g_nLastSig = -1;
- static shm_t g_shmShadow;
- static MQTT_CLIENT_STATES g_cs = CLS_NotInit;
- static MQTT_CLIENT_STATES g_csLast = CLS_NotInit;
- static bool g_bConnected = false;
- static int g_nSubcribed = 0;
- static bool g_bIntr = false;
- ////////////////////////////////////////////////////////////////////////////////////////////////////
- //
- static const char* _GetBaseDir(std::string &rstrBaseDir)
- {
- char szBaseDir[PATH_MAX];
- rstrBaseDir = ::GetAppDirectory(szBaseDir, sizeof(szBaseDir));
- rtrim(rstrBaseDir, "/");
- return rstrBaseDir.c_str();
- }
- ////////////////////////////////////////////////////////////////////////////////////////////////////
- //
- static void _SigHandler(int sig)
- {
- g_nLastSig = sig;
- g_bIntr = true;
- g_fPauseImp = g_fPauseCmd = g_fZombie = false;
- }
- ////////////////////////////////////////////////////////////////////////////////////////////////////
- //
- static MQTT_CLIENT_STATES _cl_usleep(unsigned int t, MQTT_CLIENT_STATES csNext)
- {
- if(usleep(t) < 0 && errno == EINTR)
- return CLS_ShutDown;
- return csNext;
- }
- ////////////////////////////////////////////////////////////////////////////////////////////////////
- //
- static void _ProcessIncoming(CMqttClient &rcl, CMqttVarTable &vt, CMqttClConfig &cfg, LPCSUB_CTRL_TOPIC pCtrlMap)
- {
- CMqttVar *pVar;
- CMqttMessage *pMsg;
- std::string sVarPath;
- while((pMsg = rcl.PopRcvMsg()))
- {
- if(pMsg->TopicMatchesSub(pCtrlMap[TCC_Control].sTopic.c_str()))
- {
- sVarPath = pMsg->GetTopic(pCtrlMap[TCC_Control].nVarOffset);
- if((pVar = vt.Find(sVarPath.c_str())))
- {
- bool bChanged = false;
- int nType, nQos;
- CJson_t jtRoot, jtVal;
- std::string err;
- uint32_t nMaskOn = 0, nMaskOff = 0;
- if(pMsg->GetPayloadAsJSON(jtRoot, err))
- {
- if(jtRoot.GetValue(_TOPIC_CTRL_KEY_RETAIN, jtVal))
- {
- switch((nType = jtVal.Type()))
- {
- case JSON_TRUE:
- bChanged = pVar->SetRetained(true) || bChanged;
- break;
- case JSON_FALSE:
- bChanged = pVar->SetRetained(false) || bChanged;
- break;
- default:
- g_lf.Error("%s: Invalid data type for JSON key '%s': %d\n", pMsg->GetTopic().c_str(), _TOPIC_CTRL_KEY_RETAIN, nType);
- break;
- }
- }
- if(jtRoot.GetValue(_TOPIC_CTRL_KEY_REM_RETAINED, jtVal))
- {
- switch((nType = jtVal.Type()))
- {
- case JSON_TRUE:
- pVar->RemoveRetained(cfg.GetDeviceID(), cfg.GetShmID(), rcl.GetMsgQueueSnd(), true);
- break;
- case JSON_FALSE:
- g_lf.Warning("%s: command \"%s\":false has no effect!\n", pMsg->GetTopic().c_str(), _TOPIC_CTRL_KEY_REM_RETAINED);
- break;
- default:
- g_lf.Error("%s: Invalid data type for JSON key '%s': %d\n", pMsg->GetTopic().c_str(), _TOPIC_CTRL_KEY_REM_RETAINED, nType);
- break;
- }
- }
- if(jtRoot.GetValue(_TOPIC_CTRL_KEY_QOS, jtVal))
- {
- switch((nType = jtVal.Type()))
- {
- case JSON_INTEGER:
- nQos = (int)json_integer_value(jtVal);
- if(nQos < MQTTCL_MIN_QOS)
- g_lf.Warning("%s: Invalid value for QOS: %d! Value adjusted to %d\n", pMsg->GetTopic().c_str(), nQos, MQTTCL_MIN_QOS);
- else if(nQos > MQTTCL_MAX_QOS)
- g_lf.Warning("%s: Invalid value for QOS: %d! Value adjusted to %d\n", pMsg->GetTopic().c_str(), nQos, MQTTCL_MAX_QOS);
- bChanged = pVar->SetQoS(nQos) || bChanged;
- break;
- default:
- g_lf.Error("%s: Invalid data type for JSON key '%s': %d\n", pMsg->GetTopic().c_str(), _TOPIC_CTRL_KEY_QOS, nType);
- break;
- }
- }
- if(jtRoot.GetValue(_TOPIC_CTRL_KEY_BINLE, jtVal))
- {
- switch((nType = jtVal.Type()))
- {
- case JSON_TRUE:
- nMaskOn |= MQTT_VALUE_BINLE;
- break;
- case JSON_FALSE:
- nMaskOff |= MQTT_VALUE_BINLE;
- break;
- default:
- g_lf.Error("%s: Invalid data type for JSON key '%s': %d\n", pMsg->GetTopic().c_str(), _TOPIC_CTRL_KEY_BINLE, nType);
- break;
- }
- }
- if(jtRoot.GetValue(_TOPIC_CTRL_KEY_BINBE, jtVal))
- {
- switch((nType = jtVal.Type()))
- {
- case JSON_TRUE:
- nMaskOn |= MQTT_VALUE_BINBE;
- break;
- case JSON_FALSE:
- nMaskOff |= MQTT_VALUE_BINBE;
- break;
- default:
- g_lf.Error("%s: Invalid data type for JSON key '%s': %d\n", pMsg->GetTopic().c_str(), _TOPIC_CTRL_KEY_BINBE, nType);
- break;
- }
- }
- if(jtRoot.GetValue(_TOPIC_CTRL_KEY_JSON, jtVal))
- {
- switch((nType = jtVal.Type()))
- {
- case JSON_TRUE:
- nMaskOn |= MQTT_VALUE_JSON;
- break;
- case JSON_FALSE:
- nMaskOff |= MQTT_VALUE_JSON;
- break;
- default:
- g_lf.Error("%s: Invalid data type for JSON key '%s': %d\n", pMsg->GetTopic().c_str(), _TOPIC_CTRL_KEY_JSON, nType);
- break;
- }
- }
- if(jtRoot.GetValue(_TOPIC_CTRL_KEY_PBUF, jtVal))
- {
- switch((nType = jtVal.Type()))
- {
- case JSON_TRUE:
- nMaskOn |= MQTT_VALUE_PBUF;
- break;
- case JSON_FALSE:
- nMaskOff |= MQTT_VALUE_PBUF;
- break;
- default:
- g_lf.Error("%s: Invalid data type for JSON key '%s': %d\n", pMsg->GetTopic().c_str(), _TOPIC_CTRL_KEY_PBUF, nType);
- break;
- }
- }
- if(nMaskOff)
- {
- if(pVar->DisablePublish(nMaskOff, &vt))
- {
- bChanged = true;
- }
- }
- if(nMaskOn)
- {
- if(pVar->EnablePublish(nMaskOn, &vt))
- {
- bChanged = true;
- }
- }
- #if _DUMP_ENABLED_VARS
- if(bChanged)
- vt.DumpPubEnabled();
- #endif // _DUMP_ENABLED_VARS
- }
- else
- {
- g_lf.Error("%s: JSON error: %s!\n", pMsg->GetTopic().c_str(), err.c_str());
- }
- }
- }
- else
- {
- int nLocks = 0;
- static const uint32_t nFormats[] =
- {
- 0,
- MQTT_VALUE_BINLE,
- MQTT_VALUE_BINBE,
- MQTT_VALUE_JSON,
- MQTT_VALUE_PBUF
- };
-
- for(int i = TCC_SetBinLe; i <= TCC_SetPBuf; i++)
- {
- if(pMsg->TopicMatchesSub(pCtrlMap[i].sTopic.c_str()))
- {
- sVarPath = pMsg->GetTopic(pCtrlMap[i].nVarOffset);
- if((pVar = vt.Find(sVarPath.c_str())))
- {
- if(pVar->PublishEnabled())
- {
- if(!pVar->SetShmValue(nFormats[i], pMsg, nLocks))
- {
- // !!!
- }
- }
- else
- {
- // !!!
- }
- break;
- }
- else
- {
- // !!!
- break;
- }
- }
- }
- }
- pMsg->Release();
- }
- }
- ////////////////////////////////////////////////////////////////////////////////////////////////////
- //
- static int _ProcessOutgoing(CMqttClient &rcl)
- {
- int nRet = 0;
- CMqttMessage *pMsg;
- while((pMsg = rcl.PopSndMsg()))
- {
- rcl.publish(pMsg);
- pMsg->Release();
- ++nRet;
- }
- return nRet;
- }
- ////////////////////////////////////////////////////////////////////////////////////////////////////
- //
- static int _Subscribe(CMqttClient &rcl, int nDefaultQOS, LPCSUB_CTRL_TOPIC pCtrlMap, size_t nLenMap)
- {
- int nRet;
- for(size_t i = 0; i < nLenMap; i++)
- {
- if((nRet = rcl.subscribe(NULL, pCtrlMap[i].sTopic.c_str(), nDefaultQOS)) != MOSQ_ERR_SUCCESS)
- break;
- TRACE("Subscribed: '%s' - QOS: %d\n", pCtrlMap[i].sTopic.c_str(), nDefaultQOS);
- }
- return nRet;
- }
- static void _Unsubscribe(CMqttClient &rcl, LPCSUB_CTRL_TOPIC pCtrlMap, size_t nLenMap)
- {
- for(size_t i = 0; i < nLenMap; i++)
- {
- rcl.unsubscribe(NULL, pCtrlMap[i].sTopic.c_str());
- }
- }
- ////////////////////////////////////////////////////////////////////////////////////////////////////
- //
- static void _OnClientEvents(LPCMQTT_GENERIC_NOTIFICATION pntf, void *pctx)
- {
- switch(pntf->evt)
- {
- case NEVT_Log:
- if(pntf->log.level == MOSQ_LOG_ERR)
- {
- TRACE("[%d] - %s\n", pntf->log.level, pntf->log.str);
- g_lf.Error("%s\n", pntf->log.str);
- }
- break;
- case NEVT_Connect:
- if(pntf->con.rc == MOSQ_ERR_SUCCESS)
- g_bConnected = true;
- break;
- case NEVT_Disconnect:
- break;
- case NEVT_Subscribe:
- ++g_nSubcribed;
- break;
- case NEVT_Unsubscribe:
- break;
- default:
- break;
- }
- }
- ////////////////////////////////////////////////////////////////////////////////////////////////////
- static void _ProcessCtrlMessages(HAPPCTRL hAC, HAPPINFO hAI)
- {
- ctrlmsg_t nCtrlMsg;
- while(!g_bIntr && (nCtrlMsg = ::GfaIpcAppCtrlGetNextCtrlMsg(hAI)))
- {
- switch(nCtrlMsg)
- {
- case GFA_APPCTRL_CTRLMSG_STOP:
- g_bIntr = true;
- g_fPauseImp = false;
- g_fPauseCmd = false;
- g_fZombie = false;
- g_lf.Info("Received Control Message 'Stop'\n");
- break;
- case GFA_APPCTRL_CTRLMSG_PAUSE:
- if(!g_fPauseCmd)
- {
- g_fPauseCmd = true;
- if(!g_fPauseImp)
- {
- ::GfaIpcAppCtrlSetState(hAC, GIAS_Paused);
- g_lf.Info("Received Control Message 'Pause'\n");
- g_lf.Info("Enter state %s ...\n", ::GfaIpcAppCtrlGetStateText(GIAS_Paused));
- TRACE("%-8s: State: %s\n", "Me", ::GfaIpcAppCtrlGetStateText(GIAS_Paused));
- }
- }
- break;
- case GFA_APPCTRL_CTRLMSG_RESUME:
- if(g_fPauseCmd)
- {
- g_fPauseCmd = false;
- if(!g_fPauseImp)
- {
- g_lf.Info("Received Control Message 'Resume'\n");
- g_lf.Info("Enter state %s ...\n", ::GfaIpcAppCtrlGetStateText(GIAS_Running));
- ::GfaIpcAppCtrlSetState(hAC, GIAS_Running);
- TRACE("%-8s: State: %s\n", "Me", ::GfaIpcAppCtrlGetStateText(GIAS_Running));
- }
- }
- break;
- default:
- break;
- }
- }
- }
- ////////////////////////////////////////////////////////////////////////////////////////////////////
- static void _ProcessStateEvents(HAPPCTRL hAC, HAPPINFO hAI)
- {
- appid_t nAppIdSrc;
- bool fOldPaused = g_fPauseImp;
- char szDispName[128];
- while(!g_bIntr && (nAppIdSrc = ::GfaIpcAppCtrlGetNextStateEvtSrc(hAI)))
- {
- GfaIpcAppStates state = ::GfaIpcAppCtrlGetState(hAC, nAppIdSrc);
- GfaIpcAppCtrlGetDisplayName(hAC, nAppIdSrc, szDispName, sizeof(szDispName));
- TRACE("%-8s: State: %s\n", szDispName, ::GfaIpcAppCtrlGetStateText(state));
-
- if(nAppIdSrc & _DEPENDENCIES)
- {
- if(state == GIAS_Running)
- {
- g_lf.Info("%s -> %s.\n", szDispName, ::GfaIpcAppCtrlGetStateText(state));
- g_nDepRunning |= nAppIdSrc;
- }
- else
- {
- g_lf.Warning("%s -> %s.\n", szDispName, ::GfaIpcAppCtrlGetStateText(state));
- g_nDepRunning &= ~nAppIdSrc;
- }
- }
- }
- if(!g_bIntr)
- {
- g_fPauseImp = (g_nDepRunning != _DEPENDENCIES);
- if(!g_fPauseCmd && (fOldPaused != g_fPauseImp))
- {
- fOldPaused = g_fPauseImp;
- GfaIpcAppStates newState = g_fPauseImp ? GIAS_Paused : GIAS_Running;
- ::GfaIpcAppCtrlSetState(hAC, newState);
- if(g_fPauseImp)
- g_lf.Warning("Enter state %s ...\n", ::GfaIpcAppCtrlGetStateText(newState));
- else
- g_lf.Info("Enter state %s ...\n", ::GfaIpcAppCtrlGetStateText(newState));
- }
- }
- }
- ////////////////////////////////////////////////////////////////////////////////////////////////////
- ////////////////////////////////////////////////////////////////////////////////////////////////////
- ////////////////////////////////////////////////////////////////////////////////////////////////////
- //
- int main(int /*argc*/, char **/*argv*/)
- {
- int nRet = 0;
- CProcessInstance pi;
- std::string sDevID;
- char szLogFile[PATH_MAX];
- std::string strBaseDir;
- const char *pszBaseDir = NULL;
- HAPPCTRL hAC = NULL;
- HAPPINFO hAI;
- HSHM hShm = NULL;
- void *pShm = NULL;
- unsigned long long nUsecWorkTime = 0;
- int nTlsMode;
- CProcessClock pcWork;
-
- ////////////////////////////////////////////////////////////////////////////////////////////////
- // check for multiple instances
- if(!pi.LockInstance(UUID_SHM))
- {
- CLogfile::StdErr("Failed to start instance!\n");
- return -1;
- }
- ////////////////////////////////////////////////////////////////////////////////////////////////
- // configure signal handling
- struct sigaction sa;
- ::sigfillset(&g_set);
- sigaddset(&g_set, SIGUSR1);
- memset(&sa, 0, sizeof(sa));
- sa.sa_handler = _SigHandler;
- sigaction(SIGHUP, &sa, NULL); // handles user's terminal disconnect
- sigaction(SIGQUIT, &sa, NULL); // handles Ctrl + '\'
- sigaction(SIGTERM, &sa, NULL); // handles normal termination
- sigaction(SIGABRT, &sa, NULL); // handles abnormal termination (i.e. abort())
- sigaction(SIGINT, &sa, NULL); // handles Ctrl + 'C'
- sa.sa_handler = SIG_IGN;
- sigaction(SIGTSTP, &sa, NULL); // ignores Ctrl + 'Z'
- sigaction(SIGSTOP, &sa, NULL); // ignores Stop
- sigaction(SIGCONT, &sa, NULL); // ignores Continue
- sigaction(SIGCHLD, &sa, NULL); // ignores child process termination
- sigaction(0, &sa, NULL); // ignores shell termination
- do
- {
- g_fZombie = true;
- ////////////////////////////////////////////////////////////////////////////////////////////
- // get the base directory for output files
- if(!pszBaseDir)
- pszBaseDir = _GetBaseDir(strBaseDir);
- CLogfile::StdOut("Using base directory \"%s\".\n", pszBaseDir);
- ////////////////////////////////////////////////////////////////////////////////////////////
- // initialize log file
- sprintf(szLogFile, "%s/%s", pszBaseDir, _LOGFILE_NAME);
- if(!g_lf.Open(szLogFile))
- {
- CLogfile::StdErr("Failed to create/open log file!\n");
- nRet = -1;
- break;
- }
- g_lf.Info("Process started.\n");
- ////////////////////////////////////////////////////////////////////////////////////////////
- // initialize app control
- g_lf.Info("Acquire AppCtrl-Handle.\n");
- if(!(hAC = ::GfaIpcAppCtrlAcquire(_APPID, _APPNAME, _USEC_FROM_MSEC(_UPDATE_INTERVAL_MS), _USEC_FROM_MSEC(_RECONN_INTERVAL_MS) * 3)))
- {
- g_lf.Error("Failed to acquire AppCtrl-Handle!\n");
- break;
- }
- ::GfaIpcAppCtrlSetState(hAC, GIAS_Initializing);
- g_lf.Info("Enter state %s ...\n", ::GfaIpcAppCtrlGetStateText(GIAS_Initializing));
- if(!::GfaIpcAppCtrlSubscribeStateEvents(hAC, _DEPENDENCIES))
- {
- g_lf.Error("Failed to subscribe state event notifications!\n");
- break;
- }
- ////////////////////////////////////////////////////////////////////////////////////////////
- // parse the config file
- CMqttClConfig cfg(UUID_SHM);
- if(!cfg.LoadCfg(MQTTCL_CONFIG_FILE_PATH, g_lf))
- {
- nRet = -1;
- break;
- }
- nTlsMode = cfg.GetTLSMode();
- // TRACE("%s/%s\n", cfg.GetDeviceID(), cfg.GetShmID());
- ////////////////////////////////////////////////////////////////////////////////////////////
- // client control topic map
- const SUB_CTRL_TOPIC subCtrlMap[] =
- {
- formatString("%s/%s/%s/#", cfg.GetDeviceID(), cfg.GetShmID(), _TOPIC_CMD_CTRL),
- formatString("%s/%s/%s/%s/#", cfg.GetDeviceID(), cfg.GetShmID(), MQTT_TOPIC_VALUE_BINLE, _TOPIC_CMD_SET),
- formatString("%s/%s/%s/%s/#", cfg.GetDeviceID(), cfg.GetShmID(), MQTT_TOPIC_VALUE_BINBE, _TOPIC_CMD_SET),
- formatString("%s/%s/%s/%s/#", cfg.GetDeviceID(), cfg.GetShmID(), MQTT_TOPIC_VALUE_JSON, _TOPIC_CMD_SET),
- formatString("%s/%s/%s/%s/#", cfg.GetDeviceID(), cfg.GetShmID(), MQTT_TOPIC_VALUE_PBUF, _TOPIC_CMD_SET),
- // formatString("%s/%s/%s/#", cfg.GetDeviceID(), cfg.GetShmID(), _TOPIC_CMD_STATUS)
- };
- ////////////////////////////////////////////////////////////////////////////////////////////
- if(!(hShm = ::acquire_shm(sizeof(shm_t), 1)))
- {
- g_lf.Error("GfaIpcAcquireSHM failed!\n");
- break;
- }
-
- g_lf.Info("Acquired SHM Handle.\n");
- if(!(pShm = ::GfaIpcAcquirePointer(hShm)))
- {
- g_lf.Error("GfaIpcAcquirePointer failed!\n");
- break;
- }
- g_lf.Info("Acquired SHM Pointer.\n");
- ::GfaIpcDumpSHMROT();
-
- memcpy(&g_shmShadow, (const shm_t*)pShm, sizeof(shm_t));
- ////////////////////////////////////////////////////////////////////////////////////////////
- int nErr, nLocked = 0, nNumConn = 0;
- bool bReconnect, bConnPending;
- std::string strErr;
- CMqttVarTable vtbl;
- CShm_t shm(pShm, &g_shmShadow, hShm, NULL, -1, 0, cfg.GetDefaultQOS(), cfg.GetDefaultRetain());
- shm.InitPath(NULL, NULL);
- shm.CreateMembersTable(vtbl);
- #if _DUMP_ENABLED_VARS
- vtbl.DumpPubEnabled();
- #endif // _DUMP_ENABLED_VARS
- ////////////////////////////////////////////////////////////////////////////////////////////
- CMqttClient mqttCl(cfg.GetDeviceID());
- mqttCl.SetClientEventCallback(_OnClientEvents, NEVT_Connect | NEVT_Disconnect | NEVT_Subscribe | NEVT_Unsubscribe | NEVT_Message | NEVT_Log, NULL);
- g_fZombie = false;
- g_fRun = true;
- ::GfaIpcAppCtrlSetState(hAC, GIAS_Running);
- g_lf.Info("Enter state %s ...\n", ::GfaIpcAppCtrlGetStateText(GIAS_Running));
- while(g_fRun)
- {
- ////////////////////////////////////////////////////////////////////////////////////////
- // update app control info
- if((hAI = ::GfaIpcAppCtrlInfoUpdate(hAC, nUsecWorkTime)))
- {
- _ProcessCtrlMessages(hAC, hAI);
- _ProcessStateEvents(hAC, hAI);
- }
- if(g_fPauseImp || g_fPauseCmd)
- {
- if(g_cs < CLS_ShutDown)
- {
- g_csLast = g_cs;
- g_cs = CLS_ShutDown;
- }
- else
- {
- nUsecWorkTime = 0;
- }
- }
- pcWork.ClockTrigger();
- switch(g_cs)
- {
- case CLS_NotInit:
- if((nErr = CMqttClient::Init()) == MOSQ_ERR_SUCCESS)
- {
- g_cs = CLS_SetTLS;
- }
- else if(!CMqttClient::EvalError(nErr, bReconnect, bConnPending, g_bIntr, strErr))
- {
- g_csLast = g_cs;
- g_cs = CLS_Err;
- }
- break;
- case CLS_SetTLS:
- if(nTlsMode == MQTTCL_TLS_MODE_CRT)
- {
- g_lf.Info("Using TLS with certificates.\n");
- if((nErr = mqttCl.tls_set(cfg.GetTlsCaCrtFile(), NULL, cfg.GetTlsClCrtFile(), cfg.GetTlsClKeyFile(), NULL)) != MOSQ_ERR_SUCCESS)
- {
- CMqttClient::EvalError(nErr, bReconnect, bConnPending, g_bIntr, strErr);
- if(g_bIntr)
- {
- g_csLast = g_cs;
- g_cs = CLS_ShutDown;
- }
- else
- {
- g_csLast = g_cs;
- g_cs = CLS_Err;
- }
- }
- else
- {
- g_cs = CLS_Unconnected;
- g_lf.Info("Connecting to broker @ %s:%u ...\n", cfg.GetBrokerAddr(), cfg.GetBrokerPort());
- }
- }
- else if(nTlsMode == MQTTCL_TLS_MODE_PSK)
- {
- g_lf.Info("Using TLS with PSK.\n");
- if((nErr = mqttCl.tls_psk_set(cfg.GetTlsPSK(), cfg.GetDeviceID(), NULL)) != MOSQ_ERR_SUCCESS)
- {
- CMqttClient::EvalError(nErr, bReconnect, bConnPending, g_bIntr, strErr);
- if(g_bIntr)
- {
- g_csLast = g_cs;
- g_cs = CLS_ShutDown;
- }
- else
- {
- g_csLast = g_cs;
- g_cs = CLS_Err;
- }
- }
- else
- {
- g_cs = CLS_Unconnected;
- g_lf.Info("Connecting to broker @ %s:%u ...\n", cfg.GetBrokerAddr(), cfg.GetBrokerPort());
- }
- }
- else
- {
- g_cs = CLS_Unconnected;
- g_lf.Info("Connecting to broker @ %s:%u ...\n", cfg.GetBrokerAddr(), cfg.GetBrokerPort());
- }
- break;
- case CLS_Unconnected:
- if(g_bConnected)
- {
- g_bConnected = false;
- g_lf.Warning("Lost connection to broker @ %s:%u. Tying to reconnect ...\n", cfg.GetBrokerAddr(), cfg.GetBrokerPort());
- }
- if(!nNumConn)
- g_cs = CLS_Connect;
- else
- g_cs = CLS_Reconnect;
- break;
- case CLS_Connect:
- if((nErr = mqttCl.connect(cfg.GetBrokerAddr(), cfg.GetBrokerPort())) == MOSQ_ERR_SUCCESS)
- g_cs = CLS_Connecting;
- else if(!CMqttClient::EvalError(nErr, bReconnect, bConnPending, g_bIntr, strErr))
- {
- if(bConnPending)
- g_cs = CLS_Connecting;
- else if(bReconnect)
- {
- g_csLast = g_cs;
- g_cs = _cl_usleep(_USEC_FROM_MSEC(_RECONN_INTERVAL_MS), g_cs);
- }
- else if(g_bIntr)
- {
- g_csLast = g_cs;
- g_cs = CLS_ShutDown;
- }
- else
- {
- g_csLast = g_cs;
- g_cs = CLS_Err;
- }
- }
- break;
- case CLS_Reconnect:
- if((nErr = mqttCl.reconnect()) == MOSQ_ERR_SUCCESS)
- g_cs = CLS_Connecting;
- else if(!CMqttClient::EvalError(nErr, bReconnect, bConnPending, g_bIntr, strErr))
- {
- if(bConnPending)
- g_cs = CLS_Connecting;
- else if(bReconnect)
- {
- g_csLast = g_cs;
- g_cs = _cl_usleep(_USEC_FROM_MSEC(_RECONN_INTERVAL_MS), g_cs);
- }
- else if(g_bIntr)
- {
- g_csLast = g_cs;
- g_cs = CLS_ShutDown;
- }
- else
- {
- g_csLast = g_cs;
- g_cs = CLS_Err;
- }
- }
- break;
- case CLS_Connecting:
- if(!mqttCl.TimedLoop(_NSEC_FROM_MSEC(_UPDATE_INTERVAL_MS), nErr, bReconnect, bConnPending, g_bIntr, strErr))
- {
- if(bReconnect)
- {
- g_csLast = g_cs;
- g_cs = _cl_usleep(_USEC_FROM_MSEC(_RECONN_INTERVAL_MS), CLS_Unconnected);
- }
- else if(g_bIntr)
- {
- g_csLast = g_cs;
- g_cs = CLS_ShutDown;
- }
- else if(!bConnPending)
- {
- g_csLast = g_cs;
- g_cs = CLS_Err;
- }
- }
- else if(g_bConnected)
- {
- g_cs = CLS_Connected;
- }
- break;
- case CLS_Connected:
- g_lf.Info("Connected to broker @ %s:%u.\n", cfg.GetBrokerAddr(), cfg.GetBrokerPort());
- ++nNumConn;
- g_nSubcribed = 0;
- g_cs = CLS_Subscribe;
- break;
- case CLS_Subscribe:
- g_lf.Info("Subscribing control-topics ...\n");
- if((nErr = _Subscribe(mqttCl, cfg.GetDefaultQOS(), subCtrlMap, _COUNTOF(subCtrlMap))) == MOSQ_ERR_SUCCESS)
- g_cs = CLS_Subscribing;
- else if(!CMqttClient::EvalError(nErr, bReconnect, bConnPending, g_bIntr, strErr))
- {
- if(bConnPending)
- g_cs = CLS_Connecting;
- else if(bReconnect)
- g_cs = CLS_Unconnected;
- else if(g_bIntr)
- {
- g_csLast = g_cs;
- g_cs = CLS_ShutDown;
- }
- else
- {
- g_csLast = g_cs;
- g_cs = CLS_Err;
- }
- }
- break;
- case CLS_Subscribing:
- if(!mqttCl.TimedLoop(_NSEC_FROM_MSEC(_UPDATE_INTERVAL_MS), nErr, bReconnect, bConnPending, g_bIntr, strErr))
- {
- if(bReconnect)
- g_cs = CLS_Unconnected;
- else if(bConnPending)
- g_cs = CLS_Connecting;
- else if(g_bIntr)
- {
- g_csLast = g_cs;
- g_cs = CLS_ShutDown;
- }
- else
- {
- g_csLast = g_cs;
- g_cs = CLS_Err;
- }
- }
- else if(g_nSubcribed == _COUNTOF(subCtrlMap))
- {
- g_cs = CLS_Subscribed;
- }
- break;
- case CLS_Subscribed:
- g_lf.Info("Subscriptions acknowledged.\n");
- g_lf.Info("Enter SHM processing loop ...\n");
- g_cs = CLS_ProcMsg;
- break;
- case CLS_ProcMsg:
- if(mqttCl.TimedLoop(_NSEC_FROM_MSEC(_UPDATE_INTERVAL_MS), nErr, bReconnect, bConnPending, g_bIntr, strErr))
- {
- #if _TRACK_TIMES
- std::string s1, s2;
- pc_time64_t elapsed;
- g_nDbgCounter1 = g_nDbgCounter2 = g_nDbgCounter3 = 0;
- g_pc.ClockTrigger();
- #endif // _TRACK_TIMES
- _ProcessIncoming(mqttCl, vtbl, cfg, subCtrlMap);
- #if _TRACK_TIMES
- if(g_nDbgCounter1)
- {
- elapsed = g_pc.ClockGetElapsed();
- s1 = CProcessClock::Interval2String(elapsed);
- s2 = CProcessClock::Interval2String(elapsed / g_nDbgCounter1);
- TRACE("_ProcessIncoming (%lu variables): %s (%s per var)\n", g_nDbgCounter1, s1.c_str(), s2.c_str());
- }
- g_pc.ClockTrigger();
- #endif // _TRACK_TIMES
- _SIG_BLOCK(&g_set);
- vtbl.CheckShmAndPublish(cfg.GetDeviceID(), cfg.GetShmID(), mqttCl.GetMsgQueueSnd(), nLocked);
- _SIG_UNBLOCK(&g_set);
- #if _TRACK_TIMES
- g_nDbgCounter2 = mqttCl.GetMsgQueueSnd().Size();
- if(g_nDbgCounter2)
- {
- elapsed = g_pc.ClockGetElapsed();
- s1 = CProcessClock::Interval2String(elapsed);
- s2 = CProcessClock::Interval2String(elapsed / g_nDbgCounter2);
- TRACE("CheckShmAndPublish (%lu variables): %s (%s per var)\n", g_nDbgCounter2, s1.c_str(), s2.c_str());
- g_pc.ClockTrigger();
- }
- g_nDbgCounter3 =
- #endif // _TRACK_TIMES
- _ProcessOutgoing(mqttCl);
- #if _TRACK_TIMES
- if(g_nDbgCounter3)
- {
- elapsed = g_pc.ClockGetElapsed();
- s1 = CProcessClock::Interval2String(elapsed);
- s2 = CProcessClock::Interval2String(elapsed / g_nDbgCounter3);
- TRACE("_ProcessOutgoing (%lu variables): %s (%s per var)\n", g_nDbgCounter3, s1.c_str(), s2.c_str());
- }
- #endif // _TRACK_TIMES
- }
- else
- {
- if(bReconnect)
- g_cs = CLS_Unconnected;
- else if(bConnPending)
- g_cs = CLS_Connecting;
- else if(g_bIntr)
- {
- g_csLast = g_cs;
- g_cs = CLS_ShutDown;
- }
- else
- {
- g_csLast = g_cs;
- g_cs = CLS_Err;
- }
- }
- break;
- case CLS_Err:
- g_lf.Error("[%s] - %s\n", _GetClientStateString(g_csLast), strErr.c_str());
- TRACE("Error: %s\n", strErr.c_str());
- g_fZombie = true;
- g_cs = CLS_ShutDown;
- break;
- case CLS_ShutDown:
- if(g_bIntr && g_nLastSig >= 0)
- {
- g_lf.Info("Received signal '%s'.\n", strsignal(g_nLastSig));
- g_nLastSig = -1;
- }
- g_lf.Info("Process shutting down ...\n");
- if(g_csLast >= CLS_Subscribed)
- g_cs = CLS_Unsubscribe;
- else if(g_csLast >= CLS_Connected)
- g_cs = CLS_Disconnect;
- else if(g_csLast > CLS_NotInit)
- g_cs = CLS_Cleanup;
- else if((g_fPauseImp || g_fPauseCmd) && !g_bIntr)
- g_cs = CLS_Paused;
- else
- g_cs = CLS_Exit;
- break;
- case CLS_Unsubscribe:
- g_lf.Info("Unsubscribe control-topics.\n");
- _Unsubscribe(mqttCl, subCtrlMap, _COUNTOF(subCtrlMap));
- g_nSubcribed = 0;
- g_cs = CLS_Disconnect;
- break;
- case CLS_Disconnect:
- g_lf.Info("Disconnect from broker @ %s:%u.\n", cfg.GetBrokerAddr(), cfg.GetBrokerPort());
- mqttCl.disconnect();
- g_bConnected = false;
- g_cs = CLS_Cleanup;
- break;
- case CLS_Cleanup:
- g_lf.Info("Clean up.\n");
- CMqttClient::Cleanup();
- if((g_fPauseImp || g_fPauseCmd) && !g_bIntr)
- g_cs = CLS_Paused;
- else
- g_cs = CLS_Exit;
- break;
- case CLS_Paused:
- if(!g_fPauseImp && !g_fPauseCmd)
- {
- if(!g_bIntr)
- g_cs = CLS_NotInit;
- else
- g_cs = CLS_Exit;
- }
- else
- {
- usleep(_USEC_FROM_MSEC(_UPDATE_INTERVAL_MS));
- continue;
- }
- break;
- case CLS_Exit:
- g_fRun = false;
- break;
- }
- nUsecWorkTime = pcWork.ClockGetElapsed() / 1000;
- }
- }
- while(false);
- ////////////////////////////////////////////////////////////////////////////////////////////////
- if(hShm)
- {
- if(pShm)
- {
- g_lf.Info("Release SHM Pointer.\n");
- ::GfaIpcReleasePointer(hShm, pShm);
- }
- g_lf.Info("Release SHM Handle.\n");
- ::GfaIpcReleaseSHM(hShm);
- }
-
- if(g_fZombie)
- {
- if(hAC)
- ::GfaIpcAppCtrlSetState(hAC, GIAS_Zombie);
- TRACE("Enter Zombie state ...\n");
- g_lf.Warning("Enter Zombie state ...\n");
- g_lf.Flush();
- pause();
- if(g_nLastSig >= 0)
- g_lf.Info("Received signal '%s'.\n", strsignal(g_nLastSig));
- }
- if(hAC)
- {
- g_lf.Info("Enter state %s ...\n", ::GfaIpcAppCtrlGetStateText(GIAS_Terminating));
- ::GfaIpcAppCtrlSetState(hAC, GIAS_Terminating);
- g_lf.Info("Releasing App Control ...\n");
- ::GfaIpcAppCtrlRelease(hAC);
- }
- g_lf.Info("Process exit.\n\n");
- g_lf.Close();
- CLogfile::StdErr("MqttCl exit.\n");
- return nRet;
- }
|