#include "KM3NeTDB_Client.h"

// NOTE(review): the names of the three system headers originally listed here were
// lost; the set below covers what this translation unit uses directly. The OpenSSL
// declarations are expected to arrive through KM3NeTDB_Client.h -- confirm.
#include <cctype>
#include <cstring>
#include <memory>
#include <sstream>
#include <string>
#include <vector>

#include <arpa/inet.h>
#include <netdb.h>
#include <netinet/in.h>
#include <sys/socket.h>
#include <unistd.h>

// NOTE(review): the template arguments used in this file (std::vector<std::string>,
// std::vector<Selector>, std::shared_ptr<std::stringstream>) were reconstructed from
// how the values are used; confirm they match the declarations in the header.

using namespace KM3NeT;
using namespace KM3NeT::DB;

const Selector::RelOp Selector::RelOp::Equal("=");
const Selector::RelOp Selector::RelOp::Different("!=");
const Selector::RelOp Selector::RelOp::Less("<");
const Selector::RelOp Selector::RelOp::Greater(">");
const Selector::RelOp Selector::RelOp::LessEqual("<=");
const Selector::RelOp Selector::RelOp::GreaterEqual(">=");

namespace
{

/// Message thrown by every request method when a result set is still open.
const char *const kResultSetOpenMsg = "Another resultset is open on this Client. Close it first.";

/// Runs a callable when the enclosing scope is left, whether normally or by exception.
template <typename F>
class ScopeExit
{
public:
    explicit ScopeExit(F action) : action_(action), armed_(true) {}
    ScopeExit(ScopeExit &&other) : action_(other.action_), armed_(other.armed_) { other.armed_ = false; }
    ScopeExit(const ScopeExit &) = delete;
    ScopeExit &operator=(const ScopeExit &) = delete;
    ~ScopeExit()
    {
        if (armed_) action_();
    }

private:
    F action_;
    bool armed_;
};

/// Builds a ScopeExit around `action`.
template <typename F>
ScopeExit<F> MakeScopeExit(F action)
{
    return ScopeExit<F>(action);
}

/// Returns "?" for the first query parameter and "&" afterwards, updating `first`.
const char *NextSeparator(bool &first)
{
    const char *sep = first ? "?" : "&";
    first = false;
    return sep;
}

/// Appends every selector as `Name<op>Value` to the query string.
/// @return true when nothing was appended, i.e. the next parameter is still the first.
bool AppendSelectors(std::ostringstream &oss, const std::vector<Selector> &selectors)
{
    bool isfirst = true;
    for (const auto &sel : selectors)
        oss << NextSeparator(isfirst) << sel.Name << sel.RelationalOperator.Render() << sel.Value;
    return isfirst;
}

/// Completes an HTTP GET request line: adds the credentials (as query parameters when
/// there is no connection cookie, as a Cookie header otherwise) and the terminating
/// "Connection: close" header block.
/// NOTE(review): the user name and password travel in the URL and no value is
/// URL-encoded; callers must not pass characters such as '&', ' ' or CR/LF.
void AppendAuthAndTerminate(std::ostringstream &oss, bool queryfirst, const std::string &cookie,
                            const std::string &usr, const std::string &pwd)
{
    if (cookie.empty())
    {
        oss << (queryfirst ? "?" : "&") << "usr=" << usr << "&pwd=" << pwd;
        oss << " HTTP/1.1\r\nConnection: close\r\n\r\n";
    }
    else
    {
        oss << " HTTP/1.1\r\nCookie: sid=" << cookie << "\r\nConnection: close\r\n\r\n";
    }
}

/// Writes the whole request to the SSL connection.
/// @throws DBException when SSL_write reports a failure.
void SendRequest(SSL *ssl, const std::string &request)
{
    if (SSL_write(ssl, request.c_str(), static_cast<int>(request.length())) <= 0)
        throw DBException("Could not send the HTTP request.", __LINE__);
}

/// Reads exactly `count` bytes; SSL_read may legitimately return fewer than requested.
/// @return false when the connection ends or fails before `count` bytes arrived.
bool ReadExactly(SSL *ssl, char *dst, int count)
{
    int done = 0;
    while (done < count)
    {
        const int got = SSL_read(ssl, dst + done, count - done);
        if (got <= 0) return false;
        done += got;
    }
    return true;
}

/// Collects the response body: first the NUL-terminated remainder already sitting in
/// `buf` at `offset`, then everything SSL_read delivers until it returns <= 0.
/// The data is inserted as C strings, so it is handled as text.
std::shared_ptr<std::stringstream> CollectBody(SSL *ssl, char *buf, int bufsize, int offset)
{
    std::shared_ptr<std::stringstream> body = std::make_shared<std::stringstream>();
    *body << (buf + offset);
    int got;
    while ((got = SSL_read(ssl, buf, bufsize - 1)) > 0)
    {
        buf[got] = 0;
        *body << buf;
    }
    return body;
}

} // namespace

/// Resets credentials, connection handles and read-buffer bookkeeping to "nothing open".
void _Client::_ZeroAll()
{
    _Usr = "";
    _Pwd = "";
    _ConnCookie = "";
    _method = nullptr;
    _ctx = nullptr;
    _ssl = nullptr;
    _sockfd = 0;
    _host = nullptr;
    _cert = nullptr;
    _bytes = 0;
    _consumedbytes = 0;
    // ReadBytes() consults these two, so they must not start out indeterminate.
    _chunked = false;
    _thischunksize = 0;
}

/// Closes an open result set (if any), drops the connection and forgets the SSL method.
void _Client::_ReleaseAll()
{
    if (pResultSet)
    {
        pResultSet->Close();
    }
    _SSLDisconnect();
    _method = nullptr;
}

/// Initializes the OpenSSL library and selects the client method.
/// @throws DBException when the library cannot be initialized.
void _Client::_InitSSL()
{
    OpenSSL_add_all_algorithms();
    ERR_load_crypto_strings();
    SSL_load_error_strings();
    if (SSL_library_init() < 0) throw DBException("Can't initialize the OpenSSL library.", __LINE__);
    _method = SSLv23_client_method();
}

/// Opens a TCP connection to the configured server and performs the SSL handshake.
/// On failure every resource acquired so far is released before the exception leaves.
/// NOTE(review): the server certificate is fetched but never verified, and no
/// verification mode is set on the context.
/// @throws DBException describing the step that failed.
void _Client::_SSLConnect()
{
    try
    {
        // The context is created exactly once and the SSL object on every platform.
        // Previously the __APPLE__ branch created a second context and skipped SSL_new,
        // leaving _ssl null.
#ifdef __APPLE__
        // Cast kept from the original; presumably for OpenSSL versions whose
        // SSL_CTX_new takes a non-const method pointer -- confirm.
        _ctx = SSL_CTX_new(const_cast<SSL_METHOD *>(_method));
#else
        _ctx = SSL_CTX_new(_method);
#endif
        if (_ctx == nullptr) throw DBException("Can't create the SSL context.", __LINE__);
        if ((_ssl = SSL_new(_ctx)) == nullptr) throw DBException("Can't create the SSL object.", __LINE__);

        _host = gethostbyname(_Srv.HostName().c_str());
        if (_host == nullptr || _host->h_addrtype != AF_INET || _host->h_addr_list[0] == nullptr ||
            _host->h_length < static_cast<int>(sizeof(struct in_addr)))
        {
            std::ostringstream ss;
            ss << "Can't resolve hostname " << _Srv.HostName() << ".";
            throw DBException(ss.str(), __LINE__);
        }

        // 0 doubles as the "no socket" marker in this class, so it is rejected as well.
        const int fd = socket(AF_INET, SOCK_STREAM, 0);
        if (fd <= 0) throw DBException("Can't create socket.", __LINE__);
        _sockfd = fd;

        struct sockaddr_in dest_addr;
        std::memset(&dest_addr, 0, sizeof(dest_addr));
        dest_addr.sin_family = AF_INET;
        dest_addr.sin_port = htons(_Srv.Port());
        // Copy the 4 address bytes; reading them through a long* over-reads on LP64.
        std::memcpy(&dest_addr.sin_addr, _host->h_addr_list[0], sizeof(dest_addr.sin_addr));

        if (connect(_sockfd, reinterpret_cast<struct sockaddr *>(&dest_addr), sizeof(dest_addr)) == -1)
        {
            std::ostringstream ss;
            ss << "Can't connect to host " << _Srv.HostName() << "(" << inet_ntoa(dest_addr.sin_addr) << ").";
            throw DBException(ss.str(), __LINE__);
        }

        if (SSL_set_fd(_ssl, _sockfd) != 1 || SSL_connect(_ssl) != 1)
            throw DBException("Could not build a SSL session.", __LINE__);

        _cert = SSL_get_peer_certificate(_ssl);
        if (_cert == nullptr) throw DBException("Could not get the server certificate.", __LINE__);
    }
    catch (...)
    {
        _SSLDisconnect();
        throw;
    }
}

/// Releases certificate, SSL object, context and socket; safe to call repeatedly.
void _Client::_SSLDisconnect()
{
    if (_cert)
    {
        X509_free(_cert);
        _cert = nullptr;
    }
    if (_ssl)
    {
        SSL_free(_ssl);
        _ssl = nullptr;
    }
    if (_ctx)
    {
        SSL_CTX_free(_ctx);
        _ctx = nullptr;
    }
    if (_sockfd)
    {
        close(_sockfd);
        _sockfd = 0;
    }
}

/// Common constructor work: clears the state, falls back to the default server when
/// the port is 0, picks up credentials from the environment and initializes OpenSSL.
void _Client::_Init()
{
    _ZeroAll();
    if (_Srv.Port() == 0) _Srv = Server::Default;
    if (_Usr.empty()) _Usr = _GetEnv("KM3NET_DB_USR");
    if (_Usr.empty() == false)
    {
        if (_Pwd.empty()) _Pwd = _GetEnv("KM3NET_DB_PWD");
    }
    else
    {
        if (_ConnCookie.empty())
        {
            _ConnCookie = _GetEnv("KM3NET_DB_CONNCOOKIE");
        }
        // The user name is the second '_'-separated field of the cookie
        // (leading underscores are skipped first).
        unsigned i;
        for (i = 0; i < _ConnCookie.length() && _ConnCookie[i] == '_'; i++)
            ;
        for (; i < _ConnCookie.length() && _ConnCookie[i] != '_'; i++)
            ;
        if (i++ < _ConnCookie.length())
        {
            std::ostringstream ss;
            for (; i < _ConnCookie.length() && _ConnCookie[i] != '_'; i++) ss << _ConnCookie[i];
            _Usr = ss.str();
        }
    }
    _InitSSL();
}

/// Releases everything held by the client.
void _Client::Close()
{
    _ReleaseAll();
}

/// Creates a client that authenticates with a user name and password.
/// A null `usr` or `pwd` is treated as an empty string.
_Client::_Client(const Server &srv, const char *usr, const char *pwd) : _Srv(srv)
{
    _Init();
    _Usr = (usr != nullptr) ? usr : "";
    _Pwd = (pwd != nullptr) ? pwd : "";
    _ConnCookie = "";
}

/// Creates a client that authenticates with a persistent connection cookie.
/// The user name is the text between the leading PERSISTENT_CONN_CHAR and the next one.
/// @throws DBException when the cookie is null or does not start with PERSISTENT_CONN_CHAR.
_Client::_Client(const Server &srv, const char *persistentcookie) : _Srv(srv)
{
    _Init();
    if (persistentcookie == nullptr || persistentcookie[0] != PERSISTENT_CONN_CHAR)
        throw DBException("Cookie cannot describe a persistent connection.", __LINE__);
    _ConnCookie = persistentcookie;
    std::ostringstream ss;
    for (int i = 1; persistentcookie[i] && persistentcookie[i] != PERSISTENT_CONN_CHAR; i++) ss << persistentcookie[i];
    _Usr = ss.str();
    _Pwd = "";
}

/// Creates a client for the default server with credentials taken from the environment.
_Client::_Client() : _Srv(Server::Default)
{
    _Init();
}

/// Reads the next piece of the response into _read_buf (always NUL-terminated).
/// In chunked mode the chunk-size lines and chunk terminators are consumed here and
/// only payload bytes are delivered.
/// @return number of bytes placed in _read_buf; 0 at end of data. A failing SSL_read
///         is reported as end of data as well.
/// @throws DBException when a chunk header or chunk terminator is malformed.
int _Client::ReadBytes()
{
    if (!_chunked)
    {
        _bytes = SSL_read(_ssl, _read_buf, sizeof(_read_buf) - 1);
        if (_bytes < 0) _bytes = 0; // a negative count must never be used as an index
        _read_buf[_bytes] = 0;
        return _bytes;
    }

    // No payload pending: parse chunk-size lines until one announces data.
    while (!(_thischunksize > 0))
    {
        char hexc = 0;
        unsigned chs = 0;
        do
        {
            if (SSL_read(_ssl, &hexc, 1) != 1) throw DBException("Error reading chunk header.", __LINE__);
            if (hexc >= '0' && hexc <= '9')
                chs = (chs << 4) + (hexc - '0');
            else if (hexc >= 'A' && hexc <= 'F')
                chs = (chs << 4) + (hexc - 'A' + 10);
            else if (hexc >= 'a' && hexc <= 'f')
                chs = (chs << 4) + (hexc - 'a' + 10);
        } while (hexc != '\r');
        if (SSL_read(_ssl, &hexc, 1) != 1 || hexc != '\n') throw DBException("Error reading chunk header.", __LINE__);
        _thischunksize = chs;
        if (_thischunksize == 0) return 0; // zero-sized chunk marks the end of the body
    }

    int sz = static_cast<int>(sizeof(_read_buf)) - 1;
    if (_thischunksize < sz) sz = _thischunksize;
    _bytes = SSL_read(_ssl, _read_buf, sz);
    if (_bytes < 0) _bytes = 0;
    _read_buf[_bytes] = 0;
    _thischunksize -= _bytes;
    if (_thischunksize == 0)
    {
        char crlf[2];
        if (!ReadExactly(_ssl, crlf, 2) || crlf[0] != '\r' || crlf[1] != '\n')
            throw DBException("Error reading chunk terminator.", __LINE__);
    }
    return _bytes;
}

/// Hands out the not-yet-consumed part of _read_buf, refilling it when exhausted.
/// @param data set to the start of the available bytes (left untouched at end of data).
/// @return number of bytes available at `data`; 0 at end of data.
int _Client::_GetBytes(char *&data)
{
    if (_consumedbytes < _bytes)
    {
        data = _read_buf + _consumedbytes;
        const int pending = _bytes - _consumedbytes;
        _consumedbytes = _bytes;
        return pending;
    }
    if (ReadBytes() == 0) return 0;
    _consumedbytes = _bytes;
    data = _read_buf;
    return _bytes;
}

/// States of the header-line scanner used by _ProcessHTTPHeaders.
enum HTTPParserState
{
    Done,
    BeginLine,
    WaitCR,
    CR,
    EmptyCR
};

/// Reads and parses the HTTP response headers.
/// @param headers receives one entry per non-empty header line (status line first).
/// @param scan    receives the offset in _read_buf of the first byte after the headers.
/// @return the HTTP status code. Also sets _chunked when the response announces
///         "Transfer-Encoding: chunked".
/// @throws DBException when the response ends early or the status line is malformed.
int _Client::_ProcessHTTPHeaders(std::vector<std::string> &headers, int &scan)
{
    // The header section itself is never chunked; make sure ReadBytes() reads it raw.
    _chunked = false;

    int bytes = 0;
    scan = 0;
    HTTPParserState state = BeginLine;
    std::ostringstream oss;
    while (state != Done)
    {
        if (scan >= bytes)
        {
            bytes = ReadBytes();
            if (bytes == 0) throw DBException("Malformed HTTP response", __LINE__);
            scan = 0;
        }
        const char ch = _read_buf[scan++];
        switch (state)
        {
        case BeginLine:
            if (ch == '\r')
            {
                state = EmptyCR; // CR at line start: candidate for the blank line ending the headers
            }
            else
            {
                oss << ch;
                state = WaitCR;
            }
            break;
        case WaitCR:
            if (ch == '\r')
                state = CR;
            else if (ch >= ' ')
                oss << ch;
            break;
        case CR:
            if (ch == '\n')
            {
                const std::string line = oss.str();
                if (!line.empty()) headers.push_back(line);
                oss.clear();
                oss.str("");
                state = BeginLine;
            }
            else
            {
                state = WaitCR;
            }
            break;
        case EmptyCR:
            state = (ch == '\n') ? Done : WaitCR;
            break;
        default:
            break;
        }
    }
    if (headers.size() == 0) throw DBException("Malformed HTTP response headers.", __LINE__);

    std::istringstream statusline(headers[0]);
    std::string httpstr;
    int httpcode = -1;
    statusline >> httpstr;
    const bool versionok = (httpstr == "HTTP/1.0" || httpstr == "HTTP/1.1");
    if (!versionok || !(statusline >> httpcode) || httpcode < 0)
    {
        std::ostringstream ess;
        ess << "Malformed HTTP response code \"" << headers[0] << "\".";
        throw DBException(ess.str(), __LINE__);
    }

    for (auto h = headers.begin(); h != headers.end(); ++h)
    {
        std::string lowered = *h;
        for (std::string::size_type ih = 0; ih < lowered.length(); ++ih)
            lowered[ih] = static_cast<char>(std::tolower(static_cast<unsigned char>(lowered[ih])));
        std::istringstream hss(lowered);
        std::string token;
        hss >> token;
        if (token == "transfer-encoding:")
        {
            hss >> token;
            if (token == "chunked")
            {
                _chunked = true;
                _thischunksize = 0;
                break;
            }
        }
    }
    return httpcode;
}

/// Parses the response headers and insists on HTTP status 200.
/// @return offset in _read_buf of the first body byte.
/// @throws DBException for any other status code.
int _Client::_ProcessHTTPHeaders(std::vector<std::string> &headers)
{
    _chunked = false;
    int scan = 0;
    const int httpcode = _ProcessHTTPHeaders(headers, scan);
    if (httpcode != 200)
    {
        std::ostringstream mss;
        mss << "Connection wrong or refused with HTTP code " << httpcode << " header \"" << headers[0] << "\".";
        throw DBException(mss.str(), __LINE__);
    }
    return scan;
}

/// Completes a detx request with optional run / t0set / calibrid parameters, sends it
/// and returns the response body. `oss` must already hold "GET <path>".
/// The connection is closed before returning, also when an exception is thrown.
std::shared_ptr<std::stringstream> _Client::_DetX(std::ostringstream &oss, int run, const char *t0set, const char *calibset)
{
    bool queryfirst = true;
    if (run) oss << NextSeparator(queryfirst) << "run=" << run;
    if (t0set) oss << NextSeparator(queryfirst) << "t0set=" << t0set;
    if (calibset) oss << NextSeparator(queryfirst) << "calibrid=" << calibset;
    AppendAuthAndTerminate(oss, queryfirst, _ConnCookie, _Usr, _Pwd);

    auto closer = MakeScopeExit([this] { _SSLDisconnect(); });
    _SSLConnect();
    SendRequest(_ssl, oss.str());
    std::vector<std::string> headers;
    const int bodystart = _ProcessHTTPHeaders(headers);
    return CollectBody(_ssl, _read_buf, static_cast<int>(sizeof(_read_buf)), bodystart);
}

/// Same as the run / t0set / calibset overload with neither set given.
std::shared_ptr<std::stringstream> _Client::_DetX(std::ostringstream &oss, int run)
{
    return _DetX(oss, run, static_cast<const char *>(nullptr), static_cast<const char *>(nullptr));
}

/// Completes a detx request with optional run and version ("v") parameters, sends it
/// and returns the response body. The connection is closed before returning.
std::shared_ptr<std::stringstream> _Client::_DetX(std::ostringstream &oss, int run, int version)
{
    bool queryfirst = true;
    if (run) oss << NextSeparator(queryfirst) << "run=" << run;
    // The separator is already supplied; the key previously carried an extra '&'.
    if (version) oss << NextSeparator(queryfirst) << "v=" << version;
    AppendAuthAndTerminate(oss, queryfirst, _ConnCookie, _Usr, _Pwd);

    auto closer = MakeScopeExit([this] { _SSLDisconnect(); });
    _SSLConnect();
    SendRequest(_ssl, oss.str());
    std::vector<std::string> headers;
    const int bodystart = _ProcessHTTPHeaders(headers);
    return CollectBody(_ssl, _read_buf, static_cast<int>(sizeof(_read_buf)), bodystart);
}

/// Completes a detx request selected by tcal / pcal / rcal (a null pointer is sent
/// as "0"), sends it and returns the response body. The connection is closed before
/// returning.
std::shared_ptr<std::stringstream> _Client::_DetX(std::ostringstream &oss, const char *tcal, const char *pcal, const char *rcal)
{
    oss << "?"
        << "tcal=" << ((tcal != nullptr) ? tcal : "0") << "&"
        << "pcal=" << ((pcal != nullptr) ? pcal : "0") << "&"
        << "rcal=" << ((rcal != nullptr) ? rcal : "0");
    AppendAuthAndTerminate(oss, false, _ConnCookie, _Usr, _Pwd);

    auto closer = MakeScopeExit([this] { _SSLDisconnect(); });
    _SSLConnect();
    SendRequest(_ssl, oss.str());
    std::vector<std::string> headers;
    const int bodystart = _ProcessHTTPHeaders(headers);
    return CollectBody(_ssl, _read_buf, static_cast<int>(sizeof(_read_buf)), bodystart);
}

/// Completes a detx request selected by tcal / pcal / rcal / acal / ccal / scal (a null
/// pointer is sent as "0") plus the version, sends it and returns the response body.
/// The connection is closed before returning.
std::shared_ptr<std::stringstream> _Client::_DetX(std::ostringstream &oss, const char *tcal, const char *pcal, const char *rcal, const char *acal, const char *ccal, const char *scal, int version)
{
    oss << "?"
        << "tcal=" << ((tcal != nullptr) ? tcal : "0") << "&"
        << "pcal=" << ((pcal != nullptr) ? pcal : "0") << "&"
        << "rcal=" << ((rcal != nullptr) ? rcal : "0") << "&"
        << "acal=" << ((acal != nullptr) ? acal : "0") << "&"
        << "ccal=" << ((ccal != nullptr) ? ccal : "0") << "&"
        << "scal=" << ((scal != nullptr) ? scal : "0") << "&"
        << "v=" << version;
    AppendAuthAndTerminate(oss, false, _ConnCookie, _Usr, _Pwd);

    auto closer = MakeScopeExit([this] { _SSLDisconnect(); });
    _SSLConnect();
    SendRequest(_ssl, oss.str());
    std::vector<std::string> headers;
    const int bodystart = _ProcessHTTPHeaders(headers);
    return CollectBody(_ssl, _read_buf, static_cast<int>(sizeof(_read_buf)), bodystart);
}

// ---------------------------------------------------------------------------------
// Public DetX overloads. Each one refuses to run while a result set is open, starts
// the request with "GET /detx/<detector>" and forwards to the matching _DetX overload.
// ---------------------------------------------------------------------------------

/// detx by numeric detector id with optional run, t0set and calibset.
std::shared_ptr<std::stringstream> _Client::DetX(int detector, int run, const char *t0set, const char *calibset)
{
    if (pResultSet) throw DBException(kResultSetOpenMsg, __LINE__);
    std::ostringstream oss;
    oss << "GET /detx/" << detector;
    return _DetX(oss, run, t0set, calibset);
}

/// detx by detector name with optional run, t0set and calibset.
std::shared_ptr<std::stringstream> _Client::DetX(const char *detector, int run, const char *t0set, const char *calibset)
{
    if (pResultSet) throw DBException(kResultSetOpenMsg, __LINE__);
    std::ostringstream oss;
    oss << "GET /detx/" << detector;
    return _DetX(oss, run, t0set, calibset);
}

/// detx by numeric detector id with optional run.
std::shared_ptr<std::stringstream> _Client::DetX(int detector, int run)
{
    if (pResultSet) throw DBException(kResultSetOpenMsg, __LINE__);
    std::ostringstream oss;
    oss << "GET /detx/" << detector;
    return _DetX(oss, run);
}

/// detx by detector name with optional run.
std::shared_ptr<std::stringstream> _Client::DetX(const char *detector, int run)
{
    if (pResultSet) throw DBException(kResultSetOpenMsg, __LINE__);
    std::ostringstream oss;
    oss << "GET /detx/" << detector;
    return _DetX(oss, run);
}

/// detx by numeric detector id with optional run and format version.
std::shared_ptr<std::stringstream> _Client::DetX(int detector, int run, int version)
{
    if (pResultSet) throw DBException(kResultSetOpenMsg, __LINE__);
    std::ostringstream oss;
    oss << "GET /detx/" << detector;
    return _DetX(oss, run, version);
}

/// detx by detector name with optional run and format version.
std::shared_ptr<std::stringstream> _Client::DetX(const char *detector, int run, int version)
{
    if (pResultSet) throw DBException(kResultSetOpenMsg, __LINE__);
    std::ostringstream oss;
    oss << "GET /detx/" << detector;
    return _DetX(oss, run, version);
}

/// detx by numeric detector id selected by tcal / pcal / rcal.
std::shared_ptr<std::stringstream> _Client::DetX(int detector, const char *tcal, const char *pcal, const char *rcal)
{
    if (pResultSet) throw DBException(kResultSetOpenMsg, __LINE__);
    std::ostringstream oss;
    oss << "GET /detx/" << detector;
    return _DetX(oss, tcal, pcal, rcal);
}

/// detx by detector name selected by tcal / pcal / rcal.
std::shared_ptr<std::stringstream> _Client::DetX(const char *detector, const char *tcal, const char *pcal, const char *rcal)
{
    if (pResultSet) throw DBException(kResultSetOpenMsg, __LINE__);
    std::ostringstream oss;
    oss << "GET /detx/" << detector;
    return _DetX(oss, tcal, pcal, rcal);
}

/// detx by numeric detector id selected by the six calibration ids and a version.
std::shared_ptr<std::stringstream> _Client::DetX(int detector, const char *tcal, const char *pcal, const char *rcal, const char *acal, const char *ccal, const char *scal, int version)
{
    if (pResultSet) throw DBException(kResultSetOpenMsg, __LINE__);
    std::ostringstream oss;
    oss << "GET /detx/" << detector;
    return _DetX(oss, tcal, pcal, rcal, acal, ccal, scal, version);
}

/// detx by detector name selected by the six calibration ids and a version.
std::shared_ptr<std::stringstream> _Client::DetX(const char *detector, const char *tcal, const char *pcal, const char *rcal, const char *acal, const char *ccal, const char *scal, int version)
{
    if (pResultSet) throw DBException(kResultSetOpenMsg, __LINE__);
    std::ostringstream oss;
    oss << "GET /detx/" << detector;
    return _DetX(oss, tcal, pcal, rcal, acal, ccal, scal, version);
}

/// Requests "/streamds/" without a stream name or selectors.
ResultSet &_Client::StreamDS()
{
    std::vector<Selector> empvec;
    return StreamDS(nullptr, empvec);
}

/// Requests "/streamds/<streamname>.txt" filtered by `selectors` and leaves the
/// connection open for the returned result set to read from. With a null `streamname`
/// the selectors are ignored.
/// @throws DBException when a result set is already open or the request fails; in the
///         latter case the connection is closed again.
ResultSet &_Client::StreamDS(const char *streamname, const std::vector<Selector> &selectors)
{
    if (pResultSet) throw DBException(kResultSetOpenMsg, __LINE__);
    std::ostringstream oss;
    oss << "GET /streamds/";
    bool isfirst = true;
    if (streamname)
    {
        oss << streamname << ".txt";
        isfirst = AppendSelectors(oss, selectors);
    }
    AppendAuthAndTerminate(oss, isfirst, _ConnCookie, _Usr, _Pwd);

    try
    {
        _SSLConnect();
        SendRequest(_ssl, oss.str());
        std::vector<std::string> headers;
        _consumedbytes = _ProcessHTTPHeaders(headers);
        pResultSet.reset(new _StreamDSResultSet(this));
    }
    catch (...)
    {
        _SSLDisconnect();
        throw;
    }
    return *pResultSet;
}

/// Closes the connection used by the open result set and drops the result set.
/// @throws DBException when no result set is open.
void _Client::_ResultSetClose()
{
    if (!pResultSet) throw DBException("No resultset is open.", __LINE__);
    _SSLDisconnect();
    pResultSet.reset();
}

/// Requests "/apiv<apiversion>/<entrypoint>/<mode>" filtered by `selectors` and returns
/// the response body. The connection is closed before returning, also on failure.
/// @throws DBException when a result set is open or the request fails.
std::shared_ptr<std::stringstream> _Client::APIv2Access(const char mode, const char *apiversion, const char *entrypoint, const std::vector<Selector> &selectors)
{
    if (pResultSet) throw DBException(kResultSetOpenMsg, __LINE__);
    std::ostringstream oss;
    oss << "GET /apiv" << apiversion << "/" << entrypoint << "/" << mode;
    const bool isfirst = AppendSelectors(oss, selectors);
    AppendAuthAndTerminate(oss, isfirst, _ConnCookie, _Usr, _Pwd);

    auto closer = MakeScopeExit([this] { _SSLDisconnect(); });
    _SSLConnect();
    SendRequest(_ssl, oss.str());
    std::vector<std::string> headers;
    const int bodystart = _ProcessHTTPHeaders(headers);
    return CollectBody(_ssl, _read_buf, static_cast<int>(sizeof(_read_buf)), bodystart);
}

/// Calls APIv2Access with mode 'h' and no selectors.
std::shared_ptr<std::stringstream> _Client::APIv2Help(const char *apiversion, const char *entrypoint)
{
    if (pResultSet) throw DBException(kResultSetOpenMsg, __LINE__);
    return APIv2Access('h', apiversion, entrypoint, std::vector<Selector>());
}

/// Calls APIv2Access with mode 's' and the given selectors.
std::shared_ptr<std::stringstream> _Client::APIv2Select(const char *apiversion, const char *entrypoint, const std::vector<Selector> &selectors)
{
    if (pResultSet) throw DBException(kResultSetOpenMsg, __LINE__);
    return APIv2Access('s', apiversion, entrypoint, selectors);
}