/*
 * PgSQLBackend.cc
 *
 *  Created on: Feb 26, 2016
 *      Author: nbarros
 *
 *  PostgreSQL (libpq) backend used to read and write RATDB tables.
 */

#include "PgSQLBackend.hh"
#include <RAT/GLG4StringUtil.hh>
#include <sstream>
#include <cassert>
#include <algorithm>
#include <string>

extern "C" {
#include <unistd.h> // for sleep
}

using std::string;

// Name of the untagged header table. Tagged header tables are derived from it
// by SetDbTag().
const std::string RAT::PgSQLBackend::ratdb_header_tbl_base_ = "ratdb_header_v2";

namespace RAT {

/// Stores the connection string and the default retry settings.
/// No connection is opened here; see Connect().
///
/// @param conn_str  connection string handed to PQconnectdb()
PgSQLBackend::PgSQLBackend(const std::string& conn_str)
  : pg_conn_(NULL),
    pg_res_(NULL),
    pg_notify_(0),
    server_url_(conn_str),
    conn_options_(""),
    num_reconn_(5),
    num_sec_wait_(5),
    bor_done_(false),
    bor_warn_given_(false),
    ratdb_tag_(""),
    ratdb_header_tbl_("ratdb_header_v2")
{
  // -- Current options defaults for RAT
  // -- 30 second timeout
  //conn_options_ ="?connect_timeout=15";
}

/// Opens the connection to the server unless one is already open.
/// A failed attempt is reported through Log::Die.
void PgSQLBackend::Connect() {
  // Check if the connection is already open. If yes, do nothing
  if (PQstatus(pg_conn_) == CONNECTION_OK) return;

  // A handle that exists but is not usable still owns libpq resources.
  // Release it before it gets overwritten below.
  if (pg_conn_ != NULL) {
    PQfinish(pg_conn_);
    pg_conn_ = NULL;
  }

  std::string full_conn_str = server_url_;
  if (!conn_options_.empty()) full_conn_str += conn_options_;

  pg_conn_ = PQconnectdb(full_conn_str.c_str());
  if (PQstatus(pg_conn_) != CONNECTION_OK) {
    std::ostringstream msg;
    msg << "Failed to establish RATDB connection due to PgSQL exception. \n [ "
        << PQerrorMessage(pg_conn_) << "]";
    Log::Die(msg.str().c_str());
  }
}

/// Closes the connection, if any.
PgSQLBackend::~PgSQLBackend() {
  Disconnect();
}

/// Releases the connection handle and resets it to NULL.
void PgSQLBackend::Disconnect() {
  debug << "PgSQLBackend::Disconnect : Disconnecting from the RATDB server" << newline;
  // PQfinish has to be called for every PGconn returned by PQconnectdb, also
  // when the connection attempt failed or the link dropped afterwards.
  if (pg_conn_ != NULL) PQfinish(pg_conn_);
  pg_conn_ = NULL;
}

/// Makes sure there is a usable connection before a query is issued.
///
/// After BeginOfRun a warning is printed once and the connection is reopened
/// on every call. If the connection is found to be down, a PQreset is tried
/// first, followed by up to num_reconn_ reconnection attempts that are
/// num_sec_wait_ seconds apart. Log::Die is called if none of them succeeds.
void PgSQLBackend::CheckAndResetConn() {
  // First check that we are not yet past BeginOfRun
  if (bor_done_) {
    if (!bor_warn_given_) {
      warn << "PgSQLBackend::CheckAndResetConn : Attempting to access the database after BeginOfRun. This should not be happening." << newline;
      bor_warn_given_ = true;
    }
    // Re-Establish the connection. Every call made after BeginOfRun closes
    // the connection when it is done, so this is needed on each access and
    // not only on the first one.
    Connect();
  }

  if (PQstatus(pg_conn_) == CONNECTION_OK) return;

  warn << "PgSQLBackend::CheckAndResetConn : RATDB connection was lost. Resetting." << newline;
  PQreset(pg_conn_);
  if (PQstatus(pg_conn_) == CONNECTION_OK) return;

  // The reset did not help. Try again with a sleeping time between attempts.
  // PQconnectdb is called directly because Connect() reports the first
  // failure through Log::Die (presumably fatal), which would make the
  // remaining attempts unreachable.
  const std::string full_conn_str = server_url_ + conn_options_;
  unsigned int lcounter = 0;
  do {
    lcounter++;
    warn << Form("PgSQLBackend::CheckAndResetConn : Attempting to reconnect to server after %u seconds... [%u/%u]",num_sec_wait_,lcounter,num_reconn_) << newline;
    Disconnect();
    sleep(num_sec_wait_);
    pg_conn_ = PQconnectdb(full_conn_str.c_str());
  } while ((PQstatus(pg_conn_) != CONNECTION_OK) && (lcounter < num_reconn_));

  // After all the retries do one last check
  if (PQstatus(pg_conn_) != CONNECTION_OK) {
    Log::Die(Form("RAT lost connection to the server was not able to restore it after %u attempts",num_reconn_));
  }
}

/// Uploads the raw JSON of a table into ratdb.ratdb_data.
///
/// @param tbl  table whose raw JSON representation is inserted
/// @throws RAT::DBBackendError if the server does not accept the statement
void PgSQLBackend::InsertObject(DBTable &tbl) {
  CheckAndResetConn();

  json::Value &obj = tbl.GetRawJSONTable();
  info << "PgSQLBackend::InsertObject : Inserting table into PgSQL backend" << newline;

  // The document is sent as a bound parameter and not spliced into the SQL
  // text, so quote characters inside the JSON cannot break the statement.
  const std::string payload = obj.toJSONString();
  const std::string query = "INSERT INTO ratdb.ratdb_data(data) VALUES ($1::jsonb)";
  const char *param_values[1] = { payload.c_str() };

#ifdef RATDB_DEBUG
  info << "PgSQLBackend::InsertObject : Query is :" << newline;
  info << "[" << query << "]"<< newline;
  info << "PgSQLBackend::InsertObject : Payload is :" << newline;
  info << "[" << payload << "]"<< newline;
#endif

  /*
   * Should PQclear PGresult whenever it is no longer needed to avoid memory
   * leaks
   */
#ifdef RATDB_DEBUG
  info << "PgSQLBackend::InsertObject : Preclearing the resultset." << newline;
#endif
  ClearResultset();

#ifdef RATDB_DEBUG
  info << "PgSQLBackend::InsertObject : Executing the query." << newline;
#endif
  // One text-format parameter; its type is fixed by the ::jsonb cast.
  pg_res_ = PQexecParams(pg_conn_, query.c_str(), 1, NULL, param_values, NULL, NULL, 0);

  if (PQresultStatus(pg_res_) != PGRES_COMMAND_OK) {
#ifdef RATDB_DEBUG
    warn << "PgSQLBackend::InsertObject : Query failed execution." << newline;
    warn << "PgSQLBackend::InsertObject : Status " << PQresultStatus(pg_res_) << newline;
#endif
    ClearResultset();
    throw RAT::DBBackendError(tbl.GetName(),tbl.GetIndex(),PQerrorMessage(pg_conn_),"PSQL");
  }

#ifdef RATDB_DEBUG
  info << "PgSQLBackend::InsertObject : Query executed. Clearing resultset." << newline;
#endif
  ClearResultset();
  info << "PgSQLBackend::InsertObject : Table uploaded successfully." << newline;
#ifdef RATDB_DEBUG
  info << "PgSQLBackend::InsertObject : Resultset cleared." << newline;
#endif

  // Close the connection if we are past BeginOfRun
  if (bor_done_) Disconnect();
}

/// Fetches the highest-version object of a given type and index, ignoring
/// run ranges and pass numbers.
///
/// @param tblname     value matched against the header column `type`
/// @param index       value matched against the header column `index`
/// @param found       set to true if an object was loaded, false otherwise
/// @param size_bytes  set to the length of the JSON text when found
/// @return the parsed object, or a default-constructed value if not found
json::Value PgSQLBackend::FetchObjFast(const std::string &tblname, const std::string &index,bool &found, size_t &size_bytes) {
  CheckAndResetConn();

  json::Value result;
  found = false;

#ifdef RATDB_DEBUG
  info << "PgSQLBackend::FetchObjectFast : Fetching object " << tblname << "[" << index << "]" << newline;
#endif

  // NOTE(review): tblname and index are interpolated verbatim into the SQL
  // text; a value containing a single quote would break the statement.
  std::string query = dformat("SELECT h.key FROM ratdb.%s h WHERE h.type = \'%s\' AND h.index = \'%s\' ORDER BY h.version DESC LIMIT 1",
                              ratdb_header_tbl_.c_str(),tblname.c_str(),index.c_str());
  ExecuteSelectQuery(query.c_str());

  // An empty result has no row 0 to read the key from.
  if (PQntuples(pg_res_) > 0) {
    // -- Now grab the corresponding data
    const int key = atoi(PQgetvalue(pg_res_, 0, 0));
    ClearResultset();

#ifdef RATDB_DEBUG
    debug << "PgSQLBackend::FetchObjectFast : Preloading object with key [" << key << "]" << newline;
#endif

    ///
    /// The reason to break this into two queries is to increase the chance of cache hits on simultaneous
    /// jobs
    ///
    query = dformat("SELECT d.data from ratdb.ratdb_data d WHERE d.key = %d",key);
    ExecuteSelectQuery(query.c_str());

    if (PQntuples(pg_res_) > 0) {
      size_bytes = PQgetlength(pg_res_,0,0);
      json::Reader reader(PQgetvalue(pg_res_, 0, 0));
      reader.getValue(result);
      found = true;
    } else {
      warn << dformat("PgSQLBackend::FetchObjectFast : Header entry for %s[%s] points to key %d, which has no data row.",
                      tblname.c_str(),index.c_str(),key) << newline;
    }
  }
  ClearResultset();

  // Close the connection if we are past BeginOfRun
  if (bor_done_) Disconnect();
  return result;
}

/// Fetches the object of a given type and index that is valid for a run.
///
/// @param tblname     value matched against the header column `type`
/// @param index       value matched against the header column `index`
/// @param runNumber   run that must lie inside [run_begin, run_end]
/// @param passNumber  pass to select; -1 selects by descending version, then pass
/// @param found       set to true if an object was loaded, false otherwise
/// @param size_bytes  set to the length of the JSON text when found
/// @return the parsed object, or a default-constructed value if not found
json::Value PgSQLBackend::FetchObject(const std::string &tblname, const std::string &index, const int &runNumber, const int &passNumber, bool &found, size_t &size_bytes) {
  // First find the ID of the object that matches the description.
  // Always check that the connection is ok
  CheckAndResetConn();

  json::Value result;

#ifdef RATDB_DEBUG
  info << "PgSQLBackend::FetchObject : Fetching object " << tblname << "[" << index << "][run=" << runNumber << "][pass=" << passNumber << "] tag=[" << ratdb_tag_ <<"]" << newline;
#endif

  /*
   * Should PQclear PGresult whenever it is no longer needed to avoid memory
   * leaks
   */
  ClearResultset();

  // If there is a tag set, use ratdb_raw_data to avoid changes made afterwards
  const bool use_raw = !ratdb_tag_.empty();
  const std::string data_tbl = use_raw ? "ratdb_raw_data" : "ratdb_data";

  // -- first query just fetches the key
  // NOTE(review): tblname and index are interpolated verbatim into the SQL
  // text; a value containing a single quote would break the statement.
  std::string query = dformat("SELECT h.key, h.pass, h.run_begin, h.run_end FROM ratdb.%s h WHERE h.type = \'%s\' AND h.index = \'%s\' AND h.run_begin <= %d AND h.run_end >= %d",
                              ratdb_header_tbl_.c_str(),tblname.c_str(),index.c_str(),runNumber,runNumber);
  if (passNumber == -1) {
    // We want to fetch the highest pass number available
    query += " ORDER BY h.version DESC, h.pass DESC LIMIT 1";
  } else {
    query += dformat(" AND h.pass = %d ORDER BY h.version DESC LIMIT 1",passNumber);
  }
  ExecuteSelectQuery(query.c_str());

  const int n_res = PQntuples(pg_res_);
#ifdef RATDB_DEBUG
  info << "PgSQLBackend::FetchObject : Found " << n_res << " objects." << newline;
#endif

  // Check the number of results. It should only give 1.
  if (n_res == 0) {
    found = false;
    ClearResultset();
  } else if (n_res > 1) {
    // Both query variants carry LIMIT 1, so this is only a safeguard.
    // multiple results are a problem. Should kill right away
    ClearResultset();
    Disconnect();
    found = false;
    Log::Die(dformat("PgSQLBackend::FetchObject: Multiple objects on server matching query for object %s[%s][run=%d]. This should not happen. Aborting!",tblname.c_str(),index.c_str(),runNumber));
  } else {
    const int key       = atoi(PQgetvalue(pg_res_, 0, 0));
    const int pass      = atoi(PQgetvalue(pg_res_, 0, 1));
    const int run_begin = atoi(PQgetvalue(pg_res_, 0, 2));
    const int run_end   = atoi(PQgetvalue(pg_res_, 0, 3));
    ClearResultset();

    query = dformat("SELECT d.data from ratdb.%s d WHERE d.key=%d",data_tbl.c_str(),key);
    ExecuteSelectQuery(query.c_str());

    if (PQntuples(pg_res_) == 0) {
      // The header references a key without a data row: nothing to parse.
      warn << dformat("PgSQLBackend::FetchObject : Header entry for %s[%s] points to key %d, but ratdb.%s has no such row.",
                      tblname.c_str(),index.c_str(),key,data_tbl.c_str()) << newline;
      found = false;
    } else {
      size_bytes = PQgetlength(pg_res_,0,0);
      json::Reader reader(PQgetvalue(pg_res_, 0, 0));
      reader.getValue(result);

      // Grab the indexed data, if necessary
      if (use_raw) {
        result.setMember("pass",json::Value(pass));
        std::vector<int> range;
        range.push_back(run_begin);
        range.push_back(run_end);
        result.setMember("run_range",json::Value(range));
      }
      found = true;
    }
    ClearResultset();
  }

#ifdef RATDB_DEBUG
  info << "PgSQLBackend::FetchObject : Returning result...found = " << ((found)?"true":"false") << newline;
#endif

  // Close the connection if we are past BeginOfRun
  if (bor_done_) Disconnect();
  return result;
}

/// Selects a database tag. The tag is looked up in ratdb.ratdb_tags and its
/// id determines the header table used by the subsequent queries.
/// Log::Die is called unless exactly one tag matches.
///
/// @param tag  name of the tag
void PgSQLBackend::SetDbTag(const std::string &tag) {
  CheckAndResetConn();
  ClearResultset();

  // NOTE(review): tag is interpolated verbatim into the SQL text.
  std::string query = dformat("SELECT t.id FROM ratdb.ratdb_tags t WHERE t.tag = '%s'",tag.c_str());
  ExecuteSelectQuery(query.c_str());

  const int n_res = PQntuples(pg_res_);
  if (n_res == 0) {
    ClearResultset();
    Log::Die(dformat("PgSQLBackend::SetDbTag : Couldn't find requested tag [%s]. Aborting.",tag.c_str()));
  } else if (n_res != 1) {
    ClearResultset();
    Log::Die(dformat("PgSQLBackend::SetDbTag : Got %d results when searching for tag [%s]. Expected 1.",n_res,tag.c_str()));
  }

  const int id = atoi(PQgetvalue(pg_res_, 0, 0));
  // Clear the result
  ClearResultset();

  // Always start from the base name: appending to the current name would
  // stack suffixes ("..._tag_A_tag_B") if a tag is set more than once.
  ratdb_header_tbl_ = dformat("%s_tag_%d",ratdb_header_tbl_base_.c_str(),id);
  ratdb_tag_ = tag;
  info << BMAGENTA << "PgSQLBackend::SetTag : RATDB tag set to '" << tag << "' (id = " << id << ")" << CLR << newline;

#ifdef RATDB_DEBUG
  detail << "PgSQLBackend::FetchObjectWithTag : Looking up tagged table " << ratdb_header_tbl_ << newline;
#endif

  // Close the connection if we are past BeginOfRun
  if (bor_done_) Disconnect();
}

/// Lists the distinct indexes stored for a given type.
///
/// @param type_name  value matched against the header column `type`
/// @param run        if non-zero, only entries whose run range contains it
/// @return the set of index strings
std::set<std::string> PgSQLBackend::GetListIndexes(const std::string &type_name, const int& run) {
  CheckAndResetConn();

  // -- Return a list of all indexes for a given type
  // NOTE(review): type_name is interpolated verbatim into the SQL text.
  std::string query = dformat("SELECT DISTINCT h.index FROM ratdb.%s h WHERE h.type=\'%s\'",ratdb_header_tbl_.c_str(),type_name.c_str());
  if (run != 0) {
    query += dformat(" AND h.run_begin <= %d AND h.run_end >= %d",run,run);
  }

#ifdef RATDB_DEBUG
  debug << "PgSQLBackend::GetListIndexes : Query: [" << query << "]" << newline;
#endif
  ExecuteSelectQuery(query.c_str());

  const int n_res = PQntuples(pg_res_);
#ifdef RATDB_DEBUG
  debug << "PgSQLBackend::GetListIndexes : Received : [" << n_res << "] results." << newline;
#endif

  std::set<std::string> resultset;
  for (int row = 0; row < n_res; ++row) {
    // The last argument of PQgetvalue is the column; the query returns one.
    resultset.insert(std::string(PQgetvalue(pg_res_, row, 0)));
  }

  // clear the result
  ClearResultset();

  // Close the connection if we are past BeginOfRun
  if (bor_done_) Disconnect();
  return resultset;
}

/// Lists the distinct types present in the header table.
///
/// @param run  if non-zero, only entries whose run range contains it
/// @return the set of type names
std::set<std::string> PgSQLBackend::GetListTypes(const int run) {
  CheckAndResetConn();

  std::string query = dformat("SELECT DISTINCT h.type FROM ratdb.%s h",ratdb_header_tbl_.c_str());
  if (run != 0) {
    query += dformat(" WHERE h.run_begin <= %d AND h.run_end >= %d",run,run);
  }

#ifdef RATDB_DEBUG
  info << "PgSQLBackend::GetListTypes : Types available for this run. Query =" << query<< newline;
#endif
  ExecuteSelectQuery(query.c_str());

  const int n_res = PQntuples(pg_res_);
  std::set<std::string> results;
  for (int row = 0; row < n_res; ++row) {
    // The values are inserted exactly as returned by the server.
    results.insert(std::string(PQgetvalue(pg_res_, row, 0)));
  }
  ClearResultset();

  // Close the connection if we are past BeginOfRun
  if (bor_done_) Disconnect();
  return results;
}

/// Reports a failed query: the message carries the query text and the
/// connection's current error message. The result set and the connection
/// are released before Log::Die is called.
///
/// @param query  text of the statement that failed
void PgSQLBackend::FailQuery(const char*query) {
  // Build the message first: the error text is read from the connection,
  // which is closed below.
  std::ostringstream msg;
  msg << "PgSQLBackend::Failed to execute query to RATDB. [ " << query << "] : "
      << PQerrorMessage(pg_conn_) << ".";
  ClearResultset();
  Disconnect();
  Log::Die(msg.str().c_str());
}

} /* namespace RAT */