//////////////////////////////////////////////////////////////////////////////////////////////////// // #include "projal.h" #include "logfile.h" #include "fileutil.h" #include "strutil.h" #include #include #include #include #include #include "processclock.h" #include "instance.h" #include "debug.h" //////////////////////////////////////////////////////////////////////////////////////////////////// #define _USE_SIG_INFO 0 //////////////////////////////////////////////////////////////////////////////////////////////////// // app control #define _APPID GFA_APPCTRL_APPID_REMANENT #define _APPNAME "Remanent" //////////////////////////////////////////////////////////////////////////////////////////////////// #define _DEFAULT_SCAN_INTERVAL_MS 1000 // 1 sec #define _MIN_SCAN_INTERVAL_MS 100 // 100 ms //////////////////////////////////////////////////////////////////////////////////////////////////// #define _NANOSECS_PER_SEC 1000000000 #define _MAX_WRITE_CREDITS 60 #define _MAX_WRITES_PER_MINUTE 6 #define _SECS_PER_WRITE_INTERVAL (60 / _MAX_WRITES_PER_MINUTE) #define _LOGFILE_NAME "remanent.log" //////////////////////////////////////////////////////////////////////////////////////////////////// #define _SIG_BLOCK(s) sigprocmask(SIG_BLOCK, (s), NULL) #define _SIG_UNBLOCK(s) sigprocmask(SIG_UNBLOCK, (s), NULL) //////////////////////////////////////////////////////////////////////////////////////////////////// static volatile bool g_fRun = false; static volatile bool g_fPauseImp = false; static volatile bool g_fPauseCmd = false; static volatile bool g_fZombie = false; static int g_nLastSig = -1; static sigset_t g_set; static CLogfile g_lf; //////////////////////////////////////////////////////////////////////////////////////////////////// //////////////////////////////////////////////////////////////////////////////////////////////////// // #if _USE_SIG_INFO static void _OnSigAction(int sig, siginfo_t *psi, void *p) { TRACE("Received signal '%s', PID: %d, Code: %d.\n", strsignal(sig), psi->si_pid, psi->si_code); g_lf.Info("Received signal '%s', PID: %d, Code: %d.\n", strsignal(sig), psi->si_pid, psi->si_code); if((sig != SIGINT) || !psi->si_pid) { g_nLastSig = sig; g_fRun = g_fPauseImp = g_fPauseCmd = g_fZombie = false; } } #else static void _SigHandler(int sig) { g_nLastSig = sig; g_fRun = g_fPauseImp = g_fPauseCmd = g_fZombie = false; // g_lf.Info("Received signal '%s'.\n", strsignal(g_nLastSig)); } #endif //////////////////////////////////////////////////////////////////////////////////////////////////// //////////////////////////////////////////////////////////////////////////////////////////////////// // static long long _TimeSpecDiffNs(const struct timespec *pts1, const struct timespec *pts2) { long long ns1 = (long long)pts1->tv_sec * _NANOSECS_PER_SEC + pts1->tv_nsec; long long ns2 = (long long)pts2->tv_sec * _NANOSECS_PER_SEC + pts2->tv_nsec; return (ns2 - ns1); } static bool _IntervalElapsed(struct timespec *pts, long nIntvSeconds) { struct timespec ts; clock_gettime(CLOCK_MONOTONIC, &ts); long long diff = _TimeSpecDiffNs(pts, &ts) / _NANOSECS_PER_SEC; if(diff >= (long long)nIntvSeconds) { pts->tv_sec += nIntvSeconds; return true; } return false; } static const char* _GetBaseDir(std::string &rstrBaseDir) { char szBaseDir[PATH_MAX]; const char *pszBaseDir = NULL; #ifdef _LOG_BASE_DIR pszBaseDir = _LOG_BASE_DIR; if(!pszBaseDir || !*pszBaseDir || !::DirectoryExist(pszBaseDir)) { CLogfile::StdErr("Invalid base directory config! Using app directory!\n"); pszBaseDir = ::GetAppDirectory(szBaseDir, sizeof(szBaseDir)); } rstrBaseDir = pszBaseDir; #else // _LOG_BASE_DIR UNUSED(pszBaseDir); rstrBaseDir = ::GetAppDirectory(szBaseDir, sizeof(szBaseDir)); #endif // _LOG_BASE_DIR rtrim(rstrBaseDir, "/"); return rstrBaseDir.c_str(); } //////////////////////////////////////////////////////////////////////////////////////////////////// static void _ProcessCtrlMessages(HAPPCTRL hAC, HAPPINFO hAI) { ctrlmsg_t nCtrlMsg; while(g_fRun && (nCtrlMsg = ::GfaIpcAppCtrlGetNextCtrlMsg(hAI))) { switch(nCtrlMsg) { case GFA_APPCTRL_CTRLMSG_STOP: g_fRun = false; g_fPauseImp = false; g_fPauseCmd = false; g_fZombie = false; g_lf.Info("Received Control Message 'Stop'\n"); break; case GFA_APPCTRL_CTRLMSG_PAUSE: if(!g_fPauseCmd) { g_fPauseCmd = true; if(!g_fPauseImp) { ::GfaIpcAppCtrlSetState(hAC, GIAS_Paused); g_lf.Info("Received Control Message 'Pause'\n"); g_lf.Info("Enter state %s ...\n", ::GfaIpcAppCtrlGetStateText(GIAS_Paused)); TRACE("%-8s: State: %s\n", "Me", ::GfaIpcAppCtrlGetStateText(GIAS_Paused)); } } break; case GFA_APPCTRL_CTRLMSG_RESUME: if(g_fPauseCmd) { g_fPauseCmd = false; if(!g_fPauseImp) { g_lf.Info("Received Control Message 'Resume'\n"); g_lf.Info("Enter state %s ...\n", ::GfaIpcAppCtrlGetStateText(GIAS_Running)); ::GfaIpcAppCtrlSetState(hAC, GIAS_Running); TRACE("%-8s: State: %s\n", "Me", ::GfaIpcAppCtrlGetStateText(GIAS_Running)); } } break; default: break; } } } //////////////////////////////////////////////////////////////////////////////////////////////////// // int main(int argc, char **argv) { CProcessInstance pi; int nRet = 0, c; HSHM hShm = NULL; void *pShm = NULL; HAPPCTRL hAC = NULL; HAPPINFO hAI; Json::Value root; bool bDbgInvolved = false; bool bHasJSON = false; char szWriteFile[PATH_MAX]; char szBackupFile[PATH_MAX]; char szLogFile[PATH_MAX]; CHECK_UPDATE_SHM_RETVAL rv; struct timespec tsWriteCredits, tsScanIntv; unsigned long cntVals = 0; char *pszEndPtr = NULL; clock64_t nCycleIntervalMS = _DEFAULT_SCAN_INTERVAL_MS; int nWriteCredits = _MAX_WRITE_CREDITS; unsigned int nPendingChanges = 0; struct sigaction sa; std::string strBaseDir; const char *pszBaseDir = NULL; unsigned long long nUsecWorkTime = 0; CProcessClock pcWork, pcPerf; std::string sPerf; //////////////////////////////////////////////////////////////////////////////////////////////// // check for multiple instances if(!pi.LockInstance(UUID_SHM)) { CLogfile::StdErr("Failed to start instance!\n"); return -1; } //////////////////////////////////////////////////////////////////////////////////////////////// // parse command line options while((c = getopt(argc, argv, "b:c:d")) != -1) { switch(c) { case 'b': if(DirectoryExist(optarg)) { strBaseDir = optarg; rtrim(strBaseDir, "/"); pszBaseDir = strBaseDir.c_str(); } else { CLogfile::StdErr("Invalid base directory option!\n"); } break; case 'c': nCycleIntervalMS = strtoll(optarg, &pszEndPtr, 10); if((((nCycleIntervalMS == LLONG_MIN) || (nCycleIntervalMS == LLONG_MAX)) && (errno == ERANGE)) || (pszEndPtr && *pszEndPtr)) { nCycleIntervalMS = _DEFAULT_SCAN_INTERVAL_MS; CLogfile::StdErr("Invalid cycle interval: \"%s\"! Setting default value (%u)\n", optarg, _DEFAULT_SCAN_INTERVAL_MS); } else if(nCycleIntervalMS < _MIN_SCAN_INTERVAL_MS) { nCycleIntervalMS = _MIN_SCAN_INTERVAL_MS; CLogfile::StdErr("Invalid cycle interval: \"%s\"! Limiting to minimal value (%u)\n", optarg, _MIN_SCAN_INTERVAL_MS); } break; case 'd': bDbgInvolved = true; break; case '?': break; } } do { g_fZombie = true; //////////////////////////////////////////////////////////////////////////////////////////// // get the base directory for output files if not provided by cmd line if(!pszBaseDir) pszBaseDir = _GetBaseDir(strBaseDir); CLogfile::StdOut("Using base directory \"%s\".\n", pszBaseDir); //////////////////////////////////////////////////////////////////////////////////////////// // initialize log file sprintf(szLogFile, "%s/%s", pszBaseDir, _LOGFILE_NAME); if(!g_lf.Open(szLogFile, true, CLogfile::VB_Inf)) { CLogfile::StdErr("Failed to create/open log file!\n"); nRet = -1; break; } g_lf.Info("Process started.\n"); //////////////////////////////////////////////////////////////////////////////////////////// // initialize app control g_lf.Info("Acquire AppCtrl-Handle.\n"); if(!(hAC = ::GfaIpcAppCtrlAcquire(_APPID, _APPNAME, nCycleIntervalMS * 1000, nCycleIntervalMS * 3000))) { g_lf.Error("Failed to acquire AppCtrl-Handle!\n"); nRet = -1; break; } ::GfaIpcAppCtrlSetState(hAC, GIAS_Initializing); g_lf.Info("Enter state %s ...\n", ::GfaIpcAppCtrlGetStateText(GIAS_Initializing)); //////////////////////////////////////////////////////////////////////////////////////////// // validate config parameters #ifdef _LOG_DB_NAME time_t ts; bool bLogDbActive = false; if(strlen(_LOG_DB_NAME) >= _RL_MAX_DB_NAME_LENGTH) { g_lf.Error("Log database name too long! %s\n", _LOG_DB_NAME); nRet = -1; break; } if(strlen(_LOG_DB_USER) >= _RL_MAX_DB_USER_LENGTH) { g_lf.Error("Log database user name too long! %s\n", _LOG_DB_USER); nRet = -1; break; } if(strlen(_LOG_DB_PASS) >= _RL_MAX_DB_PASS_LENGTH) { g_lf.Error("Log database password too long!\n"); nRet = -1; break; } if(strlen(_LOG_LOGS_TABLE) >= _RL_MAX_TABLE_NAME_LENGTH) { g_lf.Error("Log table name too long! %s\n", _LOG_LOGS_TABLE); nRet = -1; break; } //////////////////////////////////////////////////////////////////////////////////////////// // configure remanent logger RLPARAMS rlp; memset(&rlp, 0, sizeof(rlp)); strncpy(rlp.szDBName, _LOG_DB_NAME, _RL_MAX_DB_NAME_LENGTH - 1); strncpy(rlp.szDBUser, _LOG_DB_USER, _RL_MAX_DB_USER_LENGTH - 1); strncpy(rlp.szDBPass, _LOG_DB_PASS, _RL_MAX_DB_PASS_LENGTH - 1); strncpy(rlp.szLogsTable, _LOG_LOGS_TABLE, _RL_MAX_TABLE_NAME_LENGTH - 1); #endif // _LOG_DB_NAME //////////////////////////////////////////////////////////////////////////////////////////// _CreateJSONWriteFileName(pszBaseDir, szWriteFile, _COUNTOF(szWriteFile)); _CreateJSONBackupFileName(pszBaseDir, szBackupFile, _COUNTOF(szBackupFile)); //////////////////////////////////////////////////////////////////////////////////////////// // configure signal handling ::sigfillset(&g_set); memset(&sa, 0, sizeof(sa)); #if _USE_SIG_INFO sa.sa_flags = SA_SIGINFO; sa.sa_sigaction = _OnSigAction; #else sa.sa_handler = _SigHandler; #endif sigaction(SIGHUP, &sa, NULL); // handles user's terminal disconnect sigaction(SIGQUIT, &sa, NULL); // handles Ctrl + '\' sigaction(SIGTERM, &sa, NULL); // handles normal termination sigaction(SIGABRT, &sa, NULL); // handles abnormal termination (i.e. abort()) sigaction(SIGINT, &sa, NULL); // handles Ctrl + 'C' #if _USE_SIG_INFO sa.sa_flags = 0; #endif sa.sa_handler = SIG_IGN; sigaction(SIGTSTP, &sa, NULL); // ignores Ctrl + 'Z' sigaction(SIGSTOP, &sa, NULL); // ignores Stop sigaction(SIGCONT, &sa, NULL); // ignores Continue sigaction(SIGCHLD, &sa, NULL); // ignores child process termination sigaction(0, &sa, NULL); // ignores shell termination //////////////////////////////////////////////////////////////////////////////////////////// // configure timers clock_gettime(CLOCK_MONOTONIC, &tsWriteCredits); tsScanIntv.tv_sec = nCycleIntervalMS / 1000; tsScanIntv.tv_nsec = (nCycleIntervalMS % 1000) * 1000000; //////////////////////////////////////////////////////////////////////////////////////////// // read input SHM file, if any ... pcPerf.ClockTrigger(); bHasJSON = ::ParseJSONFile(pszBaseDir, root, g_lf); sPerf = CProcessClock::Interval2String(pcPerf.ClockGetElapsed()); if(bHasJSON) { g_lf.Info("Parsing time: %s\n", sPerf.c_str()); TRACE("JSON parsing time: %s\n", sPerf.c_str()); } //////////////////////////////////////////////////////////////////////////////////////////// if(!(hShm = ::acquire_shm(sizeof(shm_t), 1))) { g_lf.Error("Unable to acquire SHM Handle!\n"); nRet = -1; break; } g_lf.Info("Acquired SHM Handle.\n"); if(!(pShm = ::GfaIpcAcquirePointer(hShm))) { g_lf.Error("Unable to acquire SHM Pointer!\n"); nRet = -1; break; } g_lf.Info("Acquired SHM Pointer.\n"); ::GfaIpcDumpSHMROT(); #ifdef _LOG_DB_NAME CRemLogger rl(&rlp); if(!(bLogDbActive = rl.InitDatabase(false))) { g_lf.Error("Failed to initialize Log Database - %s\n", rl.LastError().c_str()); g_lf.Warning("Logging will be disabled!\n"); } #endif // _LOG_DB_NAME CRemVarTable map; CShm_t shm(pShm, hShm); pcPerf.ClockTrigger(); shm.InitPath(NULL, NULL); sPerf = CProcessClock::Interval2String(pcPerf.ClockGetElapsed()); TRACE("InitPath (%s).\n", sPerf.c_str()); pcPerf.ClockTrigger(); shm.CreateMembersTable(map); sPerf = CProcessClock::Interval2String(pcPerf.ClockGetElapsed()); TRACE("CreateMembersTable (%s).\n", sPerf.c_str()); if(bHasJSON) { pcPerf.ClockTrigger(); cntVals = map.LoadJSONValues(root); sPerf = CProcessClock::Interval2String(pcPerf.ClockGetElapsed()); g_lf.Info("Successfully restored %lu values from JSON file (%s).\n", cntVals, sPerf.c_str()); TRACE("Successfully restored %lu remanent values from JSON file (%s).\n", cntVals, sPerf.c_str()); } else { _SIG_BLOCK(&g_set); bool bRet = ::WriteJSONFile(szWriteFile, szBackupFile, static_cast(shm)); _SIG_UNBLOCK(&g_set); if(!bRet) { g_lf.Error("Failed to initialize JSON file! - %s\n", strerror(errno)); TRACE("Failed to initialize JSON file! - %s\n", strerror(errno)); break; } } #ifdef _DL_DB_NAME CDbPersist perst(_DL_DB_NAME, _DL_TAGS_TABLE, _DL_LOGS_TABLE, _DL_DB_USER, _DL_DB_PASS); pcPerf.ClockTrigger(); int nRestored = perst.RestoreValues(map, g_lf); sPerf = CProcessClock::Interval2String(pcPerf.ClockGetElapsed()); if(nRestored > 0) { g_lf.Info("Successfully restored %d Database-persistent values from Datalogger Database (%s).\n", nRestored, sPerf.c_str()); TRACE("Successfully restored %d Database-persistent values from Datalogger Database (%s).\n", nRestored, sPerf.c_str()); } #endif // _DL_DB_NAME g_fZombie = false; g_fRun = true; ::GfaIpcAppCtrlSetState(hAC, GIAS_Running); g_lf.Info("Enter monitoring loop.\n"); while(g_fRun) { //////////////////////////////////////////////////////////////////////////////////////// // update app control info if((hAI = ::GfaIpcAppCtrlInfoUpdate(hAC, nUsecWorkTime))) { _ProcessCtrlMessages(hAC, hAI); if(!g_fRun) break; // _ProcessStateEvents(hAC, hAI); } //////////////////////////////////////////////////////////////////////////////////////// if(::nanosleep(&tsScanIntv, NULL) < 0) { if(!g_fRun) break; } pcWork.ClockTrigger(); if(!g_fPauseImp && !g_fPauseCmd) { if(_IntervalElapsed(&tsWriteCredits, _SECS_PER_WRITE_INTERVAL)) { if(nWriteCredits < _MAX_WRITE_CREDITS) { nWriteCredits++; TRACE("%d sec elapsed - write credits: %d\n", _SECS_PER_WRITE_INTERVAL, nWriteCredits); } } rv.nRetval = shm.CheckUpdateShm(true); if(rv.nUpdated || nPendingChanges) { if(nWriteCredits > 0) { if(nPendingChanges) { g_lf.Warning("Reacquired write credit(s). Total %d\n", nWriteCredits); } TRACE("%u SHM values changed ...\n", rv.nUpdated ? rv.nUpdated : nPendingChanges); _SIG_BLOCK(&g_set); bool bRet = ::WriteJSONFile(szWriteFile, szBackupFile, static_cast(shm)); _SIG_UNBLOCK(&g_set); if(!bRet) { g_lf.Error("Failed to write JSON file! - %s\n", strerror(errno)); TRACE("Failed to write JSON file! - %s\n", strerror(errno)); g_fRun = false; g_fZombie = true; break; } nWriteCredits--; nPendingChanges = 0; #ifdef _LOG_DB_NAME if(bLogDbActive) { ts = ::time(NULL); shm.Log(ts, rl); if(!rl.Flush(ts)) { g_lf.Error("Failed to log to Database - %s\n", rl.LastError().c_str()); g_lf.Warning("Logging will be disabled!\n"); bLogDbActive = false; } } #endif // _LOG_DB_NAME TRACE("write credits left: %d\n", nWriteCredits); } else if(!nPendingChanges) { std::vector vars; nPendingChanges = rv.nUpdated; TRACE("%u SHM value(s) changed, but no write credits left!!!\n", rv.nUpdated); g_lf.Warning("%u SHM value(s) changed, but no write credits left.\n", rv.nUpdated); #ifdef _DEBUG if(map.GetMaxUpdateVariables(vars, 5)) { for(auto i = vars.begin(); i != vars.end(); i++) { const CRemanent *pVar = *i; TRACE("%s -> %llu\n", pVar->GetPath(), pVar->GetUpdateCount()); } } #endif // _DEBUG } g_lf.Flush(); } } nUsecWorkTime = pcWork.ClockGetElapsed() / 1000; } TRACE("Process terminating...\n"); g_lf.Info("Leave monitoring loop.\n"); if(!g_fZombie && !bDbgInvolved) { _SIG_BLOCK(&g_set); bool bRet = ::WriteJSONFile(szWriteFile, szBackupFile, static_cast(shm)); _SIG_UNBLOCK(&g_set); if(!bRet) { g_lf.Error("Failed to write JSON file! - %s\n", strerror(errno)); TRACE("Failed to write JSON file! - %s\n", strerror(errno)); g_fZombie = true; } } #ifdef _LOG_DB_NAME if(bLogDbActive) { ts = ::time(NULL); shm.Log(ts, rl); if(!rl.Flush(ts)) { g_lf.Error("Failed to log to Database on exit - %s\n", rl.LastError().c_str()); bLogDbActive = false; } } #endif // _LOG_DB_NAME } while(false); if(g_nLastSig >= 0) { g_lf.Info("Received signal '%s'.\n", strsignal(g_nLastSig)); g_nLastSig = -1; } if(hShm) { if(pShm) { g_lf.Info("Releasing SHM Pointer ...\n"); ::GfaIpcReleasePointer(hShm, pShm); } g_lf.Info("Releasing SHM Handle ...\n"); ::GfaIpcReleaseSHM(hShm); } if(g_fZombie) { if(hAC) ::GfaIpcAppCtrlSetState(hAC, GIAS_Zombie); TRACE("Enter Zombie state ...\n"); g_lf.Warning("Enter Zombie state ...\n"); g_lf.Flush(); pause(); if(g_nLastSig >= 0) g_lf.Info("Received signal '%s'.\n", strsignal(g_nLastSig)); } if(hAC) { g_lf.Info("Enter state %s ...\n", ::GfaIpcAppCtrlGetStateText(GIAS_Terminating)); ::GfaIpcAppCtrlSetState(hAC, GIAS_Terminating); g_lf.Info("Releasing App Control ...\n"); ::GfaIpcAppCtrlRelease(hAC); } g_lf.Info("Process exit.\n\n"); g_lf.Close(); CLogfile::StdErr("Remanent exit.\n"); return 0; }