273 #include "StDbManager.hh"
274 #include "stdb_streams.h"
275 #include "StDbDefaults.hh"
276 #include "StDbManagerImpl.hh"
279 #include <sys/types.h>
286 #ifndef __STDB_STANDALONE__
287 #include "StMessMgr.h"
// Fallback logging macros: map the LOG_* names directly onto the standard
// output/error streams.
// NOTE(review): the #else that presumably separates these from the
// StMessMgr.h include above is not visible in this excerpt - confirm.
289 #define LOG_DEBUG cout
290 #define LOG_INFO cout
291 #define LOG_WARN cout
292 #define LOG_ERROR cerr
293 #define LOG_FATAL cerr
// freeze(i) expands to str(); the macro argument is discarded.
301 #define freeze(i) str()
// Client error codes in the range 2000-2999. Elsewhere in this file
// CR_SERVER_GONE_ERROR and CR_SERVER_LOST are compared against mysql_errno()
// to decide whether a reconnect should be attempted.
// NOTE(review): presumably mirrors the MySQL client error header - confirm
// the values against the client library actually linked.
305 #define CR_MIN_ERROR 2000
306 #define CR_MAX_ERROR 2999
307 #define CR_UNKNOWN_ERROR 2000
308 #define CR_SOCKET_CREATE_ERROR 2001
309 #define CR_CONNECTION_ERROR 2002
310 #define CR_CONN_HOST_ERROR 2003
311 #define CR_IPSOCK_ERROR 2004
312 #define CR_UNKNOWN_HOST 2005
313 #define CR_SERVER_GONE_ERROR 2006
314 #define CR_VERSION_ERROR 2007
315 #define CR_OUT_OF_MEMORY 2008
316 #define CR_WRONG_HOST_INFO 2009
317 #define CR_LOCALHOST_CONNECTION 2010
318 #define CR_TCP_CONNECTION 2011
319 #define CR_SERVER_HANDSHAKE_ERR 2012
320 #define CR_SERVER_LOST 2013
321 #define CR_COMMANDS_OUT_OF_SYNC 2014
322 #define CR_NAMEDPIPE_CONNECTION 2015
323 #define CR_NAMEDPIPEWAIT_ERROR 2016
324 #define CR_NAMEDPIPEOPEN_ERROR 2017
325 #define CR_NAMEDPIPESETSTATE_ERROR 2018
// Class name passed to StDbManager::printInfo() together with __METHOD__.
327 #define __CLASS__ "MysqlDb"
// Returns a monotonic timestamp.
// NOTE(review): despite the name, the visible branch yields MILLISECONDS
// (tv_sec*1000 + tv_nsec/1000000); loadBalance() prints differences of this
// value with an "ms" unit.
// The declaration of `ts` and the branch used when HAVE_CLOCK_GETTIME is not
// defined are not visible in this excerpt.
331 time_t get_time_nanosec() {
332 #ifdef HAVE_CLOCK_GETTIME
334 clock_gettime(CLOCK_MONOTONIC, &ts);
335 return (ts.tv_sec*1000 + ts.tv_nsec/1000000);
// Placeholder text that RazQuery() copies into mQueryLast in place of the
// real query text (presumably when the query carried binary data - the
// selecting condition is not visible in this excerpt).
343 static const char* binaryMessage = {
"Cannot Print Query with Binary data"};
// Default constructor. Zero-initializes the connection parameters, optionally
// selects fixed load-balancer credentials, clears the column-name table and
// then overrides the DB user with the effective system user name.
// This excerpt is missing lines (closing braces, some conditions); the
// comments below describe only the statements that are visible.
350 MysqlDb::MysqlDb(): mdbhost(0), mdbName(NULL), mdbuser(0), mdbpw(0), mdbPort(0),mdbServerVersion(0),mlogTime(false) {
// When the USE_LB_LOGIN environment variable is set, use fixed credentials.
// NOTE(review): mdbuser/mdbpw point at string literals here, while Connect()
// executes `delete [] mdbpw` when mdbpw is non-null - confirm that lines not
// visible here prevent freeing a literal.
352 if (mdbuser == NULL && getenv(
"USE_LB_LOGIN") != NULL) {
353 mdbuser = (
char*)
"loadbalancer";
354 mdbpw = (
char*)
"lbdb";
358 mhasBinaryQuery=
false;
// Clear the first 200 slots of the column-name table used by InputStart().
363 for(
int i=0;i<200;i++)cnames[i]=0;
365 mSysusername =
"N/A";
// Look up the passwd entry of the effective user id.
// NOTE(review): the null check on pwd is presumably on a line not visible
// here (the failure message below suggests an if/else) - confirm.
366 struct passwd *pwd = 0;
367 pwd = getpwuid(geteuid());
369 mSysusername = pwd->pw_name;
// mdbuser now aliases the internal buffer of mSysusername: it is not owned
// heap memory and becomes invalid if mSysusername is modified.
370 mdbuser = (
char*)mSysusername.c_str();
371 std::cout <<
"DB OVERRIDE default user with: " << mdbuser << std::endl;
373 std::cout <<
"DB OVERRIDE failure, user ID cannot be retrieved" << std::endl;
// The statements below presumably belong to the destructor, whose header is
// not visible in this excerpt: free the query buffers and close the
// connection if one was established.
379 if(mQuery)
delete [] mQuery;
380 if(mQueryLast)
delete [] mQueryLast;
383 if(mhasConnected)mysql_close(&mData);
391 #ifdef MYSQL_VERSION_ID
392 # if MYSQL_VERSION_ID > 50044
// (Re)establishes the server connection using the stored host, user,
// password, database name and port. Keeps trying while no connection exists
// and the connect timeout is below 600.
// How timeOutConnect grows between attempts, and the final return statement,
// are not visible in this excerpt.
399 bool MysqlDb::reConnect(){
400 #define __METHOD__ "reConnect()"
402 bool connected=
false;
403 unsigned int timeOutConnect=mtimeout;
404 my_bool auto_reconnect = 1;
406 while(!connected && timeOutConnect<600){
407 mysql_options(&mData,MYSQL_OPT_CONNECT_TIMEOUT,(
const char*)&timeOutConnect);
// Automatic reconnect is only requested for client versions above 5.0.44.
409 #ifdef MYSQL_VERSION_ID
410 # if MYSQL_VERSION_ID > 50044
411 mysql_options(&mData,MYSQL_OPT_RECONNECT, &auto_reconnect);
// Request the AES128-SHA cipher without key, certificate or CA files.
420 mysql_ssl_set(&mData, NULL, NULL, NULL, NULL,
"AES128-SHA");
// Ask for the compressed client/server protocol.
422 unsigned long client_flag = CLIENT_COMPRESS;
425 if(mysql_real_connect(&mData,mdbhost,mdbuser,mdbpw,mdbName,mdbPort,NULL,client_flag)) {
// On success, query and log the negotiated SSL cipher.
// NOTE(review): in the visible lines the mysql_query() return value is not
// checked and `result` reaches mysql_num_fields() without a null check -
// confirm against the complete source.
427 std::string query =
"SHOW STATUS LIKE 'Ssl_cipher'";
428 mysql_query(&mData, query.c_str());
429 MYSQL_RES *result = 0;
432 result = mysql_store_result(&mData);
433 num_fields = mysql_num_fields(result);
434 if (num_fields >= 2) {
435 row = mysql_fetch_row(result);
436 if (row && row[0] && row[1]) {
437 LOG_INFO << row[0] <<
" = " << row[1] << endm;
440 mysql_free_result(result);
// Failure path: build a warning naming the host, the MySQL error text and the
// timeout that will be used for the next attempt.
446 wm <<
" Cannot connect to " << mdbhost <<
":" << mdbPort <<
", database server is busy or unreachable.\n";
447 wm <<
" Returned error =";
448 wm << mysql_error(&mData)<<
".\n Will re-try with timeout set at \n==> ";
449 wm << timeOutConnect<<
" seconds <==";
// Replace the cached copy of the server version string.
455 if(mdbServerVersion)
delete [] mdbServerVersion;
// A missing server version is reported and then trips the assert below.
457 if(!mData.server_version){
459 smm<<
" No Server version - most likely incompatible libraries \n CONTACT DATABASE ADMINISTRATOR";
461 assert(mData.server_version);
465 mdbServerVersion=
new char[strlen(mData.server_version)+1];
466 strcpy(mdbServerVersion,mData.server_version);
// Stores private copies of the connection parameters, initializes the MySQL
// handle, connects and logs the outcome. The connection result is recorded in
// mhasConnected.
// The call that performs the connection and the final return are not visible
// in this excerpt.
//   aHost, aUser, aPasswd - copied into mdbhost / mdbuser / mdbpw
//   aDb                   - database name; a single space means "none"
//   aPort                 - server port
475 bool MysqlDb::Connect(
const char *aHost,
const char *aUser,
const char *aPasswd,
const char *aDb,
const int aPort){
476 #define __METHOD__ "Connect(host,user,pw,database,port)"
// NOTE(review): strlen(aUser)/strlen(aPasswd) require non-null arguments
// unless guarded by lines not visible here. The constructor may also have
// left mdbuser/mdbpw pointing at memory that was not allocated with new[] -
// confirm what precedes these statements.
481 mdbuser =
new char[strlen(aUser)+1]; strcpy(mdbuser,aUser);
484 if(mdbpw)
delete [] mdbpw;
485 mdbpw =
new char[strlen(aPasswd)+1]; strcpy(mdbpw,aPasswd);
// The host name is taken from aHost; presumably only when no service broker
// is configured (the body of this condition is only partly visible).
491 #ifndef NoXmlTreeReader
492 if (!my_manager->myServiceBroker)
496 if(mdbhost)
delete [] mdbhost;
497 mdbhost =
new char[strlen(aHost)+1];
498 strcpy(mdbhost,aHost);
// A database name of exactly " " is treated as "no database".
506 if(aDb && (strcmp(aDb,
" ")!=0)){
507 mdbName =
new char[strlen(aDb)+1];
511 bool tRetVal =
false;
// t0 holds the start wall time here and the elapsed time further below.
512 double t0=mqueryLog.wallTime();
513 if(mlogTime)mconnectLog.start();
// mysql_init() failure is reported through printInfo(), whose result is
// returned as bool.
514 if (!mysql_init(&mData))
515 return (
bool)
StDbManager::Instance()->printInfo(
"Mysql Init Error=",mysql_error(&mData),dbMErr,__LINE__,__CLASS__,__METHOD__);
521 t0=mqueryLog.wallTime()-t0;
// Success message: database, host:port, connection time, server version.
522 cs<<
"Server Connecting:";
if(mdbName)cs<<
" DB=" << mdbName ;
523 cs<<
" Host=" << mdbhost <<
":"<<mdbPort <<stendl;
524 cs<<
" --> Connection Time="<<t0<<
" sec ";
525 if(mdbServerVersion)cs<<
" MysqlVersion="<<mdbServerVersion;
// Failure message including the MySQL error text.
530 cs <<
"Making Connection to DataBase = " << aDb;
531 cs <<
" On Host = " << mdbhost <<
":"<<mdbPort;
532 cs <<
" MySQL returned error " << mysql_error(&mData);
536 if(mlogTime)mconnectLog.end();
537 mhasConnected = tRetVal;
// Asks the service broker (when one is configured) to pick a server and, on
// success, replaces mdbhost/mdbPort with the chosen host and port. Logs how
// long the selection took.
// The return statements are not visible in this excerpt.
544 bool MysqlDb::loadBalance()
548 #ifndef NoXmlTreeReader
549 time_t startTime = get_time_nanosec();
550 time_t stopTime = 0, totalTime = 0;
552 if (my_manager->myServiceBroker)
554 my_manager->myServiceBroker->DoLoadBalancing();
555 short mSBStatus = my_manager->myServiceBroker->GetStatus();
556 if (mSBStatus==st_db_service_broker::NO_ERROR)
// NOTE(review): lbHostName is the c_str() of the value returned by
// GiveHostName(). If that function returns std::string by value, the
// temporary is destroyed at the end of this statement and the pointer
// dangles - confirm the return type.
559 const char* lbHostName = (my_manager->myServiceBroker->GiveHostName()).c_str();
560 if(mdbhost)
delete [] mdbhost;
561 mdbhost =
new char[strlen(lbHostName)+1];
562 strcpy(mdbhost,lbHostName);
564 mdbPort = my_manager->myServiceBroker->GiveHostPort();
// Broker reported an error status.
568 LOG_ERROR <<
"MysqlDb::Connect: StDbServiceBroker error "<<mSBStatus<<endm;
// Elapsed time is the difference of two get_time_nanosec() values and is
// reported with an "ms" unit.
572 stopTime = get_time_nanosec();
573 totalTime = stopTime - startTime;
574 LOG_INFO <<
"MysqlDb::Connect: Load balancer took "<< totalTime <<
" ms, will use "<< mdbhost <<
":" << mdbPort << endm;
// Returns the internal mQueryLast buffer (the text saved by RazQuery()).
// The pointer is owned by this object; RazQuery() deletes and replaces it.
584 char* MysqlDb::printQuery(){
return mQueryLast; };
// Resets the pending query: frees the previous mQueryLast, stores either the
// binaryMessage placeholder or a copy of the current query text in it, and
// clears the binary-query flag.
// The conditions selecting between the two copies are not visible in this
// excerpt; presumably the placeholder is used when mhasBinaryQuery is set.
587 void MysqlDb::RazQuery() {
590 delete [] mQueryLast;
594 mQueryLast =
new char[strlen(binaryMessage)+1];
595 strcpy(mQueryLast,binaryMessage);
596 if(mQuery)
delete [] mQuery;
// NOTE(review): strlen(mQuery) below requires mQuery to be non-null and not
// yet deleted; the guarding branch is on lines not visible here - confirm.
599 mQueryLast=
new char[strlen(mQuery)+1];
600 strcpy(mQueryLast,mQuery);
609 mhasBinaryQuery=
false;
// Lists tables matching tableName via mysql_list_tables() and stores the
// result in mRes->mRes. Returns false when the listing call returns NULL; the
// remaining logic and the positive return are not visible in this excerpt.
612 bool MysqlDb::checkForTable(
const char* tableName){
615 mRes->mRes=mysql_list_tables(&mData,tableName);
616 if(mRes->mRes==NULL)
return false;
// Executes the accumulated query (mQuery, mQueryLen) and stores the result
// set in mRes->mRes. Sets mqueryState and returns true on success; a failed
// query is reported through StDbManager::printInfo().
624 bool MysqlDb::ExecQuery(){
625 #define __METHOD__ "ExecQuery()"
// mQ is the query text followed by an SQL comment carrying user names.
628 std::string mQ(mQuery,mQueryLen);
629 mQ.append(
" /* RUSR: ");
630 mQ.append(mSysusername);
631 mQ.append(
" | SUSR: ");
// When the cache manager is active and holds a non-empty entry for this
// database/query pair, report success without contacting the server.
641 size_t cache_length = 0;
642 if (m_Mgr.isActive()) {
643 std::string dbName = mdbName ? mdbName :
"";
644 const char* res = m_Mgr.get(dbName, mQuery, cache_length);
645 if (res && cache_length) {
646 return mqueryState =
true;
650 if(mlogTime)mqueryLog.start();
654 int status=mysql_real_query(&mData,mQ.c_str(), mQ.size());
// If the server went away or the connection was lost, reconnect and retry.
// NOTE(review): the retry sends mQuery/mQueryLen rather than mQ, so the
// appended user comment is dropped on the second attempt - confirm intended.
656 if( (status!=0) && ( mysql_errno(&mData)==CR_SERVER_GONE_ERROR || mysql_errno(&mData)==CR_SERVER_LOST ) ){
657 StDbManager::Instance()->printInfo(mysql_error(&mData),
" Lost server, will try to reconnect",dbMDebug,__LINE__,__CLASS__,__METHOD__);
658 if(reConnect())status=mysql_real_query(&mData,mQuery,mQueryLen);
// NOTE(review): this early return appears to precede mqueryLog.end(), which
// would leave the timer started on failure - confirm against the full source.
662 return StDbManager::Instance()->printInfo(
" Query Failed ",mysql_error(&mData),dbMErr,__LINE__,__CLASS__,__METHOD__);
664 if(mlogTime)mqueryLog.end();
// Fetch the complete result set to the client, timed separately.
667 if(mlogTime)msocketLog.start();
668 mRes->mRes=mysql_store_result(&mData);
669 if(mlogTime)msocketLog.end();
671 return mqueryState=
true;
// Appends text to the pending query. The string ";" is handled as a special
// case (its body is not visible in this excerpt). Otherwise the text either
// starts a new query buffer or is appended to the existing one.
676 MysqlDb &MysqlDb::operator<<(
const char *aQuery){
678 if (strcmp(aQuery,
";")==0){
// First fragment: mQuery becomes a copy of aQuery.
684 mQueryLen=strlen(aQuery);
685 mQuery =
new char[mQueryLen+1];
686 strcpy(mQuery,aQuery);
// Append path: save the old text, reallocate, then concatenate.
// NOTE(review): tQuery is sized and filled with strlen/strcpy, which stop at
// the first NUL byte, but mQueryLen bytes are later copied out of it. If
// mQuery already holds binary data with embedded NULs, the memcpy reads past
// the end of tQuery - confirm. The release of the old mQuery and of tQuery
// is not visible in this excerpt.
688 char* tQuery =
new char[strlen(mQuery)+1];
689 strcpy(tQuery,mQuery);
691 mQuery =
new char[mQueryLen+strlen(aQuery)+1];
692 memcpy(mQuery,tQuery,mQueryLen);
693 strcpy(&mQuery[mQueryLen],aQuery);
695 mQueryLen=mQueryLen+strlen(aQuery);
// The statements below presumably belong to the overload that appends binary
// data (its header is not visible): they mark the query as binary and append
// aBin->mLen bytes to it.
707 mhasBinaryQuery=
true;
709 char *tQuery =
new char[mQueryLen+aBin->mLen+1];
710 memcpy(tQuery,mQuery,mQueryLen);
// NOTE(review): copies mLen+1 bytes, i.e. assumes mBinData has one readable
// byte beyond mLen (presumably a terminator) - confirm.
711 memcpy(&tQuery[mQueryLen],aBin->mBinData,aBin->mLen+1);
713 if(mQuery)
delete [] mQuery;
715 mQueryLen=mQueryLen+aBin->mLen;
// Begins a multi-row insert into `table`.
//   table     - target table name
//   aBuff     - buffer holding the values (switched to storage mode if it was
//               in client mode)
//   colList   - comma-separated column names
//   nRows     - not referenced in the visible lines
//   hasBinary - set to true when any listed column carries the BINARY flag
// Returns tRetVal immediately when table, aBuff or colList is null. The
// remaining return paths are not visible in this excerpt.
722 bool MysqlDb::InputStart(
const char* table,
StDbBuffer *aBuff,
const char* colList,
int nRows,
bool& hasBinary){
725 if(!table || !aBuff || !colList)
return tRetVal;
727 bool change=aBuff->IsClientMode();
728 if(change) aBuff->SetStorageMode();
// The "where null" select presumably exists only to obtain the column
// metadata of the table; then the insert statement is started.
730 *
this <<
"select * from " << table <<
" where null"<< endsql;
731 *
this <<
"insert into " << table <<
" ("<<colList<<
") VALUES(";
// Work on a private copy of colList while splitting it at commas.
734 char* tmpString=
new char[strlen(colList)+1];
735 strcpy(tmpString,colList);
742 if((ptr2=strstr(ptr1,
","))){
// Skip one leading space, then store a copy of the name in cnames[jfields].
747 if(*ptr1==
' ')ptr1++;
748 if(cnames[jfields])
delete [] cnames[jfields];
749 cnames[jfields]=
new char[strlen(ptr1)+1];
750 strcpy(cnames[jfields],ptr1);
758 int nfields=NbFields();
// For every requested column, locate the result field of the same name and
// record its storage class. Names with no matching field are skipped.
762 for(
int k=0;k<jfields;k++){
763 for(i=0;i<nfields;i++)
764 if(strcmp(mRes->mRes->fields[i].name,cnames[k])==0)
break;
766 if(i==nfields)
continue;
// Field type 254 is treated as a blob-like "special" type.
// NOTE(review): 254 is presumably MYSQL_TYPE_STRING - confirm.
768 isBlob[k]=( (IS_BLOB(mRes->mRes->fields[i].flags)) ||
769 (mRes->mRes->fields[i].type ==254) );
770 isBinary[k]= (mRes->mRes->fields[i].flags&BINARY_FLAG);
771 isSpecialType[k]=(mRes->mRes->fields[i].type ==254);
773 if(isBinary[k])hasBinary=
true;
// fcount is maintained on lines not visible here; a mismatch with jfields
// clears `done`.
775 if(fcount!=jfields) done=
false;
// Appends the values of one row to the insert statement begun by
// InputStart(). Rows after the first are preceded by "),(". Each column value
// is read from aBuff under its cnames[k] name and written between single
// quotes; a failed read leaves the column loop.
// The branch headers that choose between the four read paths, and the return
// statement, are not visible in this excerpt.
781 bool MysqlDb::InputRow(
StDbBuffer* aBuff,
int row){
785 aBuff->SetStorageMode();
786 if(row>0)*
this<<
"),(";
788 for(k=0;k<jfields;k++){
// Binary path: read an array and append it through Binary().
792 if(!aBuff->ReadArray(tVal,len,cnames[k]))
break;
793 *
this<<
"'"<<Binary(len,(
float*)tVal)<<
"'";
// Special-type path: read a scalar.
794 }
else if(isSpecialType[k]) {
795 if(!aBuff->ReadScalar(tVal,cnames[k]))
break;
796 *
this<<
"'"<<tVal<<
"'";
// String-array path: encode the array into one string with CodeStrArray(),
// then free the individual strings.
799 if(!aBuff->ReadArray(tVal2,len,cnames[k]))
break;
800 tVal=CodeStrArray(tVal2,len);
801 for(
int jj=0;jj<len;jj++)
if(tVal2[jj])
delete []tVal2[jj];
802 *
this<<
"'"<<tVal<<
"'";
// Default path: read a scalar.
805 if(!aBuff->ReadScalar(tVal,cnames[k]))
break;
806 *
this<<
"'"<<tVal<<
"'";
810 aBuff->SetClientMode();
// Finishes the multi-row insert. In the visible lines the return value is set
// to true when mqueryState is set; the statement that completes and executes
// the query is not visible in this excerpt.
820 bool MysqlDb::InputEnd(){
824 if(mqueryState)tRetVal=
true;
// Inserts a single row into `table` with an "insert into ... set col='val'"
// statement, taking each value from aBuff under the column's name.
// The separators between assignments, the query execution and the return
// statement are not visible in this excerpt.
830 bool MysqlDb::Input(
const char *table,
StDbBuffer *aBuff){
// NOTE(review): SetStorageMode() is called unconditionally right after the
// conditional call, so `change` has no effect on the mode here.
832 bool change=aBuff->IsClientMode();
833 if (change) aBuff->SetStorageMode();
834 aBuff->SetStorageMode();
// The "where null" select presumably exists only to obtain the column
// metadata of the table.
836 *
this <<
"select * from " << table <<
" where null"<< endsql;
837 *
this <<
"insert into " << table <<
" set ";
843 for (i=0;i<NbFields();i++) {
// Blob-like columns (BLOB flag or field type 254).
844 if ((IS_BLOB(mRes->mRes->fields[i].flags) ) ||
845 mRes->mRes->fields[i].type ==254) {
// Binary blob: read as an array and append through Binary().
846 if (mRes->mRes->fields[i].flags&BINARY_FLAG) {
847 if (aBuff->ReadArray(tVal,len,mRes->mRes->fields[i].name)){
853 *
this << mRes->mRes->fields[i].name <<
"='" << Binary(len,(
float*)tVal)<<
"'";
// Field type 254: read as a scalar.
856 if(mRes->mRes->fields[i].type==254){
857 if (aBuff->ReadScalar(tVal,mRes->mRes->fields[i].name)) {
863 *
this << mRes->mRes->fields[i].name <<
"='" << tVal <<
"'";
// Text blob: read an array of strings, encode it with CodeStrArray() and free
// the individual strings.
867 if (aBuff->ReadArray(tVal2,len,mRes->mRes->fields[i].name)){
868 tVal=CodeStrArray(tVal2,len);
869 int j;
for (j=0;j<len;j++) {
if (tVal2[j])
delete [] tVal2[j];};
876 *
this << mRes->mRes->fields[i].name <<
"='" << tVal<<
"'";
// All other columns: read as a scalar.
881 if (aBuff->ReadScalar(tVal,mRes->mRes->fields[i].name)) {
887 *
this << mRes->mRes->fields[i].name <<
"='" << tVal <<
"'";
// Only the delete is conditional; tVal=0 always executes.
890 if (tVal)
delete [] tVal;tVal=0;
896 if(mqueryState)tRetVal=
true;
// NOTE(review): as at the top, the unconditional SetClientMode() makes the
// conditional call redundant.
900 if (change) aBuff->SetClientMode();
901 aBuff->SetClientMode();
// The header of this method is not visible in this excerpt; presumably it is
// the row-reading counterpart of Input(). It copies the next row of the
// current result into aBuff, one entry per field.
// Cache hit: the cache manager fills the buffer itself.
911 if (m_Mgr.isValueFound()) {
914 return m_Mgr.processOutput(aBuff);
915 }
else if (!m_Mgr.isValueFound() && mRes->mRes) {
// Cache miss with a live result set: when both cache keys are non-empty and
// not " ", store the result set in the cache and rewind it to row 0.
916 if (!m_Mgr.getLastGroupKey().empty() && !m_Mgr.getLastKey().empty() && m_Mgr.getLastGroupKey() !=
" " && m_Mgr.getLastKey() !=
" ") {
919 aBuff->SetStorageMode();
920 m_Mgr.set(m_Mgr.getLastGroupKey().c_str(), m_Mgr.getLastKey().c_str(), mRes->mRes, 0);
921 mysql_data_seek(mRes->mRes, 0);
922 aBuff->SetClientMode();
// Fetch the next row; no row means there is nothing to output.
// NOTE(review): the early return skips msocketLog.end(), leaving the timer
// started when mlogTime is set - confirm intended.
927 if(mlogTime)msocketLog.start();
928 MYSQL_ROW tRow=mysql_fetch_row(mRes->mRes);
929 if(!tRow)
return false;
930 unsigned long * lengths=mysql_fetch_lengths(mRes->mRes);
931 unsigned tNbFields=NbFields();
932 if(mlogTime)msocketLog.end();
// NOTE(review): the unconditional SetStorageMode() makes the conditional call
// redundant.
935 bool change=aBuff->IsClientMode();
936 if (change) aBuff->SetStorageMode();
937 aBuff->SetStorageMode();
939 for (i=0;i<(int)tNbFields;i++){
943 if (IS_BLOB(mRes->mRes->fields[i].flags)) {
// Binary blob: written as a raw byte array using the fetched length.
944 if (mRes->mRes->fields[i].flags&BINARY_FLAG) {
945 aBuff->WriteArray((
char*)tRow[i],lengths[i],mRes->mRes->fields[i].name);
// Text blob: decoded into an array of strings, written, then freed.
// NOTE(review): every element is released with delete[]; confirm that
// DecodeStrArray() never returns an element that was not allocated with
// new[].
949 tStrPtr=DecodeStrArray((
char*)tRow[i],len);
950 aBuff->WriteArray(tStrPtr,len,mRes->mRes->fields[i].name);
951 for(
int k=0;k<len;k++)
delete [] tStrPtr[k];
// Written under "<field name>.text"; the selecting condition is not visible.
961 cn<<mRes->mRes->fields[i].name<<
".text";
962 aBuff->WriteScalar((
char*)tRow[i],(cn.str()).c_str());
// Default: written as a scalar under the field name.
965 aBuff->WriteScalar((
char*)tRow[i],mRes->mRes->fields[i].name);
981 if (change) aBuff->SetClientMode();
982 aBuff->SetClientMode();
// Splits an encoded string into an array of newly allocated C strings, using
// ',' as separator and '\' as escape character. The element count is reported
// through aLen.
// Several lines (loop headers, counters, the returns) are not visible in this
// excerpt.
988 char** MysqlDb::DecodeStrArray(
char* strinput ,
int &aLen){
// One-element result holding the literal "0"; the condition selecting this
// path is not visible (presumably null or empty input).
// NOTE(review): this element is a string literal, while the visible caller
// releases every element with delete[] - confirm this cannot be reached with
// a non-zero aLen.
992 char** tmparr =
new char*[1];
996 tmparr[0] = (
char*)
"0";
// First pass: scan for '\' and ',' to size the result, capped at 1024.
1000 char* tPnt=strinput;
1003 while (tPnt&&aLen<1024) {
1004 tPnt=strpbrk( tPnt,
"\\,");
1013 char** strarr=
new char*[aLen];
// Second pass: assemble each element in a scratch buffer as large as the
// whole input, then copy it into its own allocation.
1016 char *tBuff=
new char[strlen(strinput)+1];
1017 char *tBuffInd=tBuff;
1020 tPntNew=strpbrk( tPnt,
"\\,");
// No further special character, or a ',': the current element ends here.
1021 if ((tPntNew==0)||*tPntNew==
',') {
1023 strcpy(tBuffInd,tPnt);
1026 strncpy(tBuffInd,tPnt,tPntNew-tPnt);
1027 *(tBuffInd+(tPntNew-tPnt))=
'\0';
1029 strarr[tCount]=
new char[strlen(tBuff)+1];
1030 strcpy(strarr[tCount],tBuff);
// Escape character: copy the text before it and continue in the same element.
1035 strncpy(tBuffInd,tPnt,tPntNew-tPnt);
1036 tBuffInd=tBuffInd+(tPntNew-tPnt);
1038 if(*tPntNew==
'\\'||*tPntNew==
',') {
// Encodes an array of aLen C strings into one newly allocated string. The
// characters '\' and ',' inside elements get special treatment (presumably
// escaping - that body is not visible in this excerpt).
// The visible callers release the returned buffer with delete[].
1052 char* MysqlDb::CodeStrArray(
char** strarr ,
int aLen){
// Size the scratch buffer: two bytes per input character, which covers the
// case where every character needs an escape.
// NOTE(review): the initial value of tMaxLen and the room for separators are
// set on lines not visible here - confirm.
1055 for (i=0;i<aLen;i++) {
1056 if (strarr[i]) tMaxLen=tMaxLen+strlen(strarr[i])*2;
1059 char* tTempVal=
new char[tMaxLen+1];
1061 char* tWrite=tTempVal;
1062 for (i=0;i<aLen;i++) {
1066 for (j=0;j<(int)strlen(strarr[i]);j++) {
1067 if (*tRead==
'\\'||*tRead==
',') {
// Return a copy trimmed to the encoded length.
1081 char *tRetVal=
new char[strlen(tTempVal)+1];
1082 strcpy(tRetVal,tTempVal);
// Stores a copy of dbName in mdbName and selects it as the default database
// on the server. Returns false for a null or empty name and true for the
// placeholder name " " (no selection is attempted for it).
// The return type is declared on a line not visible in this excerpt.
1090 MysqlDb::setDefaultDb(
const char* dbName){
1092 if(!dbName || strlen(dbName)==0)
return false;
1093 if(mdbName)
delete [] mdbName;
1094 mdbName=
new char[strlen(dbName)+1];
1095 strcpy(mdbName,dbName);
1096 if(strcmp(dbName,
" ")==0)
return true;
1099 unsigned int mysqlError;
1100 std::string sdbName(dbName);
// Selection errors are logged only when the name does not contain
// "blacklist".
1102 found = sdbName.find(
"blacklist");
// mysql_select_db() returns non-zero on failure.
1104 if(mysql_select_db(&mData,dbName)){
1106 mysqlError = mysql_errno(&mData);
// Lost connection: select again (presumably after a reconnect performed on a
// line not visible here).
1107 if(mysqlError==CR_SERVER_GONE_ERROR || mysqlError==CR_SERVER_LOST){
1109 if(mysql_select_db(&mData,dbName)){
1110 if (found == std::string::npos) {
1111 LOG_ERROR<<
"Error selecting database=" << dbName << endm;
1118 if (found == std::string::npos) {
1119 LOG_ERROR<<
"Error selecting database=" << dbName << endm;
// Escapes aLen bytes starting at aBin for use inside a quoted SQL string and
// hands the escaped data to tBin->Input().
// The escaped text lives in a function-local static buffer that is freed and
// reallocated on every call, so the data from a previous call does not
// survive the next one.
// The declaration of tBin and the return statement are not visible in this
// excerpt.
1132 const MysqlBin *Binary(
const unsigned long int aLen,
const float *aBin){
1134 static char *tString=0;
1135 unsigned long int tNewLen;
1138 if (tString)
delete [] tString;
// 2*aLen+1 bytes: room for every input byte to be escaped plus a terminator.
1140 tString =
new char[2*aLen+1];
1141 tNewLen = mysql_escape_string(tString,(
char*) aBin,aLen);
1142 tBin->Input(tNewLen,tString);
static StDbManager * Instance()
strdup(..) is not ANSI