11 #include "TxUCMCollector.h"
12 #include "StDbFieldI.h"
13 #include "FieldList.h"
14 #include "StDbFieldIIterator.h"
15 #include "StUcmTasks.h"
16 #include "StUcmJobs.h"
17 #include "StUcmEvents.h"
22 #include <log4cxx/logger.h>
23 #include <log4cxx/consoleappender.h>
24 #include <log4cxx/patternlayout.h>
25 #include <log4cxx/helpers/loglog.h>
26 #include <log4cxx/helpers/optionconverter.h>
27 #include <log4cxx/helpers/stringhelper.h>
29 using namespace log4cxx;
30 using namespace log4cxx::helpers;
31 using namespace log4cxx::spi;
34 using namespace TxLogging;
35 using namespace StDbField;
40 const char *TxUCMCollector::fgTs =
"ts";
41 const char *TxUCMCollector::fgEvent =
"event";
42 const char *TxUCMCollector::fgBJobID =
"broker.job.id";
43 const char *TxUCMCollector::fgBTaskID =
"broker.task.id";
44 const char *TxUCMCollector::fgRequester =
"requester.name";
45 const char *TxUCMCollector::fgContext =
"context";
46 const char *TxUCMCollector::fgLevel =
"level";
47 const char *TxUCMCollector::fgStage =
"stage";
48 const char *TxUCMCollector::fgKey =
"key";
49 const char *TxUCMCollector::fgValue =
"value";
51 const char *TxUCMCollector::fgNewTask =
"com.txcorp.ucm.newtask";
52 const char *TxUCMCollector::fgUpdateTask =
"com.txcorp.ucm.updatetask";
53 const char *TxUCMCollector::fgAddJob =
"com.txcorp.ucm.addjob";
54 const char *TxUCMCollector::fgUpdateJob =
"com.txcorp.ucm.updatejob";
55 const char *TxUCMCollector::fgSiteLocation =
"com.txcorp.ucm.job.siteLocation";
56 const char *TxUCMCollector::fgStateID =
"com.txcorp.ucm.job.stateID";
57 const char *TxUCMCollector::fgGridJobID =
"com.txcorp.ucm.job.gridJobID";
58 const char *TxUCMCollector::fgAppStart =
"com.txcorp.ucm.app.start";
59 const char *TxUCMCollector::fgAppEnd =
"com.txcorp.ucm.app.end";
61 const char *TxUCMCollector::fgStatusFile =
"txucmcollectorstatus.properties";
62 const char *TxUCMCollector::fgStatusFileName =
"current.logfile.name";
63 const char *TxUCMCollector::fgStatusFileModTime =
"current.logfile.modtime";
64 const char *TxUCMCollector::fgStatusFilePos =
"current.logfile.pos";
72 sprintf(buffer,
"%d",i);
73 return string(buffer);
78 MYSQL *TxUCMCollector::getConnection()
80 const char *host =
"heston.star.bnl.gov";
81 const char *user =
"StarLogger";
82 const char *passwd =
"logger";
83 return getConnection(host,user,passwd);
87 MYSQL *TxUCMCollector::getConnection (
const string&dbUrl,
const string&dbUsername
88 ,
const string&dbPassword)
90 return getConnection (dbUrl.c_str(),dbUsername.c_str(),dbPassword.c_str());
94 MYSQL *TxUCMCollector::getConnection (
const char *cdbUrl,
const char *cdbUsername
95 ,
const char *cdbPassword)
97 if (!fIsConnectionOpen) {
98 if ( !(connection= mysql_init(connection)) ) {
99 log->error(
"MYSQL: ---- > No init connection");
101 const char *host = cdbUrl;
102 const char *user = cdbUsername;
103 const char *passwd = cdbPassword;
104 const char *db = dbName.c_str();
105 unsigned int port = 3306;
107 if (!(mysql_real_connect(connection
116 string error = __FUNCTION__
117 + string(
"host: ") + host
118 + string(
"; user: ") + user
119 + string(
" passwd: ") + passwd
120 + string(
" db: ") + db
121 + string(
" port:") + itoa(port)
122 +
" error: " + mysql_error(connection);
123 log->debug(error.c_str());
125 fIsConnectionOpen =
false;
127 string error =
"Ok connection to Db : "
128 + string(
"host: <") + host
129 + string(
"> user: <") + user
130 + string(
"> passwd: <") + passwd
131 + string(
"> db: <") + db
132 + string(
"> port: ") + itoa(port);
133 log->debug(error.c_str());
134 fIsConnectionOpen =
true;
142 unsigned int TxUCMCollector::execute(
const string &sql)
144 return execute(sql.c_str());
148 unsigned int TxUCMCollector::execute(
const char *sql)
151 if (getConnection()) {
153 log->debug(
string(
"TxUCMCollector::execute ") + sql);
155 mysql_free_result(fResult);
158 if (( ret = mysql_query(connection,query.c_str()) )) {
159 log->error(std::string(
"MYSQL QUERY:") + mysql_error(connection));
161 fResult = mysql_store_result(connection);
168 void TxUCMCollector::closeConnection()
170 if (fIsConnectionOpen) {
172 mysql_free_result(fResult);
175 mysql_close(connection);
176 if (mysql_errno(connection)) fprintf(stderr,
"MYSQL close ERROR %s \n",mysql_error(connection));
178 fIsConnectionOpen =
false;
183 TxUCMCollector::TxUCMCollector ()
185 ,fResult(),fField(),fRow()
186 ,fIsConnectionOpen(false), sleepTime(10),currLogFilePos(0)
187 ,fBrokerJobID(-1), fDbJobID(-1)
191 log = Logger::getLogger(_T(
"TxUCMCollector"));
193 AppenderList apps = log->getAllAppenders();
196 ConsoleAppenderPtr appender =
new ConsoleAppender(
197 new PatternLayout(
"TxUCMCollector: %-3c{2}:%-5p - %m%n"));
198 appender->setName(_T(
"TxUCMCollectorAppender"));
199 log->addAppender(appender);
204 TxUCMCollector::~TxUCMCollector ()
212 boolean endsWith (std::string str,
const char *suffix)
214 return StringHelper::endsWith(str, _T(suffix));
220 static string trim (std::string str)
222 return StringHelper::trim(str);
232 static vector<std::string> split(
const std::string &str,
const std::string &sep)
234 vector<std::string> splits;
237 while ((posNew = str.find(sep,posOld)) != string::npos)
239 splits.push_back(str.substr(posOld,posNew-posOld));
240 posOld=posNew+sep.size();
243 if (posOld < str.size()) splits.push_back(str.substr(posOld,string::npos));
249 boolean success =
false;
252 log->debug (
" TxUCMCollector::initDb . . . ");
253 success = this->init ();
254 success = success & this->loadDatabase ();
263 log->error(
"Failed to initialize, review log file");
280 boolean TxUCMCollector::init ()
283 boolean success =
false;
285 Properties properties =
new Properties();
287 properties.load (
new FileInputStream (
"txucmcollector.properties"));
290 dbName = properties.getProperty (
"db.name");
291 dbUrl =
"jdbc:mysql://" + properties.getProperty (
"db.host") +
":"
292 + properties.getProperty (
"db.port") +
"/" + dbName;
293 dbUsername = properties.getProperty (
"db.username");
294 dbPassword = properties.getProperty (
"db.password");
298 logsDir = properties.getProperty (
"logs.dir");
299 sleepTime = (
new Long (properties.getProperty (
"sleep.time.sec"))).longValue ();
315 dbUrl =
"heston.star.bnl.gov";
316 dbUsername =
"StarLogger";
317 dbPassword =
"logger";
329 boolean TxUCMCollector::loadDatabase()
333 boolean success =
false;
336 log->debug (
"dbName: " + dbName);
337 log->debug (
"dbUrl: " + dbUrl);
339 connection = getConnection (dbUrl.c_str(), dbUsername.c_str(), dbPassword.c_str());
340 log->debug (
"Successfully loaded database: dbName = " + dbName);
382 File file =
new File (currLogFile);
383 if ((Calendar.getInstance ().getTimeInMillis () - file.lastModified ())
384 >= (24 * 60 * 60 * 1000)) {
385 file.renameTo (
new File (currLogFile +
".archive"));
391 Properties properties =
new Properties();
393 properties.setProperty (statusFileName, currLogFile);
394 properties.setProperty (statusFileModTime,
395 new Long (file.lastModified ()).toString ());
396 properties.setProperty (statusFilePos,
397 new Long (currLogFilePos).toString ());
398 properties.store (
new FileOutputStream (statusFile), null);
402 } CATCH (IOException ioe) {
403 log->error (
"Failed to write current file info to <" +
405 log->error (ioe.getMessage ());
435 void TxUCMCollector::setCurrLogFile () {
444 if (
new File (statusFile).exists ()) {
445 Properties properties =
new Properties();
446 properties.load (
new FileInputStream (statusFile));
448 currLogFile = properties.getProperty (statusFileName);
449 if (currLogFile != null) {
450 const char * strTime = properties.getProperty (statusFileModTime);
451 long longTime = (
new Long (strTime)).longValue ();
453 (
new Long (properties.getProperty (statusFilePos))).longValue ();
456 if (
new File (currLogFile).exists ()) {
458 if (
new File (currLogFile).lastModified () == longTime) {
459 System.out.println (
"Waiting for " + currLogFile +
" to be updated...");
460 log->info (
"Waiting for " + currLogFile +
" to be updated...");
461 Thread.sleep (sleepTime * 1000);
470 new File (statusFile).delete ();
480 File logFileDir =
new File (logsDir);
483 FilenameFilter filter =
new FilenameFilter() {
484 boolean accept (File file,
const char * name) {
485 return name.endsWith (
".log");
489 File[] logFiles = logFileDir.listFiles (filter);
491 System.out.println (
"Found " + logFiles.length +
" log files in " + logsDir);
492 log->info (
"Found " + logFiles.length +
" log files in " + logsDir);
496 if (logFiles.length > 0) {
497 File file = logFiles [0];
498 for (
int i = 1; i < logFiles.length; i++) {
499 if (file.lastModified () > logFiles [i].lastModified ()) {
505 currLogFile = file.toString ();
511 System.out.println (
"Waiting for new log files in " + logsDir);
512 log->info (
"Waiting for new log files in " + logsDir);
513 Thread.sleep (sleepTime * 1000);
532 void TxUCMCollector::processLogFile () {
535 System.out.println (
"processing " + currLogFile);
537 BufferedReader in =
new BufferedReader (
new FileReader (currLogFile));
541 for (
int i = 0; i < currLogFilePos; i++) {
545 while ((str = in.readLine()) != null) {
580 vector<std::string> keysNVals;
581 std::string message = msg;
582 size_t hdrDelimIndex = message.find(
':');
583 if (hdrDelimIndex != string::npos && hdrDelimIndex < message.find(
"=\"")) {
584 keysNVals = split(message.substr(hdrDelimIndex + 1),
"\" ");
589 keysNVals = split(message,
"\" ");
592 log->debug(_T(
"TxUCMCollector::processMessage: ")+ message);
593 for (
size_t i = 0; i < keysNVals.size(); i++) {
596 vector<std::string> keyNVal = split(keysNVals [i],
"=\"");
599 std::string value = (keyNVal.size() == 2)
604 value = endsWith(value,
"\"")
605 ? value.substr (0, value.size() - 1)
608 msgHashMap.insert(pair<std::string,std::string>(trim(keyNVal [0]), value));
609 log->debug(
string(
"next pair: ") + trim(keyNVal [0]) +
"<" + value +
">");
614 std::string keyVal = msgHashMap[string(fgKey)];
615 if (keyVal.empty()) {
617 sprintf(buffer,
"%d",(
int)keysNVals.size());
618 log->error (
string(
"Wrong message format: \"")
622 +
"does not contains any <"
632 if (keyVal == fgNewTask ) this->createNewTask ();
636 else if (keyVal == fgUpdateTask) this->updateTask ();
640 else if (keyVal == fgAddJob) this->addJob ();
644 else if (keyVal == fgUpdateJob) this->updateJob ();
648 else if (keyVal == fgSiteLocation) this->setJobsField (
"siteLocation");
652 else if (keyVal == fgStateID) this->setJobsField (
"stateID");
656 else if (keyVal == fgGridJobID) this->setJobsField (
"gridJobID");
662 else this->addEvent ();
672 void TxUCMCollector::createNewTask () {
673 string newTaskKeys =
"brokerTaskID, requesterID";
674 string newTaskVals =
"'" + msgHashMap[fgBTaskID] +
"'" +
675 ", '" + msgHashMap[fgRequester] +
"'";
678 if (!this->recordExists (
string(
"brokerTaskID = \"") + msgHashMap[fgBTaskID] +
"\"",
680 vector <string> newTask = split(msgHashMap[fgValue],
"', ");
681 for (
size_t i = 0; i < newTask.size(); i++) {
682 vector <string> taskKeyNVal = split(newTask [i],
"='");
683 if (taskKeyNVal.size() == 2) {
684 newTaskKeys +=
", " + trim(taskKeyNVal [0]);
686 newTaskVals +=
", '";
687 newTaskVals += endsWith (trim(taskKeyNVal [1]),
"'")
688 ? taskKeyNVal [1].substr (0, taskKeyNVal[1].size()-1)
689 : trim(taskKeyNVal [1]);
693 insertRecord (
string(
"(") + newTaskKeys +
") VALUES (" + newTaskVals +
")",
696 this->createJobsTable ();
697 this->createEventsTable ();
700 log->debug (
string(
"Record with brokerTaskID = ") + msgHashMap[fgBTaskID] +
711 void TxUCMCollector::updateTask () {
714 if (this->recordExists (
string(
"brokerTaskID = \"") + msgHashMap[fgBTaskID] +
"\"",
716 updateRecord (msgHashMap[fgValue],
718 string(
"brokerTaskID = '") + msgHashMap[fgBTaskID] +
"'");
721 log->error (
string(
"Record with brokerTaskID = ") + msgHashMap[fgBTaskID] +
722 " does not exist, so creating a new record instead of updating");
723 this->createNewTask ();
725 if (this->recordExists (
string(
"brokerTaskID = \"") + msgHashMap[fgBTaskID] +
"\"",
727 updateRecord (msgHashMap[fgValue],
729 string(
"brokerTaskID = '") + msgHashMap[fgBTaskID] +
"'");
741 void TxUCMCollector::addJob () {
745 if (!this->recordExists (
string(
"brokerTaskID = \"") + msgHashMap[fgBTaskID] +
"\"",
747 log->info (msgHashMap[fgBTaskID]
748 +
" does not exist in Tasks table");
749 insertRecord (
string(
"(brokerTaskID, requesterID) VALUES ") +
750 "('" + msgHashMap[fgBTaskID] +
"', " +
751 "'" + msgHashMap[fgRequester] +
"')",
755 this->createJobsTable ();
756 this->createEventsTable ();
759 if (!this->recordExists (
string(
"brokerJobID = \"") + msgHashMap[fgBJobID] +
"\"",
762 std::string newJobKeys =
"taskID, brokerJobID";
763 std::string newJobVals = string(
"(SELECT taskID FROM Tasks WHERE brokerTaskID=") +
764 "'" + msgHashMap[fgBTaskID] +
"')" +
765 ", '" + msgHashMap[fgBJobID] +
"'";
767 vector<std::string> newJob = split(msgHashMap[fgValue],
"', ");
769 for (
size_t i = 0; i < newJob.size(); i++) {
770 vector<std::string> jobKeyNVal = split(newJob [i],
"='");
771 if (jobKeyNVal.size() == 2) {
772 newJobKeys +=
", " + trim(jobKeyNVal [0]);
775 newJobVals += endsWith(trim(jobKeyNVal [1]),
"'")
776 ? jobKeyNVal [1].substr (0, jobKeyNVal.size() - 1)
777 : trim(jobKeyNVal [1]);
782 insertRecord (
string(
"(") + newJobKeys +
") VALUES (" + newJobVals +
")",
785 log->debug (
"Record with brokerJobID = " + msgHashMap[fgBJobID] +
791 string TxUCMCollector::tableNamePrefix(
const char *prefix)
const
793 string fullTableName =
795 + string(
"_") + msgHashMap.find(fgRequester)->second
796 + string(
"_") + msgHashMap.find(fgBTaskID)->second;
797 ((
TxUCMCollector*)
this)->log->debug(
string(__FUNCTION__)+
"<" + fullTableName +
">");
798 return fullTableName;
802 string TxUCMCollector::jobTableName()
const
804 return tableNamePrefix(
"Jobs");
807 std::string TxUCMCollector::eventTableName()
const
809 return tableNamePrefix(
"Events");
818 void TxUCMCollector::updateJob () {
821 if (this->recordExists (
string(
"brokerJobID = \"") + msgHashMap[fgBJobID] +
"\"",
823 updateRecord ( msgHashMap[fgValue], jobTableName(),
824 "brokerJobID = '" + msgHashMap[fgBJobID] +
"'");
826 log->debug (
"Record with brokerJobID = " + msgHashMap[fgBJobID] +
827 " does not exist, so creating a new record instead of updating");
836 void TxUCMCollector::setJobsField (
const string &fieldName) {
837 setJobsField (fieldName.c_str());
840 void TxUCMCollector::setJobsField (
const char * fieldName) {
841 if (this->recordExists (
string(
"brokerJobID = \"") + msgHashMap[fgBJobID] +
"\"",
843 updateRecord (
string(fieldName) +
" = '" + msgHashMap[fgValue] +
"'",
845 "brokerJobID = '" + msgHashMap[fgBJobID] +
"'");
847 log->error (
string(
"Record with brokerJobID = ") + msgHashMap[fgBJobID] +
848 " does not exist, so creating a new record instead of updating");
859 void TxUCMCollector::addEvent () {
863 if (!this->recordExists (
string(
"brokerTaskID = \"") + msgHashMap[fgBTaskID] +
"\"",
865 log->info (msgHashMap[fgBTaskID]
866 +
" does not exist in Tasks table");
867 insertRecord (
string(
"(brokerTaskID, requesterID) VALUES ") +
868 "('" + msgHashMap[fgBTaskID] +
"', " +
869 "'" + msgHashMap[fgRequester] +
"')",
874 this->createJobsTable ();
875 this->createEventsTable ();
880 if (!this->recordExists (
string(
"brokerJobID = \"") + msgHashMap[fgBJobID] +
"\"",
883 log->info (msgHashMap[fgBTaskID]
884 +
" does not exist in Jobs table");
886 std::string newJobKeys =
"taskID, brokerJobID";
887 std::string newJobVals = string(
"(SELECT taskID FROM Tasks WHERE brokerTaskID=")
888 +
"'" + msgHashMap[fgBTaskID] +
"')"
889 +
", '" + msgHashMap[fgBJobID] +
"'";
891 insertRecord (
string(
"(taskID, brokerJobID) VALUES ") +
892 "((SELECT taskID FROM Tasks WHERE brokerTaskID=" +
893 "'" + msgHashMap[fgBTaskID] +
"')" +
894 ", '" + msgHashMap[fgBJobID] +
"')",
900 std::string newEventKeys =
"jobID, levelID, context, time, stageID, messageKey, messageValue";
901 std::string newEventVals = string(
"(SELECT jobID FROM `") + jobTableName() +
"` WHERE brokerJobID=" +
902 "'" + msgHashMap[fgBJobID] +
"')" +
903 ", '" + msgHashMap[fgLevel] +
"'" +
904 ", '" + msgHashMap[fgContext] +
"'" +
905 ", '" + msgHashMap[fgTs] +
"'" +
906 ", '" + msgHashMap[fgStage] +
"'" +
907 ", '" + msgHashMap[fgKey] +
"'" +
908 ", '" + msgHashMap[fgValue] +
"'";
910 insertRecord (
string(
"(") + newEventKeys +
") VALUES (" + newEventVals +
")",
912 static string FAILED_JOB_ATTRIBUTE;
913 if ( FAILED_JOB_ATTRIBUTE.empty() ) {
914 stringstream oss(FAILED_JOB_ATTRIBUTE);
915 oss << TxEventLog::FAILED;
917 static string DONE_JOB_ATTRIBUTE;
918 if ( DONE_JOB_ATTRIBUTE.empty() ) {
919 stringstream oss(DONE_JOB_ATTRIBUTE);
920 oss << TxEventLog::DONE;
922 if (msgHashMap[fgStage] == FAILED_JOB_ATTRIBUTE || msgHashMap[fgStage] == DONE_JOB_ATTRIBUTE ) {
923 updateRecord (
"taskRemainSize=taskRemainSize-1"
925 ,
string(
"brokerTaskID=") +
"'" + msgHashMap[fgBTaskID] +
"'" );
933 void TxUCMCollector::createJobsTable () {
935 std::string tableName =
"`" + jobTableName() +
"` ";
936 this->createTable (tableName , std::string(
"jobspattern"));
944 void TxUCMCollector::createEventsTable () {
946 std::string tableName =
"`" + eventTableName() +
"` ";
947 this->createTable (tableName, std::string(
"eventspattern"));
958 void TxUCMCollector::insertRecord (
const string &insertStr,
const string &tableName)
960 insertRecord(insertStr.c_str(),tableName.c_str());
963 void TxUCMCollector::insertRecord (
const char * insertStr,
const char * tableName) {
966 if (!execute(
string(
"INSERT INTO `") + tableName +
"` " + insertStr))
967 log->debug (
string(
"Created new record for ") + tableName +
": " + insertStr);
969 log->error (
string(mysql_error(connection)) +
" the new record for " + tableName +
": " + insertStr);
986 void TxUCMCollector::updateRecord (
const string&updateStr,
const string&tableName,
const string&condition)
988 updateRecord (updateStr.c_str(), tableName.c_str(), condition.c_str());
991 void TxUCMCollector::updateRecord (
const char * updateStr,
const char * tableName,
const char * condition)
995 if (!execute(
string(
"UPDATE `") + tableName
996 +
"` SET " + updateStr
997 +
" WHERE " + condition))
998 log->debug(
string(
"Updated new record for ") + tableName +
" with values: " + updateStr);
1013 boolean TxUCMCollector::recordExists (
const string&selectStr,
const string&tableName)
1016 recordExists (selectStr.c_str(),tableName.c_str());
1020 boolean TxUCMCollector::recordExists (
const char * selectStr,
const char * tableName) {
1021 unsigned long nRows = 0;
1024 execute(
string(
"SELECT * FROM `") + tableName
1025 +
"` WHERE " + selectStr);
1027 nRows = mysql_num_rows(fResult);
1036 return nRows ?
true:
false;
1046 void TxUCMCollector::createTable (
const string&table,
const string&like)
1048 const char *likestr = like.empty() ? 0 : like.c_str();
1049 createTable (table.c_str(), likestr);
1053 void TxUCMCollector::createTable (
const char * table,
const char *like) {
1055 std::string query = string(
"CREATE TABLE IF NOT EXISTS ") + table;
1056 if (like && like[0]) query += std::string(
" LIKE `") + like +
"`";
1057 if (!execute(query))
1058 log->debug (
string(
"Created new table: ") + table);
1072 void TxUCMCollector::usage (Options options)
1076 HelpFormatter formatter =
new HelpFormatter ();
1077 formatter.printHelp (
"Tx UCM Collector", options);
1085 fillTaskList(tasks,limit,offset);
1093 fillJobList(jobs,limit,offset,task);
1097 int TxUCMCollector::getJobId(
const char *reqName,
const char *taskBrokerID,
int brokerJobID)
1101 setBrokerTaskID (taskBrokerID);
1102 setBrokerJobID (brokerJobID);
1104 string where = string(
" brokerJobID='") + itoa(brokerJobID) +
"' ";
1105 queryTable(jobTableName().c_str(),0,0,where.c_str());
1107 int nRows = mysql_num_rows(fResult) ;
1108 if (nRows<=0 || nRows >1 ) {
1109 log->error(
string(
"Can not fetch the job id for the ") + taskBrokerID +
" broker id=" + itoa(brokerJobID) +
" nrow=" + itoa(nRows));
1113 id = job->
getField(
"jobID")->toInt();
1128 fillEventList(events,limit,offset);
1133 int TxUCMCollector::fillTaskList(
StUcmTasks &tasks,
int limit,
int offset)
1136 return fillUcmList(
"Tasks",l,limit,offset);
1140 int TxUCMCollector::fillUcmList(
const char *type,
RecordList &records,
int limit,
int offset)
1142 my_ulonglong nRows = 0;
1145 if (
string(type) ==
"Tasks") {
1146 queryTaskTable(limit,offset);
1147 }
else if (
string(type) ==
"Jobs" ) {
1148 queryJobTable(limit,offset);
1149 }
else if (
string(type) ==
"Events" ) {
1150 queryEventTable(limit,offset);
1152 queryTable(type,limit,offset);
1155 nRows = mysql_num_rows(fResult) ;
1156 log->debug(
string(itoa(nRows)) +
" rows from " + itoa(offset) +
" row" );
1157 for (my_ulonglong i=0; i<nRows;++i)
1161 records.push_back(task);
1171 int TxUCMCollector::fillJobList(
StUcmJobs &jobs,
int limit,
int offset)
1174 return fillUcmList(
"Jobs",l,limit,offset);
1178 void TxUCMCollector::setBrokerTaskID(
const StRecord *task)
1180 const char *requestId = 0;
1183 setBrokerTaskID(requestId);
1188 void TxUCMCollector::setBrokerJobID(
const StRecord *job)
1191 int brokerJobID = job->
getField(
"brokerJobID")->toInt();
1192 setBrokerJobID(brokerJobID);
1193 int jobID = job->
getField(
"jobID")->toInt();
1198 int TxUCMCollector::fillJobList(
StUcmJobs &jobs,
int limit,
int offset,
const StRecord *task)
1202 setBrokerTaskID(task);
1203 row = fillUcmList(
"Jobs",l,limit,offset);
1212 int TxUCMCollector::fillEventList(
StUcmEvents &events,
int limit,
int offset,
const StRecord *job)
1215 if (job) setBrokerJobID (job);
1216 return fillUcmList(
"Events",l,limit,offset);
1220 void TxUCMCollector::fillFields(
FieldList &fields)
1222 fRow = mysql_fetch_row(fResult);
1224 fField = mysql_fetch_fields(fResult);
1225 unsigned int n_fields = mysql_num_fields(fResult);
1226 log->debug(
string(
"Fetching ") + itoa(n_fields) +
" fields ");
1227 for (
unsigned int i=0;i<n_fields; ++i)
1229 fields.push_back(createField(i));
1232 log->error(mysql_error(connection));
1240 case MYSQL_TYPE_TINY:
case MYSQL_TYPE_SHORT:
case MYSQL_TYPE_LONG:
1241 ucmType =StDbFieldI::kINT;
1243 case MYSQL_TYPE_TIMESTAMP:
case MYSQL_TYPE_DATE:
case MYSQL_TYPE_TIME:
1244 case MYSQL_TYPE_DATETIME:
case MYSQL_TYPE_YEAR:
1245 ucmType = StDbFieldI::kUNIXTIME;
1248 case ucmType =StDbFieldI::kLONG;
break;
1249 case ucmType =StDbFieldI::kULONG;
break;
1250 case ucmType =StDbFieldI::kDOUBLE;
break;
1252 case MYSQL_TYPE_STRING:
case MYSQL_TYPE_VAR_STRING:
1253 ucmType =StDbFieldI::kCHAR;
break;
1261 StDbFieldI *TxUCMCollector::createField(
unsigned int fieldIndx)
1263 StDbFieldI *field =
new StDbFieldI(fField[fieldIndx].org_name, fRow[fieldIndx],MapSqlField(fField[fieldIndx].type),1);
1272 void TxUCMCollector::printVersion(){
1273 log->info(
"Tx UCM Collector, version 0.4");
1282 boolean debug =
false;
1283 Option h =
new Option (
"h",
"help",
false,
"print this message");
1284 Option v =
new Option (
"v",
"version",
false,
"print the version information");
1285 Option d =
new Option (
"d",
"debug",
false,
"set log level to debug");
1286 Option t =
new Option (
"t",
"test",
false,
"run the setUpTest()");
1287 Option m =
new Option (
"m",
"message",
true,
"parse one message and exit");
1290 Options options =
new Options ();
1291 options.addOption (h);
1292 options.addOption (v);
1293 options.addOption (d);
1294 options.addOption (t);
1295 options.addOption (m);
1297 CommandLineParser parser =
new PosixParser();
1299 const char * jobMessage=null;
1301 cmd = parser.parse (options, args);
1303 CATCH (ParseException pe) {
1304 System.out.println (
"** Error ** : Check your input parametres:");
1309 if (cmd.hasOption (
"m")) {
1311 jobMessage = cmd.getOptionValue(
"m");
1312 if (jobMessage.isEmpty () ) {
1313 System.out.println (
"Error: no message has been provided");
1317 const char * jobMessagesArgs[] = cmd.getArgs() ;
1319 for (n=0; n < jobMessagesArgs.length; n++)
1320 jobMessage +=
" " + jobMessagesArgs[n] ;
1322 if (cmd.hasOption (
"h")) {
1327 if (cmd.hasOption (
"v")) {
1332 if (cmd.hasOption (
"d") ){
1336 System.out.println(
"option:debug");
1342 if (collector.
initDb() ) {
1343 if (jobMessage == null ) {
1351 System.out.println (
"created collector object");
1356 const char * TxUCMCollector::fgTaskCols =
"('taskID', 'brokerTaskID', 'brokerID', "
1357 "'requesterID', 'taskName', 'taskDescription', 'taskSize', "
1358 "'taskRemainSize', 'submitTime', 'updateTime', 'archiveFlag')";
1361 const char *TxUCMCollector::fgJobsTableCols =
1363 "jobID int(11) NOT NULL AUTO_INCREMENT KEY COMMENT 'ID of job when entry is created, unique within table', "
1364 "updateTime timestamp NOT NULL default CURRENT_TIMESTAMP COMMENT 'Time that job execution state was last updated', "
1365 "brokerJobID int(11) NOT NULL COMMENT 'ID of job as assigned by Broker', "
1366 "taskID int(11) NOT NULL COMMENT 'Foreign key reference to Tasks table', "
1367 "gridJobID varchar(64) default NULL COMMENT 'ID for job as assigned by Grid Resource Allocation Manager (GRAM)', "
1368 "localJobID int(11) default NULL COMMENT 'ID for job as assigned by local resource manager or scheduler', "
1369 "gridSubmitTime datetime default NULL COMMENT 'Time that job was submitted to the GRAM', "
1370 "localSubmitTime datetime default NULL COMMENT 'Time that job was submitted to the local resource manager or scheduler', "
1371 "siteLocation varchar(64) default NULL COMMENT 'Physical local of job, could be grid site or local cluster description', "
1372 "queue varchar(64) default NULL COMMENT 'Name and short description of queue that shedules job', "
1373 "queuePosition int(11) default NULL COMMENT 'Integer slot position of job in local resource manager or scheduler', "
1374 "nodeLocation varchar(64) default NULL COMMENT 'Name of worker node that job lands on', "
1375 "startTime datetime default NULL COMMENT 'Time that job started execution', "
1376 "executionUserName varchar(32) default NULL COMMENT 'A login ID on the local resource site & worker node that actually executes', "
1377 "stateID int(11) NOT NULL default '1' COMMENT 'Foreign key reference to StateDictionary table', "
1378 "CONSTRAINT UNIQUE INDEX jobID (brokerJobID, taskID)"
1382 const char * TxUCMCollector::fgJobCols =
"('jobID', 'brokerJobID', 'taskID', "
1383 "'gridJobID', 'localJobID', 'gridSubmitTime', "
1384 "'localSubmitTime', 'siteLocation', 'queue', 'queuePosition', "
1385 "'nodeLocation', 'startTime', 'executionUserName', 'stateID')";
1389 const char * TxUCMCollector::fgEventsTableCols =
1391 "eventID int(11) NOT NULL AUTO_INCREMENT KEY COMMENT 'ID of event when entry is created, unique within table', "
1392 "time timestamp NOT NULL default CURRENT_TIMESTAMP COMMENT 'Time that event was recorded by the Tracking Library', "
1393 "jobID int(11) NOT NULL COMMENT 'Job that this message is associated with', "
1394 "levelID int(11) NOT NULL COMMENT 'The ID of the log level of the event (WARNING, DEBUG, ERROR, etc.)', "
1395 "context VARCHAR(40) NOT NULL COMMENT 'The bulk category of the log event or the facilty or code where the event happens', "
1396 "stageID int(11) NOT NULL COMMENT 'The ID of the logging stage of the event (i.e., START, STATUS, or END)', "
1397 "messageKey VARCHAR(40) COMMENT 'A user defined property key or SYSTEM for system event', "
1398 "messageValue VARCHAR(120) COMMENT 'A user defined property value or textual content of a log message for a system event', "
1399 "cpuLoad double default NULL COMMENT 'Optional benchmarking value, program CPU load in %', "
1400 "totalMem int(11) default NULL COMMENT 'Optional benchmarking value, total system memory in KiB', "
1401 "usedMem int(11) default NULL COMMENT 'Optional benchmarking value, total used memory in KiB', "
1402 "appMem int(11) default NULL COMMENT 'Optional benchmarking value, total app memory in KiB', "
1407 const char * TxUCMCollector::fgEventCols =
"('eventID', 'jobID', 'levelID', "
1408 "'context', 'time', 'stageID', 'messageKey', 'messageValue', "
1409 "'cpuLoad', 'totalMem', 'usedMem', 'appMem')";
1411 int TxUCMCollector::queryTableSize(
const char *tableName,
const StRecord *where)
1414 if(
string(tableName) ==
"Jobs") {
1415 setBrokerTaskID(where);
1416 }
else if (
string(tableName) ==
"Events") {
1417 setBrokerJobID(where);
1419 size = queryTableSize(tableName);
1423 int TxUCMCollector::queryTableSize(
const char *tableName,
const char *where)
1427 bool countSelectedEvents =
false;
1429 if ((
string(tableName) ==
"Tasks" ) && (msgHashMap.find(fgRequester) != msgHashMap.end())) {
1430 whichTask=string(
"requesterID='")+ msgHashMap[fgRequester]+
"' ";
1431 where = whichTask.c_str();
1432 }
else if (
string(tableName) ==
"Jobs" ) {
1433 tableName = jobTableName().c_str();
1434 }
else if (
string(tableName) ==
"Events" ) {
1435 tableName = eventTableName().c_str();
1436 countSelectedEvents = (fDbJobID >= 0 );
1439 string query = string(
"select count(*) from `")+ tableName +
"`";
1440 if (where &&where[0]) {
1441 query += string(
" WHERE ") + where;
1442 }
else if (countSelectedEvents) {
1443 query += string(
" WHERE ") + string(
"jobID='")+ itoa(fDbJobID)+
"' ";
1447 if (mysql_num_rows(fResult)==1) {
1448 fRow = mysql_fetch_row(fResult);
1450 log->error(mysql_error(connection));
1455 log->error(
string(
"wrong result for <") + query +
">");
1461 void TxUCMCollector::queryTable(
const char *tableName,
int limit,
int offset,
const char *where)
1464 string query = string(
"select * from `") + tableName +
"` ";
1465 if (where && where[0] ) {
1466 query += string(
"WHERE ") + where;
1468 if (limit > 0) query +=
" LIMIT " + itoa(limit);
1470 if(limit <= 0 ) query +=
"LIMIT 9999999 ";
1471 query +=
" OFFSET " + itoa(offset);
1477 void TxUCMCollector::queryTaskTable(
int limit,
int offset)
1479 string where=string(
" requesterID='")+ msgHashMap[fgRequester]+
"' ";
1480 where += string(
" ORDER BY ") +
"taskID" +
" DESC ";
1482 queryTable(
"Tasks",limit,offset,where.c_str());
1485 void TxUCMCollector::queryJobTable(
int limit,
int offset)
1487 queryTable(jobTableName().c_str(),limit,offset);
1489 string where =
" taskID=47948 ";
1490 queryTable(
"Jobs",limit,offset, where.c_str());
1495 void TxUCMCollector::queryJobTable(
const char *taskID,
int limit,
int offset)
1497 queryTable(jobTableName().c_str(),limit,offset);
1499 string where = string(
" taskID='") + taskID +
"' ";
1500 queryTable(
"Jobs",limit,offset, where.c_str());
1504 void TxUCMCollector::queryEventTable(
int limit,
int offset)
1506 string where=string(
"jobID='")+ itoa(fDbJobID)+
"' ";
1507 queryTable(eventTableName().c_str(),limit,offset,where.c_str());
1508 if (!fResult) queryTable(
"Messages",limit,offset,where.c_str());
1512 void TxUCMCollector::queryEventTable(
const char *jobDbID,
int limit,
int offset)
1514 string where=string(
" jobID='")+ jobDbID+
"' ";
1515 queryTable(eventTableName().c_str(),limit,offset,where.c_str());
1516 if (!fResult) queryTable(
"Messages",limit,offset,where.c_str());
1522 void TxUCMCollector::writeDown(
const std::string& message)
1537 void TxUCMCollector::setBrokerTaskID (
const std::string& brokerTaskID)
1539 msgHashMap[fgBTaskID] = brokerTaskID;
1543 void TxUCMCollector::setBrokerJobID (
int brokerJobID)
1545 fBrokerJobID = brokerJobID;
1557 msgHashMap[fgRequester] = requester;
1583 void TxUCMCollector::setJobSubmitLocation (
const std::string& url)
1604 void TxUCMCollector::setJobSubmitState (State state)
1612 void TxUCMCollector::setJobSubmitID (
const std::string&
ID)
1617 Level level,Stage stage,
const std::string& msgContext)
1622 const std::string& userValue, Level level, Stage stage,
1623 const std::string& msgContext)
virtual void setContext(const std::string &context)
static void main(const char *args[])
void processMessage(const std::string &message)
virtual void logEvent(const std::string &logMsg, Level level=LEVEL_INFO, Stage stage=STATUS, const std::string &msgContext=TxUCMConstants::defaultContext)
StDbFieldI * getField(const char *name) const
virtual std::string getDescription() const
virtual void logTask(unsigned int size=1)
virtual void setEnvBrokerJobID(const std::string &envBrokerJobID)
virtual void setEnvBrokerTaskID(const std::string &envBrokerTaskID)
virtual void logEnd(const std::string &key, const std::string &value)
virtual void logJobSubmitState(State state)
const char * getValueAsString() const
virtual void logJobSubmitID(const std::string &ID)
virtual void setRequesterName(const std::string &requester)
virtual void logStart(const std::string &key, const std::string &value)
TxLogging::FieldList & getFields()
virtual void setDbJobID(int dbJobID)
virtual void logJobAttribute(const std::string &key, const std::string &value)
virtual void logJobSubmitLocation(const std::string &url)
virtual StUcmTasks * getTaskList()