Pārlūkot izejas kodu

Änderungen an Rest, Datalogger, Summarist und MqttCl.

Rind 3 gadi atpakaļ
vecāks
revīzija
c6a416eaf5

+ 1 - 1
_OEM_.pro

@@ -5,7 +5,7 @@ TEMPLATE = subdirs
 
 SUBDIRS += \
 		svc \
-		usr
+		app
 
 CONFIG += qtc_runnable
 CONFIG += ordered

+ 0 - 0
usr/usr.pro → app/usr.pro


+ 0 - 0
usr/visu/img/ata64.png → app/visu/img/ata64.png


+ 0 - 0
usr/visu/img/blank.png → app/visu/img/blank.png


+ 0 - 0
usr/visu/img/header.png → app/visu/img/header.png


+ 0 - 0
usr/visu/img/mmc64.png → app/visu/img/mmc64.png


+ 0 - 0
usr/visu/img/next.png → app/visu/img/next.png


+ 0 - 0
usr/visu/img/prev.png → app/visu/img/prev.png


+ 0 - 0
usr/visu/img/usb64.png → app/visu/img/usb64.png


+ 0 - 0
usr/visu/main.cpp → app/visu/main.cpp


+ 0 - 0
usr/visu/qml.qrc → app/visu/qml.qrc


+ 0 - 0
usr/visu/qml/appctrl.qml → app/visu/qml/appctrl.qml


+ 0 - 0
usr/visu/qml/main.qml → app/visu/qml/main.qml


+ 0 - 0
usr/visu/qml/shm.qml → app/visu/qml/shm.qml


+ 0 - 0
usr/visu/qml/sysinfo.qml → app/visu/qml/sysinfo.qml


+ 0 - 0
usr/visu/visu.pro → app/visu/visu.pro


+ 2 - 2
install.cfg

@@ -9,8 +9,8 @@ oemsubstfiles =
 	"projal.pri",
 	"_OEM_.pri",
 	"_OEM_.cfg",
-	"usr/visu/visu.pro",
-	"usr/visu/qml/shm.qml"
+	"app/visu/visu.pro",
+	"app/visu/qml/shm.qml"
 );
 
 oemrenfiles =

+ 96 - 33
svc/mqttcl/main.cpp

@@ -82,6 +82,11 @@ unsigned long g_nDbgCounter3 = 0;
 #define _TOPIC_CMD_SET				"SET"
 #define _TOPIC_CMD_STATUS			"STATUS"
 
+#define _CONNECT_MAX_RETRIES(err)	(((err) == MOSQ_ERR_EAI) ? 30 : 3)
+
+////////////////////////////////////////////////////////////////////////////////////////////////////
+//
+
 typedef enum _TOPIC_CTRL_CMD
 {
 	TCC_Control,
@@ -99,6 +104,7 @@ typedef const TOPIC_CTRL_CMD *LPCTOPIC_CTRL_CMD;
 typedef enum _MQTT_CLIENT_STATES // when modifying, don't forget to adjust g_pszStateNames!!!
 {
 	CLS_NotInit,
+	CLS_SetLastWill,
 	CLS_SetTLS,
 	CLS_Unconnected,
 	CLS_Connect,
@@ -108,9 +114,11 @@ typedef enum _MQTT_CLIENT_STATES // when modifying, don't forget to adjust g_psz
 	CLS_Subscribe,
 	CLS_Subscribing,
 	CLS_Subscribed,
+	CLS_PublishConnect,
 	CLS_ProcMsg,
-	CLS_ShutDown,
+	CLS_PublishDisconnect,
 	CLS_Err,
+	CLS_ShutDown,
 	CLS_Unsubscribe,
 	CLS_Disconnect,
 	CLS_Cleanup,
@@ -122,6 +130,7 @@ typedef const MQTT_CLIENT_STATES *LPCMQTT_CLIENT_STATES;
 static const char *g_pszStateNames[] =
 {
 	"Not Init",
+	"Set Last Will",
 	"Set TLS",
 	"Unconnected",
 	"Connect",
@@ -131,9 +140,11 @@ static const char *g_pszStateNames[] =
 	"Subscribe",
 	"Subscribing",
 	"Subscribed",
+	"Publish Connect",
 	"ProcMsg",
-	"ShutDown",
+	"Publish Disconnect",
 	"Error",
+	"ShutDown",
 	"Unsubscribe",
 	"Disconnect",
 	"Cleanup",
@@ -174,13 +185,14 @@ static volatile bool				g_fZombie		= false;
 static appid_t						g_nDepRunning	= 0;
 static sigset_t						g_set;
 static CLogfile						g_lf;
-static int							g_nLastSig = -1;
+static int							g_nLastSig		= -1;
 static shm_t						g_shmShadow;
-static MQTT_CLIENT_STATES			g_cs = CLS_NotInit;
-static MQTT_CLIENT_STATES			g_csLast = CLS_NotInit;
-static bool							g_bConnected = false;
-static int							g_nSubcribed = 0;
-static bool							g_bIntr = false;
+static MQTT_CLIENT_STATES			g_cs			= CLS_NotInit;
+static MQTT_CLIENT_STATES			g_csLast		= CLS_NotInit;
+static bool							g_bConnected	= false;
+static int							g_nSubcribed	= 0;
+static bool							g_bIntr			= false;
+static int							g_nErrRetries	= 0;
 
 ////////////////////////////////////////////////////////////////////////////////////////////////////
 //
@@ -259,7 +271,7 @@ static void _ProcessIncoming(CMqttClient &rcl, CMqttVarTable &vt, CMqttClConfig
 						switch((nType = jtVal.Type()))
 						{
 						case JSON_TRUE:
-							pVar->RemoveRetained(cfg.GetDeviceID(), cfg.GetShmID(), rcl.GetMsgQueueSnd(), true);
+							pVar->RemoveRetained(cfg.GetPrefix(), nullptr, rcl.GetMsgQueueSnd(), true);
 							break;
 						case JSON_FALSE:
 							g_lf.Warning("%s: command \"%s\":false has no effect!\n", pMsg->GetTopic().c_str(), _TOPIC_CTRL_KEY_REM_RETAINED);
@@ -390,7 +402,7 @@ static void _ProcessIncoming(CMqttClient &rcl, CMqttVarTable &vt, CMqttClConfig
 				MQTT_VALUE_JSON,
 				MQTT_VALUE_PBUF
 			};
-			
+
 			for(int i = TCC_SetBinLe; i <= TCC_SetPBuf; i++)
 			{
 				if(pMsg->TopicMatchesSub(pCtrlMap[i].sTopic.c_str()))
@@ -566,7 +578,7 @@ static void _ProcessStateEvents(HAPPCTRL hAC, HAPPINFO hAI)
 		GfaIpcAppStates state = ::GfaIpcAppCtrlGetState(hAC, nAppIdSrc);
 		GfaIpcAppCtrlGetDisplayName(hAC, nAppIdSrc, szDispName, sizeof(szDispName));
 		TRACE("%-8s: State: %s\n", szDispName, ::GfaIpcAppCtrlGetStateText(state));
-		
+
 		if(nAppIdSrc & _DEPENDENCIES)
 		{
 			if(state == GIAS_Running)
@@ -620,7 +632,7 @@ int main(int /*argc*/, char **/*argv*/)
 	unsigned long long nUsecWorkTime = 0;
 	int nTlsMode;
 	CProcessClock pcWork;
-	
+
     ////////////////////////////////////////////////////////////////////////////////////////////////
     // check for multiple instances
 
@@ -724,12 +736,12 @@ int main(int /*argc*/, char **/*argv*/)
 
 		const SUB_CTRL_TOPIC subCtrlMap[] =
 		{
-			formatString("%s/%s/%s/#", cfg.GetDeviceID(), cfg.GetShmID(), _TOPIC_CMD_CTRL),
-			formatString("%s/%s/%s/%s/#", cfg.GetDeviceID(), cfg.GetShmID(), MQTT_TOPIC_VALUE_BINLE, _TOPIC_CMD_SET),
-			formatString("%s/%s/%s/%s/#", cfg.GetDeviceID(), cfg.GetShmID(), MQTT_TOPIC_VALUE_BINBE, _TOPIC_CMD_SET),
-			formatString("%s/%s/%s/%s/#", cfg.GetDeviceID(), cfg.GetShmID(), MQTT_TOPIC_VALUE_JSON, _TOPIC_CMD_SET),
-			formatString("%s/%s/%s/%s/#", cfg.GetDeviceID(), cfg.GetShmID(), MQTT_TOPIC_VALUE_PBUF, _TOPIC_CMD_SET),
-//			formatString("%s/%s/%s/#", cfg.GetDeviceID(), cfg.GetShmID(), _TOPIC_CMD_STATUS)
+			formatString("%s/%s/#", cfg.GetPrefix(), _TOPIC_CMD_CTRL),
+			formatString("%s/%s/%s/#", cfg.GetPrefix(), MQTT_TOPIC_VALUE_BINLE, _TOPIC_CMD_SET),
+			formatString("%s/%s/%s/#", cfg.GetPrefix(), MQTT_TOPIC_VALUE_BINBE, _TOPIC_CMD_SET),
+			formatString("%s/%s/%s/#", cfg.GetPrefix(), MQTT_TOPIC_VALUE_JSON, _TOPIC_CMD_SET),
+			formatString("%s/%s/%s/#", cfg.GetPrefix(), MQTT_TOPIC_VALUE_PBUF, _TOPIC_CMD_SET),
+//			formatString("%s/%s/#", cfg.GetPrefix(), _TOPIC_CMD_STATUS)
 		};
 
 		////////////////////////////////////////////////////////////////////////////////////////////
@@ -739,7 +751,7 @@ int main(int /*argc*/, char **/*argv*/)
 			g_lf.Error("GfaIpcAcquireSHM failed!\n");
 	    	break;
 	    }
-    
+
 		g_lf.Info("Acquired SHM Handle.\n");
 
         if(!(pShm = ::GfaIpcAcquirePointer(hShm)))
@@ -750,7 +762,7 @@ int main(int /*argc*/, char **/*argv*/)
 
 		g_lf.Info("Acquired SHM Pointer.\n");
         ::GfaIpcDumpSHMROT();
-        
+
     	memcpy(&g_shmShadow, (const shm_t*)pShm, sizeof(shm_t));
 
 		////////////////////////////////////////////////////////////////////////////////////////////
@@ -839,7 +851,7 @@ int main(int /*argc*/, char **/*argv*/)
 					}
 					else
 					{
-						g_cs = CLS_Unconnected;
+						g_cs = CLS_SetLastWill;
 						g_lf.Info("Connecting to broker @ %s:%u ...\n", cfg.GetBrokerAddr(), cfg.GetBrokerPort());
 					}
 				}
@@ -864,17 +876,29 @@ int main(int /*argc*/, char **/*argv*/)
 					}
 					else
 					{
-						g_cs = CLS_Unconnected;
+						g_cs = CLS_SetLastWill;
 						g_lf.Info("Connecting to broker @ %s:%u ...\n", cfg.GetBrokerAddr(), cfg.GetBrokerPort());
 					}
 				}
 				else
 				{
-					g_cs = CLS_Unconnected;
+					g_cs = CLS_SetLastWill;
 					g_lf.Info("Connecting to broker @ %s:%u ...\n", cfg.GetBrokerAddr(), cfg.GetBrokerPort());
 				}
 				break;
 
+			case CLS_SetLastWill:
+				if(cfg.HasLastWill())
+				{
+					std::string strTopic = formatString("%s/%s", cfg.GetPrefix(), cfg.GetLastWillTopic());
+					if((nErr = mqttCl.will_set(strTopic.c_str(), cfg.GetLastWillMessageLength(), cfg.GetLastWillMessage(), cfg.GetLastWillQOS(), cfg.GetLastWillRetain())) != MOSQ_ERR_SUCCESS)
+					{
+						CMqttClient::EvalError(nErr, bReconnect, bConnPending, g_bIntr, strErr);
+					}
+				}
+				g_cs = CLS_Unconnected;
+				break;
+
 			case CLS_Unconnected:
 				if(g_bConnected)
 				{
@@ -888,7 +912,7 @@ int main(int /*argc*/, char **/*argv*/)
 				break;
 
 			case CLS_Connect:
-				if((nErr = mqttCl.connect(cfg.GetBrokerAddr(), cfg.GetBrokerPort())) == MOSQ_ERR_SUCCESS)
+				if((nErr = mqttCl.connect(cfg.GetBrokerAddr(), cfg.GetBrokerPort(), cfg.GetKeepAliveTime())) == MOSQ_ERR_SUCCESS)
 					g_cs = CLS_Connecting;
 				else if(!CMqttClient::EvalError(nErr, bReconnect, bConnPending, g_bIntr, strErr))
 				{
@@ -906,8 +930,15 @@ int main(int /*argc*/, char **/*argv*/)
 					}
 					else
 					{
-						g_csLast = g_cs;
-						g_cs = CLS_Err;
+						if(++g_nErrRetries > _CONNECT_MAX_RETRIES(nErr))
+						{
+							g_csLast = g_cs;
+							g_cs = CLS_Err;
+						}
+						else
+						{
+							g_cs = _cl_usleep(_USEC_FROM_MSEC(_RECONN_INTERVAL_MS), g_cs);
+						}
 					}
 				}
 				break;
@@ -931,13 +962,21 @@ int main(int /*argc*/, char **/*argv*/)
 					}
 					else
 					{
-						g_csLast = g_cs;
-						g_cs = CLS_Err;
+						if(++g_nErrRetries > _CONNECT_MAX_RETRIES(nErr))
+						{
+							g_csLast = g_cs;
+							g_cs = CLS_Err;
+						}
+						else
+						{
+							g_cs = _cl_usleep(_USEC_FROM_MSEC(_RECONN_INTERVAL_MS), g_cs);
+						}
 					}
 				}
 				break;
 
 			case CLS_Connecting:
+				g_nErrRetries = 0;
 				if(!mqttCl.TimedLoop(_NSEC_FROM_MSEC(_UPDATE_INTERVAL_MS), nErr, bReconnect, bConnPending, g_bIntr, strErr))
 				{
 					if(bReconnect)
@@ -1020,6 +1059,17 @@ int main(int /*argc*/, char **/*argv*/)
 			case CLS_Subscribed:
 				g_lf.Info("Subscriptions acknowledged.\n");
 				g_lf.Info("Enter SHM processing loop ...\n");
+				g_cs = CLS_PublishConnect;
+				break;
+
+			case CLS_PublishConnect:
+				if(cfg.HasConnectMsg())
+				{
+					std::string strTopic = formatString("%s/%s", cfg.GetPrefix(), cfg.GetConnectTopic());
+					CMqttMessage *pMsg = CMqttMessage::CreateMessage(strTopic.c_str(), cfg.GetConnectMessage(), cfg.GetConnectMessageLength(), cfg.GetConnectQOS(), cfg.GetConnectRetain());
+					mqttCl.publish(pMsg);
+					pMsg->Release();
+				}
 				g_cs = CLS_ProcMsg;
 				break;
 
@@ -1047,7 +1097,7 @@ int main(int /*argc*/, char **/*argv*/)
 #endif	//	_TRACK_TIMES
 
 					_SIG_BLOCK(&g_set);
-					vtbl.CheckShmAndPublish(cfg.GetDeviceID(), cfg.GetShmID(), mqttCl.GetMsgQueueSnd(), nLocked);
+					vtbl.CheckShmAndPublish(cfg.GetPrefix(), nullptr, mqttCl.GetMsgQueueSnd(), nLocked);
 					_SIG_UNBLOCK(&g_set);
 #if _TRACK_TIMES
 					g_nDbgCounter2 = mqttCl.GetMsgQueueSnd().Size();
@@ -1059,7 +1109,7 @@ int main(int /*argc*/, char **/*argv*/)
 						TRACE("CheckShmAndPublish (%lu variables): %s (%s per var)\n", g_nDbgCounter2, s1.c_str(), s2.c_str());
 						g_pc.ClockTrigger();
 					}
-					g_nDbgCounter3 = 
+					g_nDbgCounter3 =
 #endif	//	_TRACK_TIMES
 
 					_ProcessOutgoing(mqttCl);
@@ -1083,7 +1133,7 @@ int main(int /*argc*/, char **/*argv*/)
 					else if(g_bIntr)
 					{
 						g_csLast = g_cs;
-						g_cs = CLS_ShutDown;
+						g_cs = CLS_PublishDisconnect;
 					}
 					else
 					{
@@ -1097,7 +1147,20 @@ int main(int /*argc*/, char **/*argv*/)
 				g_lf.Error("[%s] - %s\n", _GetClientStateString(g_csLast), strErr.c_str());
 				TRACE("Error: %s\n", strErr.c_str());
 				g_fZombie = true;
-				g_cs = CLS_ShutDown;
+				g_cs = CLS_PublishDisconnect;
+				break;
+			
+			case CLS_PublishDisconnect:
+				if(cfg.HasLastWillOnExit())
+				{
+					std::string strTopic = formatString("%s/%s", cfg.GetPrefix(), cfg.GetLastWillTopic());
+					CMqttMessage *pMsg = CMqttMessage::CreateMessage(strTopic.c_str(), cfg.GetLastWillOnExitMessage(), cfg.GetLastWillOnExitMessageLength(), cfg.GetLastWillQOS(), cfg.GetLastWillRetain());
+					mqttCl.publish(pMsg);
+					pMsg->Release();
+					bool bDummy = false;
+					mqttCl.TimedLoop(_NSEC_FROM_MSEC(_UPDATE_INTERVAL_MS), nErr, bDummy, bDummy, bDummy, strErr);
+				}
+				g_cs = CLS_Disconnect;
 				break;
 
 			case CLS_ShutDown:
@@ -1180,7 +1243,7 @@ int main(int /*argc*/, char **/*argv*/)
 		g_lf.Info("Release SHM Handle.\n");
 	    ::GfaIpcReleaseSHM(hShm);
     }
-	
+
 	if(g_fZombie)
 	{
 		if(hAC)

+ 28 - 8
svc/rest/main.cpp

@@ -1,6 +1,7 @@
 #include <gfa/svc/common/instance.h>
 #include <gfa/svc/rest/helpers.h>
 #include <gfa/svc/rest/callback.h>
+#include <gfa/svc/rest/plugin.h>
 #include "../../projal.h"
 
 #define _REST_USE_SSL				0
@@ -156,6 +157,9 @@ int main(int argc, char *argv[])
 	CURLcode cuGlobInit = CURL_LAST;
 	int ulfInit = U_ERROR, ulfStart = U_ERROR;
 	bool bUlfFrmwrkStarted = false;
+	char szRootDir[PATH_MAX];
+	const char *pszRootDir = nullptr;
+	json_error_t jerr;
 
     /////////////////////////////////////////////////////////////////////////
     // check for multiple instances
@@ -172,10 +176,6 @@ int main(int argc, char *argv[])
 
 	do
 	{
-		char szRootDir[PATH_MAX];
-		const char *pszRootDir;
-		json_error_t jerr;
-
 		g_fZombie = true;
 
 		/////////////////////////////////////////////////////////////
@@ -262,12 +262,17 @@ int main(int argc, char *argv[])
 			nRet = -1;
 			break;
 		}
-			
+	}
+	while(false);
+
+	CShm_t shm(pShm, hShm);
+	CRestVarTable map;
+
+	do
+	{			
 		/////////////////////////////////////////////////////////////////////
 		// SHM and SHM variables table
 
-		CRestVarTable map;
-		CShm_t shm(pShm, hShm);
 		shm.InitPath(NULL, NULL);
 		shm.CreateMembersTable(map);
 
@@ -278,10 +283,16 @@ int main(int argc, char *argv[])
 		srp.pMap	= &map;
 		srp.pszUuid	= UUID_SHM;
 
+		REST_PLUGIN_REQUEST_HANDLER_PARAMS hp;
+		hp.hShm = hShm;
+		hp.pShm = pShm;
+		hp.pParam = reinterpret_cast<void*>(&shm);
+		_uuid_parse(UUID_SHM, &hp.uuidShm);
+
 		/////////////////////////////////////////////////////////////////////
 		/////////////////////////////////////////////////////////////////////
 		// add handler functions
-		// initialize static files if any
+		// initialize static files and plugins, if any
 		if(InitializeStaticFiles(pszRootDir, &instance, jerr) < 0)
 		{
 			ETRACE("InitializeStaticFiles failed!\n");
@@ -289,6 +300,13 @@ int main(int argc, char *argv[])
 			break;
 		}
 
+		if(LoadPlugins(pszRootDir, &instance, jerr, &hp) < 0)
+		{
+			ETRACE("LoadPlugins failed!\n");
+			nRet = -1;
+			break;
+		}
+
 		/////////////////////////////////////////////////////////////////////
 		// POST
 		if(ulfius_add_endpoint_by_val(&instance, "POST", GET_SHM_PREFIX, NULL, 0, &GetShmPostResponseCallback, &srp) != U_OK)
@@ -449,6 +467,8 @@ int main(int argc, char *argv[])
 
 	if(ulfInit == U_OK)
 		ulfius_clean_instance(&instance);
+	
+	UnloadPlugins();
 
 	if(hShm)
 	{