//////////////////////////////////////////////////////////////////////////////////////////////////// // #include #include #include #include #include #include #include #include #include #include #include #include #include "projal.h" //////////////////////////////////////////////////////////////////////////////////////////////////// // app control #define _APPID GFA_APPCTRL_APPID_SUMMARIST #define _APPNAME "Summarist" //#define _DEPENDENCIES ((appid_t)(GFA_APPCTRL_APPID_REMANENT)) #define _APP_CTRL_CYCLE_US 1000000 //////////////////////////////////////////////////////////////////////////////////////////////////// // #define _TRACK_TIME 1 #define _LOGFILE_NAME "summarist.log" #define _SIG_BLOCK(s) sigprocmask(SIG_BLOCK, (s), NULL) #define _SIG_UNBLOCK(s) sigprocmask(SIG_UNBLOCK, (s), NULL) #define _US_PER_SEC 1000000 #ifdef _SUMMARIST_CU_SLEEP_INTERVAL #if _SUMMARIST_CU_SLEEP_INTERVAL < 10 #define _CATCH_UP_SLEEP_INTERVAL 10000 #else // _SUMMARIST_CU_SLEEP_INTERVAL < 10 #define _CATCH_UP_SLEEP_INTERVAL (_SUMMARIST_CU_SLEEP_INTERVAL * 1000) #endif // _SUMMARIST_CU_SLEEP_INTERVAL < 10 #else // _SUMMARIST_CU_SLEEP_INTERVAL #define _CATCH_UP_SLEEP_INTERVAL (_US_PER_SEC / 10) #endif // _SUMMARIST_CU_SLEEP_INTERVAL #if _TRACK_TIME #define _CLOCK_TRIGGER() g_pc.ClockTrigger(1) #define _CLOCK_TRACE(f) _TraceTime(f) #else // _TRACK_TIME #define _CLOCK_TRIGGER() #define _CLOCK_TRACE(f) #endif // _TRACK_TIME //////////////////////////////////////////////////////////////////////////////////////////////////// // static volatile bool g_fRun = false; static volatile bool g_fPauseImp = false; static volatile bool g_fPauseCmd = false; static volatile bool g_fZombie = false; //static appid_t g_nDepRunning = 0; static sigset_t g_set; static CLogfile g_lf; static int g_nLastSig = -1; static CProcessClock g_pc; //////////////////////////////////////////////////////////////////////////////////////////////////// // static const char* _GetBaseDir(std::string &rstrBaseDir) { char szBaseDir[PATH_MAX]; const char *pszBaseDir = NULL; #ifdef _LOG_BASE_DIR pszBaseDir = _LOG_BASE_DIR; if(!pszBaseDir || !*pszBaseDir || !::DirectoryExist(pszBaseDir)) { CLogfile::StdErr("Invalid base directory config! Using app directory!\n"); pszBaseDir = ::GetAppDirectory(szBaseDir, sizeof(szBaseDir)); } rstrBaseDir = pszBaseDir; #else // _LOG_BASE_DIR UNUSED(pszBaseDir); rstrBaseDir = ::GetAppDirectory(szBaseDir, sizeof(szBaseDir)); #endif // _LOG_BASE_DIR rtrim(rstrBaseDir, "/"); return rstrBaseDir.c_str(); } //////////////////////////////////////////////////////////////////////////////////////////////////// // static void _SigHandler(int sig) { g_nLastSig = sig; g_fRun = g_fPauseImp = g_fPauseCmd = g_fZombie = false; } //////////////////////////////////////////////////////////////////////////////////////////////////// // #if _TRACK_TIME static void _TraceTime(const char *pszMsg) { TRACE("%s (%s).\n", pszMsg ? pszMsg : "", CProcessClock::Interval2String(g_pc.ClockGetElapsed(1)).c_str()); } #endif // _TRACK_TIME //////////////////////////////////////////////////////////////////////////////////////////////////// static void _ProcessCtrlMessages(HAPPCTRL hAC, HAPPINFO hAI) { ctrlmsg_t nCtrlMsg; while(g_fRun && (nCtrlMsg = ::GfaIpcAppCtrlGetNextCtrlMsg(hAI))) { switch(nCtrlMsg) { case GFA_APPCTRL_CTRLMSG_STOP: g_fRun = false; g_fPauseImp = false; g_fPauseCmd = false; g_fZombie = false; g_lf.Info("Received Control Message 'Stop'\n"); return; case GFA_APPCTRL_CTRLMSG_PAUSE: if(!g_fPauseCmd) { g_fPauseCmd = true; if(!g_fPauseImp) { ::GfaIpcAppCtrlSetState(hAC, GIAS_Paused); g_lf.Info("Received Control Message 'Pause'\n"); g_lf.Info("Enter state %s ...\n", ::GfaIpcAppCtrlGetStateText(GIAS_Paused)); TRACE("%-8s: State: %s\n", "Me", ::GfaIpcAppCtrlGetStateText(GIAS_Paused)); } } break; case GFA_APPCTRL_CTRLMSG_RESUME: if(g_fPauseCmd) { g_fPauseCmd = false; if(!g_fPauseImp) { g_lf.Info("Received Control Message 'Resume'\n"); g_lf.Info("Enter state %s ...\n", ::GfaIpcAppCtrlGetStateText(GIAS_Running)); ::GfaIpcAppCtrlSetState(hAC, GIAS_Running); TRACE("%-8s: State: %s\n", "Me", ::GfaIpcAppCtrlGetStateText(GIAS_Running)); } } break; default: break; } } } //////////////////////////////////////////////////////////////////////////////////////////////////// /* static void _ProcessStateEvents(HAPPCTRL hAC, HAPPINFO hAI) { appid_t nAppIdSrc; bool fOldPaused = g_fPauseImp; char szDispName[128]; while(g_fRun && (nAppIdSrc = ::GfaIpcAppCtrlGetNextStateEvtSrc(hAI))) { GfaIpcAppStates state = ::GfaIpcAppCtrlGetState(hAC, nAppIdSrc); GfaIpcAppCtrlGetDisplayName(hAC, nAppIdSrc, szDispName, sizeof(szDispName)); TRACE("%-8s: State: %s\n", szDispName, ::GfaIpcAppCtrlGetStateText(state)); if(nAppIdSrc & _DEPENDENCIES) { if(state == GIAS_Running) { g_lf.Info("%s -> %s.\n", szDispName, ::GfaIpcAppCtrlGetStateText(state)); g_nDepRunning |= nAppIdSrc; } else { g_lf.Warning("%s -> %s.\n", szDispName, ::GfaIpcAppCtrlGetStateText(state)); g_nDepRunning &= ~nAppIdSrc; } } } if(g_fRun) { g_fPauseImp = (g_nDepRunning != _DEPENDENCIES); if(!g_fPauseCmd && (fOldPaused != g_fPauseImp)) { fOldPaused = g_fPauseImp; GfaIpcAppStates newState = g_fPauseImp ? GIAS_Paused : GIAS_Running; ::GfaIpcAppCtrlSetState(hAC, newState); if(g_fPauseImp) g_lf.Warning("Enter state %s ...\n", ::GfaIpcAppCtrlGetStateText(newState)); else g_lf.Info("Enter state %s ...\n", ::GfaIpcAppCtrlGetStateText(newState)); } } }*/ void _AppCtrlUsleep(HAPPCTRL hAC, unsigned long nUsSleep, unsigned long long nUsWorkTime) { HAPPINFO hAI; while(nUsSleep > _APP_CTRL_CYCLE_US) { nUsSleep -= _APP_CTRL_CYCLE_US; usleep(_APP_CTRL_CYCLE_US); if((hAI = ::GfaIpcAppCtrlInfoUpdate(hAC, nUsWorkTime))) { _ProcessCtrlMessages(hAC, hAI); if(!g_fRun) return; // _ProcessStateEvents(hAC, hAI); } } if(nUsSleep) { usleep(nUsSleep); } } //////////////////////////////////////////////////////////////////////////////////////////////////// // int main(int argc, char **argv) { CProcessInstance pi; HAPPCTRL hAC = NULL; HAPPINFO hAI; char szLogFile[PATH_MAX]; std::string strBaseDir; const char *pszBaseDir = NULL; bool bDropTables = false; int c, ca; CProcessClock pcWork; unsigned long long nUsWorkTime = 0; //////////////////////////////////////////////////////////////////////////////////////////////// // check for multiple instances if(!pi.LockInstance(UUID_SHM)) { CLogfile::StdErr("Failed to start instance!\n"); return -1; } //////////////////////////////////////////////////////////////////////////////////////////////// while((c = getopt(argc, argv, "d:")) != -1) { switch(c) { case 'd': if((ca = atoi(optarg)) == 666) bDropTables = true; break; } } //////////////////////////////////////////////////////////////////////////////////////////////// // configure signal handling struct sigaction sa; ::sigfillset(&g_set); sigaddset(&g_set, SIGUSR1); memset(&sa, 0, sizeof(sa)); sa.sa_handler = _SigHandler; sigaction(SIGHUP, &sa, NULL); // handles user's terminal disconnect sigaction(SIGQUIT, &sa, NULL); // handles Ctrl + '\' sigaction(SIGTERM, &sa, NULL); // handles normal termination sigaction(SIGABRT, &sa, NULL); // handles abnormal termination (i.e. abort()) sigaction(SIGINT, &sa, NULL); // handles Ctrl + 'C' sa.sa_handler = SIG_IGN; sigaction(SIGTSTP, &sa, NULL); // ignores Ctrl + 'Z' sigaction(SIGSTOP, &sa, NULL); // ignores Stop sigaction(SIGCONT, &sa, NULL); // ignores Continue sigaction(SIGCHLD, &sa, NULL); // ignores child process termination sigaction(0, &sa, NULL); // ignores shell termination do { g_fZombie = true; //////////////////////////////////////////////////////////////////////////////////////////// // get the base directory for output files if(!pszBaseDir) pszBaseDir = _GetBaseDir(strBaseDir); CLogfile::StdOut("Using base directory \"%s\".\n", pszBaseDir); //////////////////////////////////////////////////////////////////////////////////////////// // initialize log file sprintf(szLogFile, "%s/%s", pszBaseDir, _LOGFILE_NAME); if(!g_lf.Open(szLogFile)) { CLogfile::StdErr("Failed to create/open log file!\n"); break; } #if _SUMMARIST_ENABLED g_lf.Info("Process started.\n"); //////////////////////////////////////////////////////////////////////////////////////////// // initialize app control g_lf.Info("Acquire AppCtrl-Handle.\n"); if(!(hAC = ::GfaIpcAppCtrlAcquire(_APPID, _APPNAME, _APP_CTRL_CYCLE_US, _APP_CTRL_CYCLE_US * 3))) { g_lf.Error("Failed to acquire AppCtrl-Handle!\n"); break; } ::GfaIpcAppCtrlSetState(hAC, GIAS_Initializing); g_lf.Info("Enter state %s ...\n", ::GfaIpcAppCtrlGetStateText(GIAS_Initializing)); /* if(!::GfaIpcAppCtrlSubscribeStateEvents(hAC, _DEPENDENCIES)) { g_lf.Error("Failed to subscribe state event notifications!\n"); break; }*/ //////////////////////////////////////////////////////////////////////////////////////////// if(strlen(_DLOG_DB_NAME) >= _SUM_MAX_DB_NAME_LENGTH) { g_lf.Error("Database name too long!\n"); break; } if(strlen(_DLOG_DB_USER) >= _SUM_MAX_DB_USER_LENGTH) { g_lf.Error("Database username too long!\n"); break; } if(strlen(_DLOG_DB_PASS) >= _SUM_MAX_DB_PASS_LENGTH) { g_lf.Error("Database password too long!\n"); break; } if(strlen(_DLOG_TAGS_TABLE) >= _SUM_MAX_TABLE_NAME_LENGTH) { g_lf.Error("Tag table name too long!\n"); break; } if(strlen(_DLOG_LOGS_TABLE) >= _SUM_MAX_TABLE_NAME_LENGTH) { g_lf.Error("Log table name too long!\n"); break; } //////////////////////////////////////////////////////////////////////////////////////////// // initialize summarist SUMMARIST_PARAMS sup; memset(&sup, 0, sizeof(sup)); strncpy(sup.szDBName, _DLOG_DB_NAME, _SUM_MAX_DB_NAME_LENGTH - 1); strncpy(sup.szDBUser, _DLOG_DB_USER, _SUM_MAX_DB_USER_LENGTH - 1); strncpy(sup.szDBPass, _DLOG_DB_PASS, _SUM_MAX_DB_PASS_LENGTH - 1); strncpy(sup.szTagsTable, _DLOG_TAGS_TABLE, _SUM_MAX_TABLE_NAME_LENGTH - 1); strncpy(sup.szLogsTable, _DLOG_LOGS_TABLE, _SUM_MAX_TABLE_NAME_LENGTH - 1); strcpy(sup.szITagsView, "ilTags"); sup.nSampleIntv = _DLOG_INTV_SAMPLE; sup.nLogIntv = _DLOG_INTV_LOG; sup.nWriteIntv = _DLOG_INTV_WRITE; CSummarist sum(&sup, g_lf); if(!sum.Initialze(g_timeWnds, _countof(g_timeWnds), bDropTables)) break; else if(bDropTables) { g_fZombie = false; break; } //////////////////////////////////////////////////////////////////////////////////////////// // do work CMySqlDB db; time_t tsMinLogs, tsMaxLogs, tsBase, tsNext; #if _SUMMARIST_PROCESS_OUTDATED time_t tsLastOutdated = 0; #endif // _SUMMARIST_PROCESS_OUTDATED g_fZombie = false; g_fRun = true; ::GfaIpcAppCtrlSetState(hAC, GIAS_Running); g_lf.Info("Enter state %s ...\n", ::GfaIpcAppCtrlGetStateText(GIAS_Running)); while(g_fRun) { //////////////////////////////////////////////////////////////////////////////////////// // update app control info if((hAI = ::GfaIpcAppCtrlInfoUpdate(hAC, nUsWorkTime))) { _ProcessCtrlMessages(hAC, hAI); if(!g_fRun) break; // _ProcessStateEvents(hAC, hAI); } //////////////////////////////////////////////////////////////////////////////////////// if(!g_fPauseImp && !g_fPauseCmd) { if(sum.Connect(db)) { pcWork.ClockTrigger(); unsigned long nUsSleep = sum.GetProcessingFrequenzy() * _US_PER_SEC; if(sum.GetMinMaxTimestampFromLogs(db, tsMinLogs, tsMaxLogs)) { if((tsMinLogs != _SUM_INVALID_TIMESTAMP_VALUE) && (tsMaxLogs != _SUM_INVALID_TIMESTAMP_VALUE)) { tsBase = sum.GetBaseTimestamp(tsMinLogs); for(size_t i = 0; (i < sum.TimeWndCount()) && g_fRun; ++i) { time_t tSumLast, tWndFrom, tWndTo; if(!sum.GetLastSummarizeTimestamp(db, i, tSumLast)) { g_fZombie = true; g_fRun = false; break; } if(sum.GetNextTimeFrame(db, i, tSumLast, tsBase, tWndFrom, tWndTo)) { if((tWndFrom <= tsMaxLogs) && (tWndTo <= time(NULL))) { _SIG_BLOCK(&g_set); _CLOCK_TRIGGER(); if(g_fRun) g_fRun = sum.Summarize(db, i, tWndFrom, tWndTo); _CLOCK_TRACE("Summarize"); _SIG_UNBLOCK(&g_set); } sum.SetNextTimeFrameStart(i, tWndTo); } } if((tsNext = sum.GetNextDueTimeFrameStart()) != _SUM_INVALID_TIMESTAMP_VALUE) { time_t tsNow = time(NULL); if(tsNext > tsNow) nUsSleep = ((tsNext - tsNow) + 1) * _US_PER_SEC; else if(tsNext < tsMaxLogs) nUsSleep = _CATCH_UP_SLEEP_INTERVAL; } #if _SUMMARIST_PROCESS_OUTDATED if(g_fRun && (nUsSleep > _CATCH_UP_SLEEP_INTERVAL) && (tsLastOutdated < tsMinLogs)) // not in catch-up mode { g_pc.ClockTrigger(); pc_time64_t nElapsed; for(size_t i = 0; g_fRun && (i < sum.TimeWndCount()); ++i) { _SIG_BLOCK(&g_set); _CLOCK_TRIGGER(); sum.ProcessOutdated(db, i, tsMinLogs); _CLOCK_TRACE("ProcessOutdated"); _SIG_UNBLOCK(&g_set); } nElapsed = g_pc.ClockGetElapsed() / 1000LL; if((pc_time64_t)nUsSleep > nElapsed) nUsSleep -= (unsigned long)nElapsed; tsLastOutdated = tsMinLogs; } #endif // _SUMMARIST_PROCESS_OUTDATED } else { TRACE("GetMinMaxTimestampFromLogs: timestamps invalid - no logs so far?\n"); g_lf.Warning("GetMinMaxTimestampFromLogs: timestamps invalid - no logs so far?\n"); } } else { TRACE("GetMinMaxTimestampFromLogs failed!\n"); g_fZombie = true; g_fRun = false; } sum.Close(db); nUsWorkTime = pcWork.ClockGetElapsed() / 1000; if(g_fRun) { TRACE("Sleep: %s ...\n", CProcessClock::Interval2String((pc_time64_t)nUsSleep * 1000LL).c_str()); _AppCtrlUsleep(hAC, nUsSleep, nUsWorkTime); } } else { TRACE("Database connection failed!\n"); g_fZombie = true; g_fRun = false; } } else { _AppCtrlUsleep(hAC, _APP_CTRL_CYCLE_US, 0); } } // while(g_fRun) } while(false); //////////////////////////////////////////////////////////////////////////////////////////////// if(g_nLastSig >= 0) { g_lf.Info("Received signal '%s'.\n", strsignal(g_nLastSig)); g_nLastSig = -1; } if(g_fZombie) { if(hAC) ::GfaIpcAppCtrlSetState(hAC, GIAS_Zombie); TRACE("Enter Zombie state ...\n"); g_lf.Warning("Enter Zombie state ...\n"); g_lf.Flush(); pause(); if(g_nLastSig >= 0) g_lf.Info("Received signal '%s'.\n", strsignal(g_nLastSig)); } if(hAC) { g_lf.Info("Enter state %s ...\n", ::GfaIpcAppCtrlGetStateText(GIAS_Terminating)); ::GfaIpcAppCtrlSetState(hAC, GIAS_Terminating); g_lf.Info("Releasing App Control ...\n"); ::GfaIpcAppCtrlRelease(hAC); } g_lf.Info("Normal process exit.\n\n"); g_lf.Close(); CLogfile::StdErr("Summarist exit.\n"); return 0; #else // _SUMMARIST_ENABLED g_lf.Info("Summarist disabled!\n"); return -1; #endif // _SUMMARIST_ENABLED }