20 #include "MySQLAppender.h"
27 #include <log4cxx/helpers/loglog.h>
28 #include <log4cxx/helpers/optionconverter.h>
29 #include <log4cxx/patternlayout.h>
31 using namespace log4cxx;
32 using namespace log4cxx::helpers;
33 using namespace log4cxx::db;
34 using namespace log4cxx::spi;
// Member initializers: connection handle starts null, bufferSize starts at 5
// (the threshold append() compares buffer.size() against), fLastId starts at 0,
// and the connection is marked as not open.
40 : connection(0), bufferSize(5),fLastId(0),fIsConnectionOpen(false)
// Destructor.
// NOTE(review): presumably expected to flush pending events and release the
// MySQL connection -- confirm against the body.
46 MySQLAppender::~MySQLAppender()
// Applies one named configuration option to this appender.
//
// The option name is matched case-insensitively against "buffersize",
// "password", "sql", "url" / "dns" (the two are accepted as synonyms) and
// "user". For "buffersize" the value is converted with
// OptionConverter::toInt(value, 1) and handed to setBufferSize(); the 1 is
// presumably the default used when the value cannot be parsed -- TODO confirm.
// The last call forwards to AppenderSkeleton::setOption, presumably for any
// option name not recognised here.
//
// NOTE(review): that forwarding call passes `name`, not the `option`
// parameter. It looks like unrecognised options are forwarded under the wrong
// key -- confirm whether `option` was intended.
53 void MySQLAppender::setOption(
const String& option,
56 if (equalsIgnoreCase(option, _T(
"buffersize")))
58 setBufferSize((
size_t)OptionConverter::toInt(value, 1));
60 else if (equalsIgnoreCase(option, _T(
"password")))
64 else if (equalsIgnoreCase(option, _T(
"sql")))
68 else if (equalsIgnoreCase(option, _T(
"url"))
69 || equalsIgnoreCase(option, _T(
"dns")))
73 else if (equalsIgnoreCase(option, _T(
"user")))
79 AppenderSkeleton::setOption(name, value);
// Queues a logging event for later delivery to the database.
//
// The event is appended to `buffer`; the buffer size is then compared with
// bufferSize (presumably to trigger flushBuffer() once the threshold is
// reached -- TODO confirm).
84 void MySQLAppender::append(
const spi::LoggingEventPtr& event)
86 buffer.push_back(event);
88 if (buffer.size() >= bufferSize)
// Produces the statement text for one logging event by running the
// appender's layout over it (getLayout()->format(...)).
//
// Part of the body is selected by STAR_LOG4CXX_VERSION == 9; the visible
// format() call takes an output buffer, the event and a pool.
// `this` is cast to a plain MySQLAppender* before getLayout() is called.
94 String MySQLAppender::getLogStatement(
const spi::LoggingEventPtr& event)
96 #if (STAR_LOG4CXX_VERSION == 9)
102 ((
MySQLAppender*)
this)->getLayout()->format(sbuf, event,pool);
// Sends one SQL statement to the server.
//
// A connection is obtained through getConnection(); if that succeeds the
// statement is submitted with mysql_query() and its result is kept in `ret`.
// A non-zero mysql_query() result is reported on stderr together with
// mysql_error(). flushBuffer() treats a zero return from this function as
// success.
108 unsigned int MySQLAppender::execute(
const String& sql)
111 if (getConnection()) {
118 if (( ret = mysql_query(connection,query.c_str()) )) {
119 fprintf(stderr,
"MYSQL QUERY: %s \n",mysql_error(connection));
// Closes the MySQL connection if one is currently marked open, then clears
// fIsConnectionOpen.
134 void MySQLAppender::closeConnection()
136 if (fIsConnectionOpen) {
138 mysql_close(connection);
// NOTE(review): mysql_close() deallocates a handle that mysql_init() allocated
// itself (the constructor starts `connection` at 0, so getConnection()'s
// mysql_init(connection) allocates one). Calling mysql_errno()/mysql_error()
// on the handle after mysql_close() therefore looks like a use-after-free --
// confirm, and check that `connection` is reset before the next mysql_init().
139 if (mysql_errno(connection)) fprintf(stderr,
"MYSQL close ERROR %s \n",mysql_error(connection));
141 fIsConnectionOpen =
false;
// Returns the MySQL handle, opening the connection first when
// fIsConnectionOpen is false.
//
// Opening consists of mysql_init() followed by mysql_real_connect(). Failures
// of either step are reported on stderr. fIsConnectionOpen is set to false on
// one path and true on another (presumably the failure and success paths of
// mysql_real_connect() -- TODO confirm).
//
// NOTE(review): host, user, password, database and port are hard-coded
// literals here. The "url"/"user"/"password" options accepted by setOption()
// do not appear to be consulted -- confirm, and consider moving the
// credentials out of the source.
146 MYSQL *MySQLAppender::getConnection()
148 if (!fIsConnectionOpen) {
150 if ( !(connection= mysql_init(connection)) ) {
151 fprintf(stderr,
"MYSQL: ---- > No init connection \n");
154 const char *host =
"heston.star.bnl.gov";
155 const char *user =
"StarLogger";
156 const char *passwd =
"logger";
157 const char *db =
"logger";
158 unsigned int port = 3306;
160 if (!(mysql_real_connect(connection
169 fprintf(stderr,
"MYSQL: ---- > No connection: %s \n",mysql_error(connection));
171 fIsConnectionOpen =
false;
173 fIsConnectionOpen =
true;
// Closes this appender.
// NOTE(review): presumably flushes buffered events and calls closeConnection()
// -- confirm against the body.
181 void MySQLAppender::close()
// File-local helper: replaces every occurrence of "$<var>" in `string` with a
// value for `var`.
//
// The value is first looked up in the environment via gSystem->Getenv(var).
// Two names have special handling: "REQUESTID" is rendered from the current
// process id (gSystem->GetPid()) and "JOBINDEX" takes its value from `spec`.
// The special cases presumably apply only when the environment variable is
// not set -- TODO confirm the guarding condition.
//
// NOTE(review): the value is pasted verbatim by ReplaceAll() into text that
// flushBuffer() sends as SQL inside double quotes. An environment value
// containing a quote would corrupt or inject into the statement -- consider
// escaping it (e.g. mysql_real_escape_string).
188 static void ReplaceVariable(TString &
string,
const char *var)
192 const char *varValue = gSystem->Getenv(var);
196 if (spec ==
"REQUESTID") {
197 spec.Form(
"%d",gSystem->GetPid());
198 varValue= spec.Data();
199 }
else if (spec ==
"JOBINDEX") {
201 varValue= spec.Data();
206 TString fullName =
"$"; fullName += var;
208 string.ReplaceAll(fullName,varValue);
// Writes the buffered logging events to the database.
//
// Provided getConnection() yields a handle, the function walks `buffer` and,
// for each event:
//   1. while the function-static flag TaskEntryDone is still false, builds an
//      INSERT for the task-level record, expands the $NAME placeholders with
//      ReplaceVariable(), executes it, and sets the flag once execute()
//      returns 0; a job-level INSERT is built, expanded and executed as well;
//   2. obtains the event's statement from getLogStatement(), expands
//      $REQUESTID and $JOBINDEX in it, and uses the result as the SQL to run;
//   3. when NEWTABLE_EXPANSION is defined, also prepares an UPDATE of the
//      Jobs row's updateTime.
// Failures are reported on stderr; one message states that the event is
// skipped and lost.
//
// Two table layouts appear side by side (TaskDescription/JobDescription and
// Tasks/Jobs); presumably a preprocessor switch selects between them --
// TODO confirm.
212 void MySQLAppender::flushBuffer()
// Static, so the value persists across calls.
216 static bool TaskEntryDone =
false;
217 std::list<spi::LoggingEventPtr>::iterator i;
218 if ( getConnection()) {
219 for (i = buffer.begin(); i != buffer.end(); i++)
221 TString expandCommand;
// Task-level record: attempted until one insert succeeds.
223 if (!TaskEntryDone) {
229 "INSERT DELAYED IGNORE TaskDescription (TaskDescriptionID, TaskRequestID_MD5, TaskSize, TaskRemainSize, EntryTime, UpdateTime, TaskUser,TaskDescription,TaskCredential,BrokerID)"
230 " VALUES ( DEFAULT, \"$REQUESTID\", \"$SUMS_nProcesses\",\"$SUMS_nProcesses\",\"$SUBMIT_TIME\",DEFAULT,\"$SUMS_USER\",\"$SUMS_name\",\"$SUMS_AUTHENTICATED_USER\",\"SUMS\");";
232 "INSERT DELAYED IGNORE Tasks (taskID, brokerTaskID, taskName, taskSize, taskRemainSize, submitTime, updateTime, requesterID,taskDescription)"
233 " VALUES ( DEFAULT, \"$REQUESTID\", \"Short name of task\", \"$SUMS_nProcesses\",\"$SUMS_nProcesses\",\"$SUBMIT_TIME\",DEFAULT,\"$SUMS_USER\",\"$SUMS_name\");";
242 ReplaceVariable(expandCommand,
"REQUESTID");
243 ReplaceVariable(expandCommand,
"SUMS_nProcesses");
244 ReplaceVariable(expandCommand,
"SUBMIT_TIME");
246 ReplaceVariable(expandCommand,
"SUMS_name");
247 ReplaceVariable(expandCommand,
"SUMS_USER");
248 ReplaceVariable(expandCommand,
"SUMS_AUTHENTICATED_USER");
249 sql = expandCommand.Data();
250 if (!execute(sql)) TaskEntryDone =
true;
// Job-level record, JobDescription layout.
259 expandCommand =
"INSERT DELAYED IGNORE INTO JobDescription SET ";
261 expandCommand +=
"TaskDescriptionID = (SELECT TaskDescriptionID FROM TaskDescription WHERE TaskRequestID_MD5=\"$REQUESTID\")";
262 expandCommand +=
", ";
263 expandCommand +=
"TaskRequestID_MD5=\"$REQUESTID\"";
264 expandCommand +=
", ";
265 expandCommand +=
"BrokerProcessID=\"$JOBINDEX\"";
266 expandCommand +=
", ";
267 expandCommand +=
"JobLocationURL=\"$HOSTNAME\"";
268 expandCommand +=
", ";
269 expandCommand +=
"JobUser=\"$USER\"";
270 expandCommand +=
"; ";
// Job-level record, Jobs layout (assigns expandCommand afresh, so presumably
// an alternative to the JobDescription form rather than an addition to it).
272 expandCommand =
"INSERT DELAYED IGNORE INTO Jobs SET ";
274 expandCommand +=
"taskID = (SELECT taskID FROM Tasks WHERE brokerTaskID=\"$REQUESTID\")";
275 expandCommand +=
", ";
276 expandCommand +=
"brokerJobID=\"$JOBINDEX\"";
277 expandCommand +=
", ";
278 expandCommand +=
"startTime=NOW()";
279 expandCommand +=
", ";
280 expandCommand +=
"nodeLocation=\"$HOSTNAME\"";
281 expandCommand +=
", ";
282 expandCommand +=
"stateID=\"4\"";
283 expandCommand +=
", ";
284 expandCommand +=
"executionUserName=\"$USER\"";
285 expandCommand +=
"; ";
294 ReplaceVariable(expandCommand,
"USER");
295 ReplaceVariable(expandCommand,
"HOSTNAME");
296 ReplaceVariable(expandCommand,
"REQUESTID");
297 ReplaceVariable(expandCommand,
"JOBINDEX");
298 sql = expandCommand.Data();
299 if (!execute(sql) ) {
// Per-event statement: produced by the layout, then placeholder-expanded.
303 const LoggingEventPtr& logEvent = *i;
304 String sql = getLogStatement(logEvent);
305 expandCommand = sql.c_str();
307 ReplaceVariable(expandCommand,
"REQUESTID");
308 ReplaceVariable(expandCommand,
"JOBINDEX");
310 sql = expandCommand.Data();
// Optional refresh of the Jobs row's updateTime.
312 #ifdef NEWTABLE_EXPANSION
313 expandCommand =
"UPDATE LOW_PRIORITY IGNORE Jobs SET updateTime=NOW() WHERE brokerJobID=\"$JOBINDEX\" AND taskID=(SELECT taskID FROM Tasks WHERE brokerTaskID=\"$REQUESTID\");";
314 ReplaceVariable(expandCommand,
"REQUESTID");
315 ReplaceVariable(expandCommand,
"JOBINDEX");
317 fprintf(stderr,
" MYSQL ----> can not update the Jobs record%s \n", expandCommand.c_str());
322 fprintf(stderr,
" MYSQL ----> skip and lose event \n");
// Sets the statement pattern used to render logging events.
//
// When the appender has no layout yet, a new PatternLayout built from `s` is
// installed. The remaining code fetches the current layout as a
// PatternLayoutPtr and, if that pointer is non-null, updates its conversion
// pattern to `s` (presumably the branch taken when a layout already exists
// -- TODO confirm).
333 void MySQLAppender::setSql(
const String& s)
336 if (getLayout() == 0)
338 this->setLayout(
new PatternLayout(s));
342 PatternLayoutPtr patternLayout = this->getLayout();
343 if (patternLayout != 0)
345 patternLayout->setConversionPattern(s);
349 #if (STAR_LOG4CXX_VERSION == 10)
// Overload of append() that additionally receives a log4cxx::helpers::Pool;
// compiled only when STAR_LOG4CXX_VERSION == 10.
351 void MySQLAppender::append(
const spi::LoggingEventPtr& event, log4cxx::helpers::Pool& p)