/******************************************************************************/
/* */
/* X r d C l i e n t C o n n . c c */
/* */
/* Author: Fabrizio Furano (INFN Padova, 2004) */
/* Adapted from TXNetFile (root.cern.ch) originally done by */
/* Alvise Dorigo, Fabrizio Furano */
/* INFN Padova, 2003 */
/* */
/* This file is part of the XRootD software suite. */
/* */
/* XRootD is free software: you can redistribute it and/or modify it under */
/* the terms of the GNU Lesser General Public License as published by the */
/* Free Software Foundation, either version 3 of the License, or (at your */
/* option) any later version. */
/* */
/* XRootD is distributed in the hope that it will be useful, but WITHOUT */
/* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or */
/* FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public */
/* License for more details. */
/* */
/* You should have received a copy of the GNU Lesser General Public License */
/* along with XRootD in a file called COPYING.LESSER (LGPL license) and file */
/* COPYING (GPL license). If not, see . */
/* */
/* The copyright holder's institutional names and contributor's names may not */
/* be used to endorse or promote products derived from this software without */
/* specific prior written permission of the institution or contributor. */
/******************************************************************************/
//////////////////////////////////////////////////////////////////////////
// //
// High level handler of connections to xrootd. //
// //
//////////////////////////////////////////////////////////////////////////
#include "XrdClient/XrdClientDebug.hh"
#include "XrdClient/XrdClientConnMgr.hh"
#include "XrdClient/XrdClientConn.hh"
#include "XrdClient/XrdClientLogConnection.hh"
#include "XrdClient/XrdClientPhyConnection.hh"
#include "XrdClient/XrdClientProtocol.hh"
#include "XrdNet/XrdNetAddr.hh"
#include "XrdOuc/XrdOucEnv.hh"
#include "XrdOuc/XrdOucErrInfo.hh"
#include "XrdOuc/XrdOucUtils.hh"
#include "XrdSec/XrdSecInterface.hh"
#include "XrdSec/XrdSecLoadSecurity.hh"
#include "XrdSys/XrdSysDNS.hh"
#include "XrdClient/XrdClientUrlInfo.hh"
#include "XrdClient/XrdClientEnv.hh"
#include "XrdClient/XrdClientAbs.hh"
#include "XrdClient/XrdClientSid.hh"
#include "XrdSys/XrdSysPriv.hh"
#include "XrdSys/XrdSysPlatform.hh"
#include "XrdSec/XrdSecLoadSecurity.hh"
// Dynamic libs
// Bypass Solaris ELF madness
//
#if defined(__solaris__)
#include
#if defined(_ILP32) && (_FILE_OFFSET_BITS != 32)
#undef _FILE_OFFSET_BITS
#define _FILE_OFFSET_BITS 32
#undef _LARGEFILE_SOURCE
#endif
#endif
#include // needed by printf
#include // needed by getenv()
#ifndef WIN32
#include // needed by getpid()
#include // needed by getpid() and getuid()
#else
#include
#include "XrdSys/XrdWin32.hh"
#endif
#include // needed by memcpy() and strcspn()
#include // needed by getservbyname()
#include
#define SafeDelete(x) { if (x) { delete x; x = 0; } }
XrdSysMutex XrdClientConn::fSessionIDRMutex;
XrdOucHash XrdClientConn::fSessionIDRepo;
// Instance of the Connection Manager
XrdClientConnectionMgr *XrdClientConn::fgConnectionMgr = 0;
XrdOucString XrdClientConn::fgClientHostDomain;
//_____________________________________________________________________________
void ParseRedirHost(XrdOucString &host, XrdOucString &opaque, XrdOucString &token)
{
// Small utility function... we want to parse a hostname which
// can contain opaque or token info
int pos;
token = "";
opaque = "";
if ( (pos = host.find('?')) != STR_NPOS ) {
opaque.assign(host,pos+1);
host.erasefromend(host.length()-pos);
if ( (pos = opaque.find('?')) != STR_NPOS ) {
token.assign(host,pos+1);
opaque.erasefromend(opaque.length()-pos);
}
}
}
//_____________________________________________________________________________
void ParseRedir(XrdClientMessage* xmsg, int &port, XrdOucString &host, XrdOucString &opaque, XrdOucString &token)
{
// Small utility function... we want to parse the content
// of a redir response from the server.
// Remember... an instance of XrdClientMessage automatically 0-terminates the
// data if present
struct ServerResponseBody_Redirect* redirdata =
(struct ServerResponseBody_Redirect*)xmsg->GetData();
port = 0;
if (redirdata) {
XrdOucString h(redirdata->host);
ParseRedirHost(h, opaque, token);
host = h;
port = ntohl(redirdata->port);
}
}
//_____________________________________________________________________________
XrdClientConn::XrdClientConn(): fOpenError((XErrorCode)0), fUrl(""),
fLBSUrl(0),
fConnected(false),
fGettingAccessToSrv(false),
fMainReadCache(0),
fREQWaitRespData(0),
fREQWaitTimeLimit(0),
fREQConnectWaitTimeLimit(0),
fMetaUrl( 0 ),
fLBSIsMeta( false )
{
// Constructor
ClearLastServerError();
memset(&LastServerResp, 0, sizeof(LastServerResp));
LastServerResp.status = kXR_noResponsesYet;
fREQUrl.Clear();
fREQWait = new XrdSysCondVar(0);
fREQConnectWait = new XrdSysCondVar(0);
fREQWaitResp = new XrdSysCondVar(0);
fWriteWaitAck = new XrdSysCondVar(0);
fRedirHandler = 0;
fUnsolMsgHandler = 0;
// Init the redirection counter parameters
fGlobalRedirLastUpdateTimestamp = time(0);
fGlobalRedirCnt = 0;
fMaxGlobalRedirCnt = EnvGetLong(NAME_MAXREDIRECTCOUNT);
fOpenSockFD = -1;
// Init connection manager (only once)
if (!fgConnectionMgr) {
if (!(fgConnectionMgr = new XrdClientConnectionMgr())) {
Error("XrdClientConn::XrdClientConn", "initializing connection manager");
}
char buf[255];
gethostname(buf, sizeof(buf));
fgClientHostDomain = GetDomainToMatch(buf);
if (fgClientHostDomain == "")
Error("XrdClientConn",
"Error resolving this host's domain name." );
XrdOucString goodDomainsRE = fgClientHostDomain;
goodDomainsRE += "|*";
if (EnvGetString(NAME_REDIRDOMAINALLOW_RE) == 0)
EnvPutString(NAME_REDIRDOMAINALLOW_RE, goodDomainsRE.c_str());
if (EnvGetString(NAME_REDIRDOMAINDENY_RE) == 0)
EnvPutString(NAME_REDIRDOMAINDENY_RE, "");
if (EnvGetString(NAME_CONNECTDOMAINALLOW_RE) == 0)
EnvPutString(NAME_CONNECTDOMAINALLOW_RE, goodDomainsRE.c_str());
if (EnvGetString(NAME_CONNECTDOMAINDENY_RE) == 0)
EnvPutString(NAME_CONNECTDOMAINDENY_RE, "");
}
// Server type unknown at initialization
fServerType = kSTNone;
}
//_____________________________________________________________________________
XrdClientConn::~XrdClientConn()
{
// Disconnect underlying logical connection
Disconnect(FALSE);
// Destructor
if (fMainReadCache && (DebugLevel() >= XrdClientDebug::kUSERDEBUG))
fMainReadCache->PrintPerfCounters();
if (fLBSUrl) delete fLBSUrl;
if (fMainReadCache) delete fMainReadCache;
fMainReadCache = 0;
delete fREQWait;
fREQWait = 0;
delete fREQConnectWait;
fREQConnectWait = 0;
delete fREQWaitResp;
fREQWaitResp = 0;
delete fWriteWaitAck;
fWriteWaitAck = 0;
}
//_____________________________________________________________________________
short XrdClientConn::Connect(XrdClientUrlInfo Host2Conn,
XrdClientAbsUnsolMsgHandler *unsolhandler)
{
// Connect method (called the first time when XrdClient is first created,
// and used for each redirection). The global static connection manager
// object is firstly created here. If another XrdClient object is created
// inside the same application this connection manager will be used and
// no new one will be created.
// No login/authentication are performed at this stage.
// We try to connect to the host. What we get is the logical conn id
short logid;
logid = -1;
fPrimaryStreamid = 0;
fLogConnID = 0;
CheckREQConnectWaitState();
Info(XrdClientDebug::kHIDEBUG,
"XrdClientConn", "Trying to connect to " <<
Host2Conn.HostAddr << ":" << Host2Conn.Port);
logid = ConnectionManager->Connect(Host2Conn);
Info(XrdClientDebug::kHIDEBUG,
"Connect", "Connect(" << Host2Conn.Host << ", " <<
Host2Conn.Port << ") returned " <<
logid );
if (logid < 0) {
Error("XrdNetFile",
"Error creating logical connection to " <<
Host2Conn.Host << ":" << Host2Conn.Port );
fLogConnID = logid;
fConnected = FALSE;
return -1;
}
fConnected = TRUE;
fLogConnID = logid;
fPrimaryStreamid = ConnectionManager->GetConnection(fLogConnID)->Streamid();
ConnectionManager->GetConnection(fLogConnID)->UnsolicitedMsgHandler = unsolhandler;
fUnsolMsgHandler = unsolhandler;
return logid;
}
//_____________________________________________________________________________
void XrdClientConn::Disconnect(bool ForcePhysicalDisc)
{
// Disconnect... is it so difficult? Yes!
if( ConnectionManager->SidManager() )
ConnectionManager->SidManager()->GetAllOutstandingWriteRequests(fPrimaryStreamid, fWriteReqsToRetry);
if (fMainReadCache && (DebugLevel() >= XrdClientDebug::kDUMPDEBUG) ) fMainReadCache->PrintCache();
if (fConnected)
ConnectionManager->Disconnect(fLogConnID, ForcePhysicalDisc);
fConnected = FALSE;
}
//_____________________________________________________________________________
XrdClientMessage *XrdClientConn::ClientServerCmd(ClientRequest *req, const void *reqMoreData,
void **answMoreDataAllocated,
void *answMoreData, bool HasToAlloc,
int substreamid)
{
// ClientServerCmd tries to send a command to the server and to get a response.
// Here the kXR_redirect is handled, as well as other things.
//
// If the calling function requests the memory allocation (HasToAlloc is true)
// then:
// o answMoreDataAllocated is filled with a pointer to the new block.
// o The caller MUST free it when it's no longer used if
// answMoreDataAllocated is 0
// then the caller is not interested in getting the data.
// o We must anyway read it from the stream and throw it away.
//
// If the calling function does NOT request the memory allocation
// (HasToAlloc is false) then:
// o answMoreData is filled with the data read
// o the caller MUST be sure that the arriving data will fit into the
// o passed memory block
//
// We need to do this here because the calling func *may* not know the size
// to allocate for the request to be submitted. For instance, for the kXR_read
// cmd the size is known, while for the kXR_getfile cmd is not.
bool addOpaque = false;
size_t TotalBlkSize = 0;
void *tmpMoreData;
XReqErrorType errorType = kOK;
XrdClientMessage *xmsg = 0;
// Check for a redrive of an open from a previous redirect
//
if (req->header.requestid == kXR_open && fRedirOpaque.length())
addOpaque = true;
// In the case of an abort due to errors, better to return
// a blank struct. Also checks the validity of the pointer.
// memset(answhdr, 0, sizeof(answhdr));
// Cycle for redirections...
do {
// Send to the server the request
// We have to unconditionally set the streamid inside the
// header, because, in case of 'rebouncing here', the Logical Connection
// ID might have changed, while in the header to write it remained the
// same as before, not valid anymore
SetSID(req->header.streamid);
// Check for redirections of a filename
//
if (addOpaque)
{string new_fname;
int oldlen = req->header.dlen;
new_fname.assign((const char *)reqMoreData);
if (new_fname.find('?') == string::npos) new_fname += "?";
else new_fname += "&";
new_fname += string(fRedirOpaque.c_str());
req->open.dlen = new_fname.length();
fRedirOpaque.erase(); addOpaque = false;
errorType = WriteToServer(req, new_fname.c_str(), fLogConnID, substreamid);
req->header.dlen = oldlen;
} else {
errorType = WriteToServer(req, reqMoreData, fLogConnID, substreamid);
}
// Read from server the answer
// Note that the answer can be composed by many reads, in the case that
// the status field of the responses is kXR_oksofar
TotalBlkSize = 0;
// A temp pointer to the mem block growing across the multiple kXR_oksofar
tmpMoreData = 0;
if ((answMoreData != 0) && !HasToAlloc)
tmpMoreData = answMoreData;
// Cycle for the kXR_oksofar i.e. partial answers to be collected
do {
XrdClientConn::EThreeStateReadHandler whatToDo;
delete xmsg;
xmsg = ReadPartialAnswer(errorType, TotalBlkSize, req, HasToAlloc,
&tmpMoreData, whatToDo);
// If the cmd went ok and was a read request, we use it to populate
// the cache
if (xmsg && fMainReadCache && (req->header.requestid == kXR_read) &&
((xmsg->HeaderStatus() == kXR_oksofar) ||
(xmsg->HeaderStatus() == kXR_ok)))
// To compute the end offset of the block we have to take 1 from the size!
fMainReadCache->SubmitXMessage(xmsg, req->read.offset + TotalBlkSize - xmsg->fHdr.dlen,
req->read.offset + TotalBlkSize - 1);
if (whatToDo == kTSRHReturnNullMex) {
delete xmsg;
return 0;
}
if (whatToDo == kTSRHReturnMex)
return xmsg;
if (xmsg && (xmsg->HeaderStatus() == kXR_oksofar) &&
(xmsg->DataLen() == 0))
return xmsg;
} while (xmsg && (xmsg->HeaderStatus() == kXR_oksofar));
if (xmsg && (xmsg->HeaderStatus() == kXR_redirect) && fRedirOpaque.length()
&& ( req->header.requestid == kXR_open
|| (req->header.requestid == kXR_stat && req->header.dlen)
|| req->header.requestid == kXR_dirlist
|| req->header.requestid == kXR_locate
|| req->header.requestid == kXR_mkdir
|| req->header.requestid == kXR_rm
|| req->header.requestid == kXR_rmdir
|| (req->header.requestid == kXR_truncate && req->header.dlen)
|| req->header.requestid == kXR_mv
|| req->header.requestid == kXR_chmod))
addOpaque = true;
} while ((fGlobalRedirCnt < fMaxGlobalRedirCnt) &&
!IsOpTimeLimitElapsed(time(0)) &&
xmsg && (xmsg->HeaderStatus() == kXR_redirect));
// We collected all the partial responses into a single memory block.
// If the block has been allocated here then we must pass its address
if (HasToAlloc && (answMoreDataAllocated)) {
*answMoreDataAllocated = tmpMoreData;
}
// We might have collected multiple partial response also in a given mem block
if (xmsg && (xmsg->HeaderStatus() == kXR_ok) && TotalBlkSize)
xmsg->fHdr.dlen = TotalBlkSize;
return xmsg;
}
//_____________________________________________________________________________
bool XrdClientConn::SendGenCommand(ClientRequest *req, const void *reqMoreData,
void **answMoreDataAllocated,
void *answMoreData, bool HasToAlloc,
char *CmdName,
int substreamid) {
// SendGenCommand tries to send a single command for a number of times
short retry = 0;
bool resp = FALSE, abortcmd = FALSE;
// if we're going to open a file for the 2nd time we should reset fOpenError,
// just in case...
if (req->header.requestid == kXR_open)
fOpenError = (XErrorCode)0;
while (!abortcmd && !resp) {
abortcmd = FALSE;
// This client might have been paused
CheckREQPauseState();
// Send the cmd, dealing automatically with redirections and
// redirections on error
Info(XrdClientDebug::kHIDEBUG,
"SendGenCommand","Sending command " << CmdName);
// Note: some older server versions expose a bug associated to kXR_retstat
if ( (req->header.requestid == kXR_open) &&
(GetServerProtocol() < 0x00000270) ) {
if (req->open.options & kXR_retstat)
req->open.options ^= kXR_retstat;
Info(XrdClientDebug::kHIDEBUG, "SendGenCommand",
"Old server proto version(" << GetServerProtocol() <<
". kXR_retstat is now disabled. Current open options: " << req->open.options);
}
XrdClientMessage *cmdrespMex = ClientServerCmd(req, reqMoreData,
answMoreDataAllocated,
answMoreData, HasToAlloc,
substreamid);
// Save server response header if requested
if (cmdrespMex)
memcpy(&LastServerResp, &cmdrespMex->fHdr,sizeof(struct ServerResponseHeader));
// Check for the max time allowed for this request
if (IsOpTimeLimitElapsed(time(0))) {
Error("SendGenCommand",
"Max time limit elapsed for request " <<
convertRequestIdToChar(req->header.requestid) <<
". Aborting command.");
abortcmd = TRUE;
} else
// Check for the redir count limit
if (fGlobalRedirCnt >= fMaxGlobalRedirCnt) {
Error("SendGenCommand",
"Too many redirections for request " <<
convertRequestIdToChar(req->header.requestid) <<
". Aborting command.");
abortcmd = TRUE;
}
else {
// On serious communication error we retry for a number of times,
// waiting for the server to come back
if (!cmdrespMex || cmdrespMex->IsError()) {
Info(XrdClientDebug::kHIDEBUG,
"SendGenCommand", "Got (and maybe recovered) an error from " <<
fUrl.Host << ":" << fUrl.Port);
// For the kxr_open request we don't rely on the count limit of other
// reqs. The open request is bounded only by the redir count limit
if (req->header.requestid != kXR_open)
retry++;
if (retry > kXR_maxReqRetry) {
Error("SendGenCommand",
"Too many errors communication errors with server"
". Aborting command.");
abortcmd = TRUE;
}
else
if (req->header.requestid == kXR_bind) {
Info(XrdClientDebug::kHIDEBUG,
"SendGenCommand", "Parallel stream bind failure. Aborting request." <<
fUrl.Host << ":" << fUrl.Port);
abortcmd = TRUE;
}
else {
// Here we are connected, but we could not have a filehandle for
// various reasons. The server may have denied it to us.
// So, if we were requesting things that needed a filehandle,
// and the file seems not open, then abort the request
if ( (LastServerResp.status != kXR_ok) &&
( (req->header.requestid == kXR_read) ||
(req->header.requestid == kXR_write) ||
(req->header.requestid == kXR_sync) ||
(req->header.requestid == kXR_close) ) ) {
Info(XrdClientDebug::kHIDEBUG,
"SendGenCommand", "Recovery failure detected. Aborting request." <<
fUrl.Host << ":" << fUrl.Port);
abortcmd = TRUE;
}
else
abortcmd = FALSE;
}
} else {
// We are here if we got an answer for the command, so
// the server (original or redirected) is alive
resp = CheckResp(&cmdrespMex->fHdr, CmdName);
retry++;
// If the answer was not (or not totally) positive, we must
// investigate on the result
if (!resp) {
// this could be a delayed response. A Strange hybrid. Not a quark.
if (cmdrespMex->fHdr.status == kXR_waitresp) {
// Let's sleep!
kXR_int32 *maxwait = (kXR_int32 *)cmdrespMex->GetData();
kXR_int32 mw;
if (maxwait)
mw = ntohl(*maxwait);
else mw = 30;
if (!WaitResp(mw)) {
// we did not time out, so the response is here
memcpy(&LastServerResp, &fREQWaitRespData->resphdr,
sizeof(struct ServerResponseHeader));
// Let's fake a regular answer
// Note: kXR_wait can be a fake response used to make the client retry!
if (fREQWaitRespData->resphdr.status == kXR_wait) {
cmdrespMex->fHdr.status = kXR_wait;
if (fREQWaitRespData->resphdr.dlen)
memcpy(cmdrespMex->GetData(), fREQWaitRespData->respdata, sizeof(kXR_int32));
else memset(cmdrespMex->GetData(), 0, sizeof(kXR_int32));
CheckErrorStatus(cmdrespMex, retry, CmdName);
// This causes a retry
resp = false;
}
else {
if (HasToAlloc) {
*answMoreDataAllocated = malloc(LastServerResp.dlen);
memcpy(*answMoreDataAllocated,
&fREQWaitRespData->respdata,
LastServerResp.dlen);
}
else {
if (answMoreData) memcpy(answMoreData,
&fREQWaitRespData->respdata,
LastServerResp.dlen);
}
// This makes the request exit with the new answer
resp = true;
}
free( fREQWaitRespData);
fREQWaitRespData = 0;
// abortcmd = false; Don't set abort categorically false, check for error!
abortcmd = (LastServerResp.status == kXR_error);
}
}
else {
abortcmd = CheckErrorStatus(cmdrespMex, retry, CmdName);
// An open request which fails for an application reason like kxr_wait
// must have its kXR_Refresh bit cleared.
if (req->header.requestid == kXR_open)
req->open.options &= ((kXR_unt16)~kXR_refresh);
}
}
if (retry > kXR_maxReqRetry) {
Error("SendGenCommand",
"Too many errors messages from server."
" Aborting command.");
abortcmd = TRUE;
}
} // else... the case of a correct server response but declaring an error
}
delete cmdrespMex;
} // while
return (!abortcmd);
}
//_____________________________________________________________________________
bool XrdClientConn::CheckHostDomain(XrdOucString hostToCheck)
{
// Checks domain matching
static XrdOucHash knownHosts;
static XrdOucString alloweddomains = EnvGetString(NAME_REDIRDOMAINALLOW_RE);
static XrdOucString denieddomains = EnvGetString(NAME_REDIRDOMAINDENY_RE);
static XrdSysMutex knownHostsMutex;
XrdSysMutexHelper scopedLock(knownHostsMutex);
// Check cached info
int *he = knownHosts.Find(hostToCheck.c_str());
if (he)
return (*he == 1) ? TRUE : FALSE;
// Get the domain for the url to check
XrdOucString domain = GetDomainToMatch(hostToCheck);
// If we are unable to get the domain for the url to check --> access denied to it
if (domain.length() <= 0) {
Error("CheckHostDomain", "Error resolving domain name for " <<
hostToCheck << ". Denying access.");
return FALSE;
}
Info(XrdClientDebug::kHIDEBUG, "CheckHostDomain", "Resolved [" << hostToCheck <<
"]'s domain name into [" << domain << "]" );
// Given a list of |-separated regexps for the hosts to DENY,
// match every entry with domain. If any match is found, deny access.
if (DomainMatcher(domain, denieddomains) ) {
knownHosts.Add(hostToCheck.c_str(), new int(0));
Error("CheckHostDomain", "Access denied to the domain of [" << hostToCheck << "].");
return FALSE;
}
// Given a list of |-separated regexps for the hosts to ALLOW,
// match every entry with domain. If any match is found, grant access.
if (DomainMatcher(domain, alloweddomains) ) {
knownHosts.Add(hostToCheck.c_str(), new int(1));
Info(XrdClientDebug::kHIDEBUG, "CheckHostDomain",
"Access granted to the domain of [" << hostToCheck << "].");
return TRUE;
}
Error("CheckHostDomain", "Access to domain " << domain <<
" is not allowed nor denied: deny.");
return FALSE;
}
//___________________________________________________________________________
bool XrdClientConn::DomainMatcher(XrdOucString dom, XrdOucString domlist)
{
// Check matching of domain 'dom' in list 'domlist'.
// Items in list are separated by '|' and can contain the wild
// cards '*', e.g.
//
// domlist.c_str() = "cern.ch|*.stanford.edu|slac.*.edu"
//
// The domain to match is a FQDN host or domain name, e.g.
//
// dom.c_str() = "flora02.slac.stanford.edu"
//
Info(XrdClientDebug::kHIDEBUG, "DomainMatcher",
"search for '"< 0) {
XrdOucString domain;
int nm = 0, from = 0;
while ((from = domlist.tokenize(domain, from, '|')) != STR_NPOS) {
Info(XrdClientDebug::kDUMPDEBUG, "DomainMatcher",
"checking domain: "< 0) {
Info(XrdClientDebug::kHIDEBUG, "DomainMatcher",
"domain: "<status == kXR_redirect) {
// too many redirections. Exit!
Error(method, "Error in handling a redirection.");
return FALSE;
}
if ((resp->status != kXR_ok) && (resp->status != kXR_authmore))
// Error message is notified in CheckErrorStatus
return FALSE;
return TRUE;
} else {
Error(method, "The return message doesn't belong to this client.");
return FALSE;
}
}
//_____________________________________________________________________________
bool XrdClientConn::MatchStreamid(struct ServerResponseHeader *ServerResponse)
{
// Check stream ID matching between the given response and
// the one contained in the current logical conn
return ( memcmp(ServerResponse->streamid,
&fPrimaryStreamid,
sizeof(ServerResponse->streamid)) == 0 );
}
//_____________________________________________________________________________
void XrdClientConn::SetSID(kXR_char *sid) {
// Set our stream id, to match against that one in the server's response.
memcpy((void *)sid, (const void*)&fPrimaryStreamid, 2);
}
//_____________________________________________________________________________
XReqErrorType XrdClientConn::WriteToServer(ClientRequest *req,
const void* reqMoreData,
short LogConnID,
int substreamid) {
// Send message to server
ClientRequest req_netfmt = *req;
XrdClientLogConnection *lgc = 0;
XrdClientPhyConnection *phyc = 0;
if (DebugLevel() >= XrdClientDebug::kDUMPDEBUG)
smartPrintClientHeader(req);
lgc = ConnectionManager->GetConnection(LogConnID);
if (!lgc) {
Error("WriteToServer",
"Unknown logical conn " << LogConnID);
return kWRITE;
}
phyc = lgc->GetPhyConnection();
if (!phyc) {
Error("WriteToServer",
"Cannot find physical conn for logid " << LogConnID);
return kWRITE;
}
clientMarshall(&req_netfmt);
// Strong mutual exclusion over the physical channel
{
XrdClientPhyConnLocker pcl(phyc);
// Now we write the request to the logical connection through the
// connection manager
short len = sizeof(req->header);
// A request header is always sent through the main stream, except for kxr_bind!
int writeres;
if ( req->header.requestid == kXR_bind )
writeres = ConnectionManager->WriteRaw(LogConnID, &req_netfmt, len, substreamid);
else
writeres = ConnectionManager->WriteRaw(LogConnID, &req_netfmt, len, 0);
fLastDataBytesSent = req->header.dlen;
// A complete communication failure has to be handled later, but we
// don't have to abort what we are doing
if (writeres < 0) {
Error("WriteToServer",
"Error sending " << len << " bytes in the header part"
" to server [" <<
fUrl.Host << ":" << fUrl.Port << "].");
return kWRITE;
}
// Send to the server the data.
// If we got an error we can safely skip this... no need to get more
if (req->header.dlen > 0) {
// Now we write the data associated to the request. Through the
// connection manager
// the data chunk can be sent through a parallel stream
writeres = ConnectionManager->WriteRaw(LogConnID, reqMoreData,
req->header.dlen, substreamid);
// A complete communication failure has to be handled later, but we
// don't have to abort what we are doing
if (writeres < 0) {
Error("WriteToServer",
"Error sending " << req->header.dlen << " bytes in the data part"
" to server [" <<
fUrl.Host << ":" << fUrl.Port << "].");
return kWRITE;
}
}
fLastDataBytesSent = req->header.dlen;
return kOK;
}
}
//_____________________________________________________________________________
bool XrdClientConn::CheckErrorStatus(XrdClientMessage *mex, short &Retry, char *CmdName)
{
// Check error status, returns true if the retrials have to be aborted
if (mex->HeaderStatus() == kXR_redirect) {
// Too many redirections
Error("CheckErrorStatus",
"Error while being redirected for request " << CmdName );
return TRUE;
}
if (mex->HeaderStatus() == kXR_error) {
// The server declared an error.
// In this case it's better to exit, unhandled error
struct ServerResponseBody_Error *body_err;
body_err = (struct ServerResponseBody_Error *)(mex->GetData());
if (body_err) {
// Print out the error information, as received by the server
fOpenError = (XErrorCode)ntohl(body_err->errnum);
Info(XrdClientDebug::kNODEBUG, "CheckErrorStatus", "Server [" << GetCurrentUrl().HostWPort <<
"] declared: " <<
(const char*)body_err->errmsg << "(error code: " << fOpenError << ")");
// Save the last error received
memset(&LastServerError, 0, sizeof(LastServerError));
memcpy(&LastServerError, body_err, mex->DataLen());
LastServerError.errnum = fOpenError;
}
return TRUE;
}
if (mex->HeaderStatus() == kXR_wait) {
// We have to wait for a specified number of seconds and then
// retry the same cmd
struct ServerResponseBody_Wait *body_wait;
body_wait = (struct ServerResponseBody_Wait *)mex->GetData();
if (body_wait) {
if (mex->DataLen() > 4)
Info(XrdClientDebug::kUSERDEBUG, "CheckErrorStatus", "Server [" <<
fUrl.Host << ":" << fUrl.Port <<
"] requested " << ntohl(body_wait->seconds) << " seconds"
" of wait. Server message is " << body_wait->infomsg)
else
Info(XrdClientDebug::kUSERDEBUG, "CheckErrorStatus", "Server [" <<
fUrl.Host << ":" << fUrl.Port <<
"] requested " << ntohl(body_wait->seconds) << " seconds"
" of wait")
// Check if we have to sleep
int cmw = (getenv("XRDCLIENTMAXWAIT")) ? atoi(getenv("XRDCLIENTMAXWAIT")) : -1;
int bws = (int)ntohl(body_wait->seconds);
if ((cmw > -1) && cmw < bws) {
Error("CheckErrorStatus", "XROOTD MaxWait forced - file is offline"
". Aborting command. " << cmw << " : " << bws);
Retry= kXR_maxReqRetry;
return TRUE;
}
// Look for too stupid a delay. In this case, set a reasonable value.
int newbws = bws;
if (bws <= 0) newbws = 1;
if (bws > 1800) newbws = 10;
if (bws != newbws)
Error("CheckErrorStatus", "Sleep time fixed from " << bws << " to " << newbws);
// Sleep now, and hope that the sandman does not enter here.
sleep(newbws);
}
// We don't want kxr_wait to count as an error
Retry--;
return FALSE;
}
// We don't understand what the server said. Better investigate on it...
Error("CheckErrorStatus",
"Answer from server [" <<
fUrl.Host << ":" << fUrl.Port <<
"] not recognized after executing " << CmdName);
return TRUE;
}
//_____________________________________________________________________________
XrdClientMessage *XrdClientConn::ReadPartialAnswer(XReqErrorType &errorType,
size_t &TotalBlkSize,
ClientRequest *req,
bool HasToAlloc, void** tmpMoreData,
EThreeStateReadHandler &what_to_do)
{
// Read server answer
XrdClientMessage *Xmsg = 0;
void *tmp2MoreData;
// No need to actually read if we are in error...
if (errorType == kOK) {
Info(XrdClientDebug::kHIDEBUG, "ReadPartialAnswer",
"Reading a XrdClientMessage from the server [" <<
fUrl.Host << ":" << fUrl.Port << "]...");
// A complete communication failure has to be handled later, but we
// don't have to abort what we are doing
// Beware! Now Xmsg contains ALSO the information about the esit of
// the communication at low level.
Xmsg = ConnectionManager->ReadMsg(fLogConnID);
fLastDataBytesRecv = Xmsg ? Xmsg->DataLen() : 0;
if ( !Xmsg || (Xmsg->IsError()) ) {
Info(XrdClientDebug::kNODEBUG, "ReadPartialAnswer", "Failed to read msg from connmgr"
" (server [" << fUrl.Host << ":" << fUrl.Port << "]). Retrying ...");
if (HasToAlloc) {
if (*tmpMoreData)
free(*tmpMoreData);
*tmpMoreData = 0;
}
errorType = kREAD;
}
else
// is not necessary because the Connection Manager unmarshalls the mex
Xmsg->Unmarshall();
}
if (Xmsg != 0)
if (DebugLevel() >= XrdClientDebug::kDUMPDEBUG)
smartPrintServerHeader(&Xmsg->fHdr);
// Now we have all the data. We must copy it back to the buffer where
// they are needed, only if we are not in troubles with errorType
if ((errorType == kOK) && (Xmsg->DataLen() > 0)) {
// If this is a redirection answer, its data MUST NOT overwrite
// the given buffer
if ( (Xmsg->HeaderStatus() == kXR_ok) ||
(Xmsg->HeaderStatus() == kXR_oksofar) ||
(Xmsg->HeaderStatus() == kXR_authmore) )
{
// Now we allocate a sufficient memory block, if needed
// If the calling function passed a null pointer, then we
// fill it with the new pointer, otherwise the func knew
// about the size of the expected answer, and we use
// the given pointer.
// We need to do this here because the calling func *may* not
// know the size to allocate
// For instance, for the ReadBuffer cmd the size is known, while
// for the ReadFile cmd is not
if (HasToAlloc) {
tmp2MoreData = realloc(*tmpMoreData, TotalBlkSize + Xmsg->DataLen());
if (!tmp2MoreData) {
Error("ReadPartialAnswer", "Error reallocating " <<
TotalBlkSize << " bytes.");
free(*tmpMoreData);
*tmpMoreData = 0;
what_to_do = kTSRHReturnNullMex;
delete Xmsg;
return 0;
}
*tmpMoreData = tmp2MoreData;
}
// Now we copy the content of the Xmsg to the buffer where
// the data are needed
if (*tmpMoreData)
memcpy(((kXR_char *)(*tmpMoreData)) + TotalBlkSize,
Xmsg->GetData(), Xmsg->DataLen());
// Dump the buffer tmpMoreData
// if (DebugLevel() >= XrdClientDebug::kDUMPDEBUG) {
// Info (XrdClientDebug::kDUMPDEBUG, "ReadPartialAnswer","Dumping read data...");
// for(int jj = 0; jj < Xmsg->DataLen(); jj++) {
// printf("0x%.2x ", *( ((kXR_char *)Xmsg->GetData()) + jj ) );
// if ( !((jj+1) % 10) ) printf("\n");
// }
// printf("\n\n");
// }
TotalBlkSize += Xmsg->DataLen();
} else {
Info(XrdClientDebug::kHIDEBUG, "ReadPartialAnswer",
"Server [" <<
fUrl.Host << ":" << fUrl.Port << "] answered [" <<
convertRespStatusToChar(Xmsg->fHdr.status) <<
"] (" << Xmsg->fHdr.status << ")");
}
} // End of DATA reading
// Now answhdr contains the server response. We pass it as is to the
// calling function.
// The only exception is that we must deal here with redirections.
// If the server redirects us, then we
// add 1 to redircnt
// close the logical connection
// try to connect to the new destination.
// login/auth to the new destination (this can generate other calls
// to this method if it has been invoked by DoLogin!)
// Reopen the file if the current fhandle value is not null (this
// can generate other calls to this method, not for the dologin
// phase)
// resend the command
//
// Also a READ/WRITE error requires a redirection
//
if ( (errorType == kREAD) ||
(errorType == kWRITE) ||
isRedir(&Xmsg->fHdr) )
{
// this procedure can decide if return to caller or
// continue with processing
ESrvErrorHandlerRetval Return = HandleServerError(errorType, Xmsg, req);
if (Return == kSEHRReturnMsgToCaller) {
// The caller is allowed to continue its processing
// with the current Xmsg
// Note that this can be a way to stop retrying
// e.g. if the resp in Xmsg is kxr_redirect, it means
// that the redir limit has been reached
if (HasToAlloc) {
free(*tmpMoreData);
*tmpMoreData = 0;
}
// Return the message to the client (SendGenCommand)
what_to_do = kTSRHReturnMex;
return Xmsg;
}
if (Return == kSEHRReturnNoMsgToCaller) {
// There was no Xmsg to return, or the previous one
// has no meaning anymore
// The caller will retry the cmd for some times,
// If we are connected the attempt will go OK,
// otherwise the next retry will fail, causing a
// redir to the lb or a rebounce.
if (HasToAlloc) {
free(*tmpMoreData);
*tmpMoreData = 0;
}
delete Xmsg;
Xmsg = 0;
what_to_do = kTSRHReturnMex;
return Xmsg;
}
}
what_to_do = kTSRHContinue;
return Xmsg;
}
//_____________________________________________________________________________
bool XrdClientConn::GetAccessToSrv()
{
// Gets access to the connected server. The login and authorization steps
// are performed here (calling method DoLogin() that performs logging-in
// and calls DoAuthentication() ).
// If the server redirects us, this is gently handled by the general
// functions devoted to the handling of the server's responses.
// Nothing is visible here, and nothing is visible from the other high
// level functions.
XrdClientLogConnection *logconn = ConnectionManager->GetConnection(fLogConnID);
// This is to prevent recursion in this delicate phase
if (fGettingAccessToSrv) {
logconn->GetPhyConnection()->StartReader();
return true;
}
fGettingAccessToSrv = true;
switch ((fServerType = DoHandShake(fLogConnID))) {
case kSTError:
Info(XrdClientDebug::kNODEBUG,
"GetAccessToSrv",
"HandShake failed with server [" <<
fUrl.Host << ":" << fUrl.Port << "]");
Disconnect(TRUE);
fGettingAccessToSrv = false;
return FALSE;
case kSTNone:
Info(XrdClientDebug::kNODEBUG,
"GetAccessToSrv", "The server on [" <<
fUrl.Host << ":" << fUrl.Port << "] is unknown");
Disconnect(TRUE);
fGettingAccessToSrv = false;
return FALSE;
case kSTRootd:
if (EnvGetLong(NAME_KEEPSOCKOPENIFNOTXRD) == 1) {
Info(XrdClientDebug::kHIDEBUG,
"GetAccessToSrv","Ok: the server on [" <<
fUrl.Host << ":" << fUrl.Port <<
"] is a rootd. Saving socket for later use.");
// Get socket descriptor
fOpenSockFD = logconn->GetPhyConnection()->SaveSocket();
Disconnect(TRUE);
ConnectionManager->GarbageCollect();
break;
} else {
Info(XrdClientDebug::kHIDEBUG,
"GetAccessToSrv","Ok: the server on [" <<
fUrl.Host << ":" << fUrl.Port << "] is a rootd."
" Not supported.");
Disconnect(TRUE);
fGettingAccessToSrv = false;
return FALSE;
}
case kSTBaseXrootd:
Info(XrdClientDebug::kHIDEBUG,
"GetAccessToSrv",
"Ok: the server on [" <<
fUrl.Host << ":" << fUrl.Port << "] is an xrootd redirector.");
logconn->GetPhyConnection()->SetTTL(EnvGetLong(NAME_LBSERVERCONN_TTL));
break;
case kSTMetaXrootd:
Info(XrdClientDebug::kHIDEBUG,
"GetAccessToSrv",
"Ok: the server on [" <<
fUrl.Host << ":" << fUrl.Port << "] is an xrootd meta manager.");
logconn->GetPhyConnection()->SetTTL(EnvGetLong(NAME_LBSERVERCONN_TTL));
break;
case kSTDataXrootd:
Info( XrdClientDebug::kHIDEBUG,
"GetAccessToSrv",
"Ok, the server on [" <<
fUrl.Host << ":" << fUrl.Port << "] is an xrootd data server.");
logconn->GetPhyConnection()->SetTTL(EnvGetLong(NAME_DATASERVERCONN_TTL));
break;
}
bool retval = false;
XrdClientPhyConnection *phyc = logconn->GetPhyConnection();
if (!phyc) {
fGettingAccessToSrv = false;
return false;
}
XrdClientPhyConnLocker pl(phyc);
// Execute a login if connected to a xrootd server
if (fServerType != kSTRootd) {
phyc = logconn->GetPhyConnection();
if (!phyc || !phyc->IsValid()) {
Error( "GetAccessToSrv", "Physical connection disappeared.");
fGettingAccessToSrv = false;
return false;
}
// Start the reader thread in the phyconn, if needed
phyc->StartReader();
if (phyc->IsLogged() == kNo)
retval = DoLogin();
else {
Info( XrdClientDebug::kHIDEBUG,
"GetAccessToSrv", "Reusing physical connection to server [" <<
fUrl.Host << ":" << fUrl.Port << "]).");
retval = true;
}
}
else
retval = true;
fGettingAccessToSrv = false;
return retval;
}
//_____________________________________________________________________________
ERemoteServerType XrdClientConn::DoHandShake(short int log) {
struct ServerInitHandShake xbody;
ERemoteServerType type;
// Get the physical connection
XrdClientLogConnection *lcn = ConnectionManager->GetConnection(log);
if (!lcn) return kSTError;
XrdClientPhyConnection *phyconn = lcn->GetPhyConnection();
if (!phyconn || !phyconn->IsValid()) return kSTError;
{
XrdClientPhyConnLocker pl(phyconn);
if( phyconn->fServerType != kSTNone )
type = phyconn->fServerType;
else
type = phyconn->DoHandShake( xbody );
if (type == kSTError) return type;
// Check if the server is the eXtended rootd or not, checking the value
// of type
fServerProto = phyconn->fServerProto;
//------------------------------------------------------------------------
// Handle a redirector - we always remember the first manager (redirector)
// encountered
//------------------------------------------------------------------------
if( type == kSTBaseXrootd )
{
if( !fLBSUrl || fLBSIsMeta )
{
delete fLBSUrl;
fLBSIsMeta = false;
Info( XrdClientDebug::kHIDEBUG, "DoHandShake",
"Setting Load Balancer Server Url = " << fUrl.GetUrl() );
fLBSUrl = new XrdClientUrlInfo( fUrl.GetUrl() );
}
}
//------------------------------------------------------------------------
// Handle a meta manager - we always remember the last meta manager
// encountered
//------------------------------------------------------------------------
if( type == kSTMetaXrootd )
{
delete fMetaUrl;
Info( XrdClientDebug::kHIDEBUG, "DoHandShake",
"Setting Meta Manager Server Url = " << fUrl.GetUrl() );
fMetaUrl = new XrdClientUrlInfo( fUrl.GetUrl() );
//----------------------------------------------------------------------
// We always remember the first manager in the chain, unless there is
// none available in which case we use the last meta manager for this
// purpose
//----------------------------------------------------------------------
if( !fLBSUrl || fLBSIsMeta )
{
delete fLBSUrl;
fLBSIsMeta = true;
Info( XrdClientDebug::kHIDEBUG, "DoHandShake",
"Setting Meta Load Balancer Server Url = " << fUrl.GetUrl() );
fLBSUrl = new XrdClientUrlInfo( fUrl.GetUrl() );
}
}
return type;
}
}
//_____________________________________________________________________________
bool XrdClientConn::DoLogin()
{
// This method perform the loggin-in into the server just after the
// hand-shake. It also calls the DoAuthentication() method
ClientRequest reqhdr;
bool resp;
// We fill the header struct containing the request for login
memset( &reqhdr, 0, sizeof(reqhdr));
SetSID(reqhdr.header.streamid);
reqhdr.header.requestid = kXR_login;
reqhdr.login.capver[0] = XRD_CLIENT_CAPVER;
reqhdr.login.pid = getpid();
// Get username from Url
XrdOucString User = fUrl.User;
if (User.length() <= 0) {
// Use local username, if not specified
char name[256];
#ifndef WIN32
if (!XrdOucUtils::UserName(geteuid(), name, sizeof(name))) User = name;
#else
DWORD length = sizeof (name);
GetUserName(name, &length);
User = name;
#endif
}
if (User.length() > 0)
{
strncpy( (char *)reqhdr.login.username, User.c_str(), 7 );
reqhdr.login.username[7] = 0;
}
else
strcpy( (char *)reqhdr.login.username, "????" );
// If we run with root as effective user we need to temporary change
// effective ID to User
XrdOucString effUser = User;
#ifndef WIN32
if (!getuid()) {
if (getenv("XrdClientEUSER")) effUser = getenv("XrdClientEUSER");
}
XrdSysPrivGuard guard(effUser.c_str());
if (!guard.Valid() && !getuid()) {
// Set error, in case of need
fOpenError = kXR_NotAuthorized;
LastServerError.errnum = fOpenError;
XrdOucString emsg("Cannot set effective uid for user: ");
emsg += effUser;
strcpy(LastServerError.errmsg, emsg.c_str());
Error("DoLogin", emsg << ". Exiting.");
return false;
}
#endif
// set the token with the value provided by a previous
// redirection (if any)
reqhdr.header.dlen = fRedirInternalToken.length();
// We call SendGenCommand, the function devoted to sending commands.
Info(XrdClientDebug::kHIDEBUG,
"DoLogin",
"Logging into the server [" << fUrl.Host << ":" << fUrl.Port <<
"]. pid=" << reqhdr.login.pid << " uid=" << reqhdr.login.username);
{
XrdClientLogConnection *l = ConnectionManager->GetConnection(fLogConnID);
XrdClientPhyConnection *p = 0;
if (l) p = l->GetPhyConnection();
if (p) {p->SetLogged(kNo); fOpenSockFD = p->GetSocket();}
else {
Error("DoLogin",
"Logical connection disappeared before request?!? Srv: [" << fUrl.Host << ":" << fUrl.Port <<
"]. Exiting.");
return false;
}
}
char *plist = 0;
resp = SendGenCommand(&reqhdr, fRedirInternalToken.c_str(),
(void **)&plist, 0,
TRUE, (char *)"XrdClientConn::DoLogin");
// plist is the plain response from the server. We need a way to 0-term it.
XrdSecProtocol *secp = 0;
SessionIDInfo prevSessionID, *prevsessidP = 0;
XrdOucString sessname;
XrdOucString sessdump;
if (resp && LastServerResp.dlen && plist) {
plist = (char *)realloc(plist, LastServerResp.dlen+1);
// Terminate server reply
plist[LastServerResp.dlen]=0;
char *pauth = 0;
int lenauth = 0;
if ((fServerProto >= 0x240) && (LastServerResp.dlen >= 16)) {
if (XrdClientDebug::kHIDEBUG <= DebugLevel()) {
char b[20];
for (unsigned int i = 0; i < 16; i++) {
snprintf(b, 20, "%.2x", plist[i]);
sessdump += b;
}
Info(XrdClientDebug::kHIDEBUG,
"DoLogin","Got session ID: " << sessdump);
}
// Get the previous session id, in order to kill it later
char buf[20];
snprintf(buf, 20, ":%d.", fUrl.Port);
sessname+= fUrl.HostAddr;
if (sessname.length() <= 0)
sessname = fUrl.Host;
sessname += buf;
sessname += fUrl.User;
memcpy(&mySessionID, plist, sizeof(mySessionID));
fSessionIDRMutex.Lock();
prevsessidP = fSessionIDRepo.Find(sessname.c_str());
if (prevsessidP)
{prevSessionID = *prevsessidP;
memcpy(prevsessidP, &mySessionID, sizeof(mySessionID));
}
fSessionIDRMutex.UnLock();
// Check if we need to authenticate
if (LastServerResp.dlen > 16) {
Info(XrdClientDebug::kHIDEBUG, "DoLogin","server requires authentication");
pauth = plist+16;
lenauth = LastServerResp.dlen-15;
}
} else {
// We need to authenticate
Info(XrdClientDebug::kHIDEBUG, "DoLogin","server requires authentication");
pauth = plist;
lenauth = LastServerResp.dlen+1;
}
// Run authentication, if needed
if (pauth) {
char *cenv = 0;
//
// Set trace level
if (EnvGetLong(NAME_DEBUG) > 0) {
cenv = new char[18];
sprintf(cenv, "XrdSecDEBUG=%ld",EnvGetLong(NAME_DEBUG));
putenv(cenv);
}
//
// Set username
cenv = new char[User.length()+12];
sprintf(cenv, "XrdSecUSER=%s",User.c_str());
putenv(cenv);
//
// Set remote hostname
cenv = new char[fUrl.Host.length()+12];
sprintf(cenv, "XrdSecHOST=%s",fUrl.Host.c_str());
putenv(cenv);
secp = DoAuthentication(pauth, lenauth);
resp = (secp != 0) ? 1 : 0;
}
if( resp ) {
if (prevsessidP) {
//
// We have to kill the previous session, if any
// By sending a kXR_endsess
if (XrdClientDebug::kHIDEBUG <= DebugLevel()) {
XrdOucString sessdump;
char b[20];
for (unsigned int i = 0; i < sizeof(prevsessidP->id); i++) {
snprintf(b, 20, "%.2x", prevsessidP->id[i]);
sessdump += b;
}
Info(XrdClientDebug::kHIDEBUG,
"DoLogin","Found prev session info for " << sessname <<
": " << sessdump);
}
memset( &reqhdr, 0, sizeof(reqhdr));
SetSID(reqhdr.header.streamid);
reqhdr.header.requestid = kXR_endsess;
memcpy(reqhdr.endsess.sessid, &prevSessionID, sizeof(prevSessionID));
// terminate session
Info(XrdClientDebug::kHIDEBUG,
"DoLogin","Trying to terminate previous session.");
SendGenCommand(&reqhdr, 0, 0, 0,
FALSE, (char *)"XrdClientConn::Endsess");
} else {
Info(XrdClientDebug::kHIDEBUG,
"DoLogin","No prev session info for " << sessname);
// No session info? Let's create one.
SessionIDInfo *newsessid = new SessionIDInfo;
*newsessid = mySessionID;
fSessionIDRMutex.Lock();
fSessionIDRepo.Rep(sessname.c_str(), newsessid);
fSessionIDRMutex.UnLock();
}
} } //resp
// Flag success if everything went ok
{
XrdClientLogConnection *l = ConnectionManager->GetConnection(fLogConnID);
XrdClientPhyConnection *p = 0;
if (l) p = l->GetPhyConnection();
if (!p) {
Error("DoLogin",
"Logical connection disappeared after request?!? Srv: [" << fUrl.Host << ":" << fUrl.Port <<
"]. Exiting.");
return false;
}
if (resp) {
p->SetLogged(kYes);
p->SetSecProtocol(secp);
}
else Disconnect(true);
}
if (plist)
free(plist);
return resp;
}
//_____________________________________________________________________________
XrdSecProtocol *XrdClientConn::DoAuthentication(char *plist, int plsiz)
{
// Negotiate authentication with the remote server. Tries in turn
// all available protocols proposed by the server (in plist),
// starting from the first.
static XrdSecGetProt_t getp = 0;
XrdSecProtocol *protocol = (XrdSecProtocol *)0;
XrdOucEnv authEnv;
struct sockaddr myAddr;
socklen_t myALen = sizeof(myAddr);
if (!plist || plsiz <= 0)
return protocol;
Info(XrdClientDebug::kHIDEBUG, "DoAuthentication",
"host " << fUrl.Host << " sent a list of " << plsiz << " bytes");
//
// Prepare host/IP information of the remote xrootd. This is required
// for the authentication.
XrdNetAddr theAddr;
const char *hosterrmsg = 0;
if ((hosterrmsg = theAddr.Set(fOpenSockFD)))
{Info(XrdClientDebug::kUSERDEBUG, "DoAuthentication",
"getHostAddr said '" << *hosterrmsg << "'");
return protocol;
}
// Establish our local connection details (used by sss protocol)
//
if (!getsockname(fOpenSockFD, &myAddr, &myALen))
{char ipBuff[64];
if (XrdSysDNS::IPFormat(&myAddr, ipBuff, sizeof(ipBuff)))
authEnv.Put("sockname", ipBuff);
}
//
// Variables for negotiation
XrdSecParameters *secToken = 0;
XrdSecCredentials *credentials = 0;
//
// Prepare the parms object
char *bpar = (char *)malloc(plsiz + 1);
if (bpar) memcpy(bpar, plist, plsiz);
bpar[plsiz] = 0;
XrdSecParameters Parms(bpar, plsiz + 1);
// We need to load the protocol getter the first time we are here
if (!getp) {
LastServerError.errmsg[0] = 0;
if (!(getp = XrdSecLoadSecFactory(LastServerError.errmsg,
sizeof(LastServerError.errmsg))))
{Info(XrdClientDebug::kHIDEBUG, "DoAuthentication",
"unable to load XrdSecGetProtocol()");
// Set error, in case of need
fOpenError = kXR_NotAuthorized;
LastServerError.errnum = fOpenError;
if (!LastServerError.errmsg[0])
strcpy(LastServerError.errmsg, "Unable to load XrdSecGetProtocol()");
return protocol;
}
}
//
// Get a instance of XrdSecProtocol; the order of preference is the one
// specified by the server; the env XRDSECPROTOCOL can be used to force
// the choice.
while ((protocol = (*getp)((char *)fUrl.Host.c_str(), theAddr, Parms, 0)))
//
// Protocol name
{XrdOucString protname = protocol->Entity.prot;
//
// Once we have the protocol, get the credentials
XrdOucErrInfo ei("", &authEnv);
credentials = protocol->getCredentials(0, &ei);
if (!credentials) {
Info(XrdClientDebug::kHIDEBUG, "DoAuthentication",
"cannot obtain credentials (protocol: "<<
protname<<")");
// Set error, in case of need
fOpenError = kXR_NotAuthorized;
LastServerError.errnum = fOpenError;
strcpy(LastServerError.errmsg, "cannot obtain credentials for protocol: ");
strcat(LastServerError.errmsg, ei.getErrText());
protocol->Delete();
protocol = 0;
continue;
} else {
Info(XrdClientDebug::kHIDEBUG, "DoAuthentication",
"credentials size: "<< credentials->size);
}
//
// We fill the header struct containing the request for login
ClientRequest reqhdr;
memset(reqhdr.auth.reserved, 0, 12);
memset(reqhdr.auth.credtype, 0, 4 );
memcpy(reqhdr.auth.credtype, protname.c_str(), protname.length() > 4 ? 4 : protname.length() );
LastServerResp.status = kXR_authmore;
char *srvans = 0;
while (LastServerResp.status == kXR_authmore) {
bool resp = false;
//
// Length of the credentials buffer
reqhdr.header.dlen = credentials->size;
SetSID(reqhdr.header.streamid);
reqhdr.header.requestid = kXR_auth;
resp = SendGenCommand(&reqhdr, credentials->buffer, (void **)&srvans, 0, TRUE,
(char *)"XrdClientConn::DoAuthentication");
SafeDelete(credentials);
Info(XrdClientDebug::kHIDEBUG, "DoAuthentication",
"server reply: status: "<<
LastServerResp.status <<
" dlen: "<< LastServerResp.dlen);
if (resp && (LastServerResp.status == kXR_authmore)) {
//
// We are required to send additional information
// First assign the security token that we have received
// at the login request
secToken = new XrdSecParameters(srvans,LastServerResp.dlen);
//
// then get next part of the credentials
credentials = protocol->getCredentials(secToken, &ei);
SafeDelete(secToken); // nb: srvans is released here
srvans = 0;
if (!credentials) {
Info(XrdClientDebug::kUSERDEBUG, "DoAuthentication",
"cannot obtain credentials");
// Set error, in case of need
fOpenError = kXR_NotAuthorized;
LastServerError.errnum = fOpenError;
strcpy(LastServerError.errmsg, "cannot obtain credentials: ");
strcat(LastServerError.errmsg, ei.getErrText());
protocol->Delete();
protocol = 0;
break;
} else {
Info(XrdClientDebug::kHIDEBUG, "DoAuthentication",
"credentials size " << credentials->size);
}
} else {
// Something happened, it could be an error or a good thing as well
if (LastServerResp.status == kXR_error) {
// Unexpected reply: stop handshake and print error msg, if any
if (LastServerError.errmsg[0])
Error("DoAuthentication", LastServerError.errmsg);
protocol->Delete();
protocol = 0;
// This is a fatal auth error
break;
}
if (!resp) {
// Communication error
protocol->Delete();
protocol = 0;
// This is a fatal auth error
break;
}
}
}
// If we are done
if (protocol) break;
}
if (!protocol) {
Info(XrdClientDebug::kHIDEBUG, "DoAuthentication",
"unable to get protocol object.");
// Set error, in case of need
fOpenError = kXR_NotAuthorized;
LastServerError.errnum = fOpenError;
strcat(LastServerError.errmsg, ": unable to get protocol object.");
}
// Return the result of the negotiation
//
return protocol;
}
//_____________________________________________________________________________
XrdClientConn::ESrvErrorHandlerRetval
XrdClientConn::HandleServerError(XReqErrorType &errorType, XrdClientMessage *xmsg,
ClientRequest *req)
{
// Handle errors from server
int newport;
XrdOucString newhost;
bool noRedirError = (fMaxGlobalRedirCnt == 1 && xmsg && isRedir(&xmsg->fHdr));
// Close the log connection at this point the fLogConnID is no longer valid.
// On read/write error the physical channel may be not OK, so it's a good
// idea to shutdown it.
// If there are other logical conns pointing to it, they will get an error,
// which will be handled
if (!noRedirError) {
if ((errorType == kREAD) || (errorType == kWRITE)) {
Disconnect(TRUE);
if (fMainReadCache)
fMainReadCache->RemovePlaceholders();
} else
Disconnect(FALSE);
}
// We cycle repeatedly trying to ask the dlb for a working redir destination
do {
// Anyway, let's update the counter, we have just been redirected
fGlobalRedirCnt++;
Info(XrdClientDebug::kHIDEBUG,
"HandleServerError",
"Redir count=" << fGlobalRedirCnt);
if ( fGlobalRedirCnt >= fMaxGlobalRedirCnt ) {
if (noRedirError) {
// The caller just wants the redir info:
// extract it (new host:port) from the response and return
newhost = "";
newport = 0;
// An explicit redir overwrites token and opaque info
ParseRedir(xmsg, newport, newhost, fRedirOpaque, fRedirInternalToken);
fRedirCGI = fRedirOpaque;
// Save it in fREQUrl
// fREQUrl = fUrl;
// fREQUrl.Host = newhost;
// fREQUrl.Port = newport;
// Reset counter
fGlobalRedirCnt = 0;
return kSEHRReturnMsgToCaller;
} else {
return kSEHRContinue;
}
}
// If the time limit has expired... exit
if (IsOpTimeLimitElapsed(time(0))) return kSEHRContinue;
newhost = "";
newport = 0;
if ((errorType == kREAD) ||
(errorType == kWRITE) ||
(errorType == kREDIRCONNECT)) {
bool cangoaway = ( fRedirHandler &&
fRedirHandler->CanRedirOnError() );
// We got some errors in the communication phase
// the physical connection has been closed;
// then we must go back to the load balancer
// if there is any
// The exception here is that if the file was open in
// write mode, we must rebounce and not go away to other hosts
// To state this, we just asked to our redir handler
if ( (fREQUrl.Host.length() > 0) ) {
// If this client was explicitly told to redirect somewhere...
//ClearSessyionID();
// Note, this could contain opaque information
newhost = fREQUrl.Host;
newport = fREQUrl.Port;
ParseRedirHost(newhost, fRedirOpaque, fRedirInternalToken);
fRedirCGI = fRedirOpaque;
// An unsuccessful connection to the dest host will make the
// client go to the LB
fREQUrl.Clear();
}
else
if ( cangoaway && fLBSUrl && (fLBSUrl->GetUrl().length() > 0) ) {
// "Normal" error... we go to the LB if any
// Clear the current session info. Rather simplicistic.
//ClearSessionID();
newhost = fLBSUrl->Host;
newport = fLBSUrl->Port;
}
else {
Error("HandleServerError",
"Communication error"
" with server [" << fUrl.Host << ":" << fUrl.Port <<
"]. Rebouncing here.");
if (fUrl.Host.length()) newhost = fUrl.Host;
else
newhost = fUrl.HostAddr;
fRedirOpaque = fRedirCGI;
newport = fUrl.Port;
}
} else if (isRedir(&xmsg->fHdr)) {
// Extract the info (new host:port) from the response
newhost = "";
newport = 0;
// An explicit redir overwrites token and opaque info
ParseRedir(xmsg, newport, newhost, fRedirOpaque, fRedirInternalToken);
fRedirCGI = fRedirOpaque;
// Clear the current session info. Rather simplicistic.
//ClearSessionID();
}
// Now we should have the parameters needed for the redir
CheckPort(newport);
if ((newhost.length() > 0) && newport) {
XrdClientUrlInfo NewUrl(fUrl.GetUrl());
if (DebugLevel() >= XrdClientDebug::kUSERDEBUG)
Info(XrdClientDebug::kUSERDEBUG,
"HandleServerError",
"Received redirection to [" << newhost << ":" << newport <<
"]. Token=[" << fRedirInternalToken << "]" <<
"]. Opaque=[" << fRedirOpaque << "].");
errorType = kOK;
NewUrl.Host = NewUrl.HostAddr = newhost;
NewUrl.Port = newport;
NewUrl.SetAddrFromHost();
if ( !CheckHostDomain(newhost) ) {
Error("HandleServerError",
"Redirection to a server out-of-domain disallowed. Abort.");
abort();
}
errorType = GoToAnotherServer(NewUrl);
}
else {
// Host or port are not valid or empty
Error("HandleServerError",
"Received redirection to [" << newhost << ":" << newport <<
"]. Token=[" << fRedirInternalToken << "]" <<
"]. Opaque=[" << fRedirOpaque << "]. No server to go...");
errorType = kREDIRCONNECT;
}
// We don't want to flood servers...
if (errorType == kREDIRCONNECT) {
if (LastServerError.errnum == kXR_NotAuthorized)
return kSEHRReturnMsgToCaller;
sleep(EnvGetLong(NAME_RECONNECTWAIT));
}
// We keep trying the connection to the same host (we have only one)
// until we are connected, or the max count for
// redirections is reached
// The attempts must be stopped if we are not authorized
} while ((errorType == kREDIRCONNECT) && (LastServerError.errnum != kXR_NotAuthorized));
// We are here if correctly connected and handshaked and logged
if (!IsConnected()) {
Error("HandleServerError",
"Not connected. Internal error. Abort.");
abort();
}
// If the former request was a kxr_open,
// there is no need to reissue it, since it will be the next attempt
// to rerun the cmd.
// We simply return to the caller, which will retry
// The same applies to kxr_login. No need to reopen a file if we are
// just logging into another server.
// The open request will surely follow if needed.
if ((req->header.requestid == kXR_open) ||
(req->header.requestid == kXR_login)) return kSEHRReturnNoMsgToCaller;
// Here we are. If we had a filehandle then we must
// request a new one.
char localfhandle[4];
bool wasopen, newopenok;
if (fRedirHandler) {
newopenok = fRedirHandler->OpenFileWhenRedirected(localfhandle, wasopen);
if (newopenok && wasopen) {
// We are here if the file has been opened succesfully
// or if it was not open
// Tricky thing: now we have a new filehandle, perhaps in
// a different server. Then we must correct the filehandle in
// the msg we were sending and that we must repeat...
PutFilehandleInRequest(req, localfhandle);
// Everything should be ok here.
// If we have been redirected,then we are connected, logged and reopened
// the file. If we had a r/w error (xmsg==0 or xmsg->IsError) we are
// OK too. Since we may come from a comm error, then xmsg can be null.
if (xmsg && !xmsg->IsError())
return kSEHRContinue; // the case of explicit redir
else
return kSEHRReturnNoMsgToCaller; // the case of recovered error
}
if (!newopenok) return kSEHRContinue; // the case of explicit redir
}
// We are here if we had no fRedirHandler or the reopen failed
// If we have no fRedirHandler then treat it like an OK
if (!fRedirHandler) {
// Since we may come from a comm error, then xmsg can be null.
//if (xmsg) xmsg->SetHeaderStatus( kXR_ok );
if (xmsg && !xmsg->IsError())
return kSEHRContinue; // the case of explicit redir
else
return kSEHRReturnNoMsgToCaller; // the case of recovered error
}
// We are here if we have been unable to connect somewhere to handle the
// troubled situation
return kSEHRContinue;
}
//_____________________________________________________________________________
XReqErrorType XrdClientConn::GoToAnotherServer(XrdClientUrlInfo &newdest)
{
// Re-directs to another server
if( EnvGetLong( NAME_PRINT_REDIRECTS ) )
Info( XrdClientDebug::kNODEBUG, "GoToAnotherServer",
"Going to: " << newdest.Host << ":" << newdest.Port );
fGettingAccessToSrv = false;
if (!newdest.Port) newdest.Port = 1094;
if (newdest.HostAddr == "") newdest.HostAddr = newdest.Host;
if ( (fLogConnID = Connect( newdest, fUnsolMsgHandler)) == -1) {
// Note: if Connect is unable to work then we are in trouble.
// It seems that we have been redirected to a non working server
Error("GoToAnotherServer", "Error connecting to [" <<
newdest.Host << ":" << newdest.Port);
// If no conn is possible then we return to the load balancer
return kREDIRCONNECT;
}
//
// Set fUrl to the new data/lb server if the
// connection has been succesfull
//
fUrl = newdest;
if (IsConnected() && !GetAccessToSrv()) {
Error("GoToAnotherServer", "Error handshaking to [" <<
newdest.Host.c_str() << ":" << newdest.Port << "]");
return kREDIRCONNECT;
}
fPrimaryStreamid = ConnectionManager->GetConnection(fLogConnID)->Streamid();
return kOK;
}
//_____________________________________________________________________________
XReqErrorType XrdClientConn::GoToMetaManager()
{
if( !fMetaUrl )
return kGENERICERR;
//----------------------------------------------------------------------------
// We go back to the meta manager so we need to forget the manager that
// we have encountered
//----------------------------------------------------------------------------
delete fLBSUrl;
fLBSUrl = new XrdClientUrlInfo( fMetaUrl->GetUrl() );
fLBSIsMeta = true;
return GoToAnotherServer( *fMetaUrl );
}
//_____________________________________________________________________________
XReqErrorType XrdClientConn::GoBackToRedirector() {
// This is a primitive used to force a client to consider again
// the root node as the default connection, even after requests that involve
// redirections. Used typically for stat and similar functions
Disconnect(false);
if (fGlobalRedirCnt) fGlobalRedirCnt--;
return (fLBSUrl ? GoToAnotherServer(*fLBSUrl) : kOK);
}
//_____________________________________________________________________________
XrdOucString XrdClientConn::GetDomainToMatch(XrdOucString hostname) {
// Return net-domain of host hostname in 's'.
// If the host is unknown in the DNS world but it's a
// valid inet address, then that address is returned, in order
// to be matched later for access granting
char *fullname, *err;
// The name may be already a FQDN: try extracting the domain
XrdOucString res = ParseDomainFromHostname(hostname);
if (res.length() > 0)
return res;
// Let's look up the hostname
// It may also be a w.x.y.z type address.
err =
fullname = XrdSysDNS::getHostName((char *)hostname.c_str(), &err);
if ( strcmp(fullname, (char *)"0.0.0.0") ) {
// The looked up name seems valid
// The hostname domain can still be unknown
Info(XrdClientDebug::kHIDEBUG,
"GetDomainToMatch", "GetHostName(" << hostname <<
") returned name=" << fullname);
res = ParseDomainFromHostname(fullname);
if (res == "") {
Info(XrdClientDebug::kHIDEBUG,
"GetDomainToMatch", "No domain contained in " << fullname);
res = ParseDomainFromHostname(hostname);
}
if (res == "") {
Info(XrdClientDebug::kHIDEBUG,
"GetDomainToMatch", "No domain contained in " << hostname);
res = hostname;
}
} else {
Info(XrdClientDebug::kHIDEBUG,
"GetDomainToMatch", "GetHostName(" << hostname << ") returned a non valid address. errtxt=" << err);
res = ParseDomainFromHostname(hostname);
}
Info(XrdClientDebug::kHIDEBUG,
"GetDomainToMatch", "GetDomain(" << hostname << ") --> " << res);
if (fullname) free(fullname);
return res;
}
//_____________________________________________________________________________
XrdOucString XrdClientConn::ParseDomainFromHostname(XrdOucString hostname)
{
// Extract the domain
XrdOucString res;
int idot = hostname.find('.');
if (idot != STR_NPOS)
res.assign(hostname, idot+1);
// Done
return res;
}
//_____________________________________________________________________________
void XrdClientConn::CheckPort(int &port) {
if(port <= 0) {
Info(XrdClientDebug::kHIDEBUG,
"checkPort",
"TCP port not specified. Trying to get it from /etc/services...");
struct servent *S = getservbyname("rootd", "tcp");
if(!S) {
Info(XrdClientDebug::kHIDEBUG,
"checkPort", "Service rootd not specified in /etc/services. "
"Using default IANA tcp port 1094");
port = 1094;
} else {
Info(XrdClientDebug::kNODEBUG,
"checkPort", "Found tcp port " << ntohs(S->s_port) <<
" in /etc/service");
port = (int)ntohs(S->s_port);
}
}
}
//___________________________________________________________________________
long XrdClientConn::GetDataFromCache(const void *buffer, long long begin_offs,
long long end_offs, bool PerfCalc,
XrdClientIntvList &missingblks, long &outstandingblks) {
// Copies the requested data from the cache. Returns the number of bytes got
// Perfcalc = kFALSE forces the call not to impact the perf counters
if (!fMainReadCache)
return FALSE;
return ( fMainReadCache->GetDataIfPresent(buffer,
begin_offs,
end_offs,
PerfCalc,
missingblks, outstandingblks) );
}
//___________________________________________________________________________
bool XrdClientConn::SubmitDataToCache(XrdClientMessage *xmsg, long long begin_offs,
long long end_offs) {
// Inserts the data part of this message into the cache
if (xmsg && fMainReadCache &&
((xmsg->HeaderStatus() == kXR_oksofar) ||
(xmsg->HeaderStatus() == kXR_ok)))
fMainReadCache->SubmitXMessage(xmsg, begin_offs, end_offs);
return true;
}
//___________________________________________________________________________
bool XrdClientConn::SubmitRawDataToCache(const void *buffer,
long long begin_offs,
long long end_offs) {
if (fMainReadCache) {
if (!fMainReadCache->SubmitRawData(buffer, begin_offs, end_offs))
free(const_cast(buffer));
}
return true;
}
//___________________________________________________________________________
XReqErrorType XrdClientConn::WriteToServer_Async(ClientRequest *req,
const void* reqMoreData,
int substreamid) {
// We allocate a new child streamid, linked to this req
// Note that the content of the req will be copied. This allows us
// to send N times the same req without destroying it
// if an answer comes before we finish
// req is automatically updated with the new streamid
if (!ConnectionManager->SidManager()->GetNewSid(fPrimaryStreamid, req))
return kNOMORESTREAMS;
// If this is a write request, its buffer has to be inserted into the cache
// This will be used for reference if the request has to be retried later
// or to give coherency to the read/write semantic
// From this point on, we consider the request as outstanding
// Note that his kind of blocks has to be pinned inside the cache until the write is successful
if (fMainReadCache && (req->header.requestid == kXR_write)) {
// We have to dup the mem blk
// It will be destroyed when purged by the cache, only after it has been
// acknowledged and pinned
void *locbuf = malloc(req->header.dlen);
if (!locbuf) {
Error("WriteToServer_Async", "Error allocating " <<
req->header.dlen << " bytes.");
return kGENERICERR;
}
memcpy(locbuf, reqMoreData, req->header.dlen);
if (!fMainReadCache->SubmitRawData(locbuf, req->write.offset, req->write.offset+req->header.dlen-1, true))
free(locbuf);
}
// Send the req to the server
return WriteToServer(req, reqMoreData, fLogConnID, substreamid);
}
//_____________________________________________________________________________
bool XrdClientConn::PanicClose() {
ClientRequest closeFileRequest;
memset(&closeFileRequest, 0, sizeof(closeFileRequest) );
SetSID(closeFileRequest.header.streamid);
closeFileRequest.close.requestid = kXR_close;
//memcpy(closeFileRequest.close.fhandle, fHandle, sizeof(fHandle) );
closeFileRequest.close.dlen = 0;
WriteToServer(&closeFileRequest, 0, fLogConnID);
return TRUE;
}
void XrdClientConn::CheckREQPauseState() {
// This client might have been paused. In this case the calling thread
// is put to sleep into a condvar until the desired time arrives.
// The caller can be awakened by signalling the condvar. But, if the
// requested wake up time did not elapse, the caller has to sleep again.
time_t timenow;
// Lock mutex
fREQWait->Lock();
// Check condition
while (1) {
timenow = time(0);
if ((timenow < fREQWaitTimeLimit) && !IsOpTimeLimitElapsed(timenow)) {
// If still to wait... wait in relatively small steps
time_t tt = xrdmin(fREQWaitTimeLimit - timenow, 10);
fREQWait->Wait(tt);
}
else break;
}
// Unlock mutex
fREQWait->UnLock();
}
void XrdClientConn::CheckREQConnectWaitState() {
// This client might have been paused. In this case the calling thread
// is put to sleep into a condvar until the desired time arrives.
// The caller can be awakened by signalling the condvar. But, if the
// requested wake up time did not elapse, the caller has to sleep again.
time_t timenow;
// Lock mutex
fREQConnectWait->Lock();
// Check condition
while (1) {
timenow = time(0);
if ((timenow < fREQConnectWaitTimeLimit) && !IsOpTimeLimitElapsed(timenow)) {
// If still to wait... wait in relatively small steps
time_t tt = xrdmin(fREQWaitTimeLimit - timenow, 10);
// If still to wait... wait
fREQConnectWait->Wait(tt);
}
else break;
}
// Unlock mutex
fREQConnectWait->UnLock();
}
bool XrdClientConn::WaitResp(int secsmax) {
// This client might have been paused to wait for a delayed response.
// In this case the calling thread
// is put to sleep into a condvar until the timeout or the response arrives.
// Returns true on timeout, false if a signal was caught
int rc = false;
Info(XrdClientDebug::kHIDEBUG,
"WaitResp", "Waiting response for " << secsmax << " secs." );
// Lock condvar
fREQWaitResp->Lock();
time_t timelimit = time(0)+secsmax;
while (!fREQWaitRespData) {
rc = true;
time_t timenow = time(0);
if ((timenow < timelimit) && !IsOpTimeLimitElapsed(timenow)) {
// If still to wait... wait in relatively small steps
time_t tt = xrdmin(timelimit - timenow, 10);
fREQWaitResp->Wait(tt);
// Let's see if there's something
// If not.. continue waiting
if (fREQWaitRespData) {
rc = false;
break;
}
}
else break;
}
// Unlock condvar
fREQWaitResp->UnLock();
if (rc) {
Info(XrdClientDebug::kHIDEBUG,
"WaitResp", "Timeout elapsed.");
}
else {
Info(XrdClientDebug::kHIDEBUG,
"WaitResp", "Got an unsolicited response. Data=" << fREQWaitRespData);
}
return rc;
}
UnsolRespProcResult XrdClientConn::ProcessAsynResp(XrdClientMessage *unsolmsg) {
// A client on the current physical conn might be in a "wait for response" state
// Here we process a potential response
// If this is a comm error, let's awake the sleeping thread and continue
if (unsolmsg->GetStatusCode() != XrdClientMessage::kXrdMSC_ok) {
fREQWaitResp->Lock();
// We also have to fake a regular answer. kxr_wait is ok!
fREQWaitRespData = (ServerResponseBody_Attn_asynresp *)malloc( sizeof(struct ServerResponseBody_Attn_asynresp) );
memset( fREQWaitRespData, 0, sizeof(struct ServerResponseBody_Attn_asynresp) );
fREQWaitRespData->resphdr.status = kXR_wait;
fREQWaitRespData->resphdr.dlen = sizeof(kXR_int32);
kXR_int32 i = htonl(1);
memcpy(&fREQWaitRespData->respdata, &i, sizeof(i));
fREQWaitResp->Signal();
fREQWaitResp->UnLock();
return kUNSOL_CONTINUE;
}
ServerResponseBody_Attn_asynresp *ar;
ar = (ServerResponseBody_Attn_asynresp *)unsolmsg->GetData();
// If the msg streamid matched ours then continue
if ( !MatchStreamid(&ar->resphdr) ) return kUNSOL_CONTINUE;
Info(XrdClientDebug::kHIDEBUG,
"ProcessAsynResp", "Streamid matched." );
fREQWaitResp->Lock();
// Strip the data from the message and save it. It's the response we are waiting for.
// Note that it will contain also the data!
fREQWaitRespData = ar;
clientUnmarshall(&fREQWaitRespData->resphdr);
if (DebugLevel() >= XrdClientDebug::kDUMPDEBUG)
smartPrintServerHeader(&fREQWaitRespData->resphdr);
// After all, this is the last resp we received
memcpy(&LastServerResp, &fREQWaitRespData->resphdr, sizeof(struct ServerResponseHeader));
switch (fREQWaitRespData->resphdr.status) {
case kXR_error: {
// The server declared an error.
// We want to save its content
struct ServerResponseBody_Error *body_err;
body_err = (struct ServerResponseBody_Error *)(&fREQWaitRespData->respdata);
if (body_err) {
// Print out the error information, as received by the server
kXR_int32 fErr = (XErrorCode)ntohl(body_err->errnum);
Info(XrdClientDebug::kNODEBUG, "ProcessAsynResp", "Server declared: " <<
(const char*)body_err->errmsg << "(error code: " << fErr << ")");
// Save the last error received
memset(&LastServerError, 0, sizeof(LastServerError));
memcpy(&LastServerError, body_err, xrdmin(fREQWaitRespData->resphdr.dlen, (kXR_int32)(sizeof(LastServerError)-1) ));
LastServerError.errnum = fErr;
}
break;
}
case kXR_redirect: {
// Hybrid case. A sync redirect request which comes out the async way.
// We handle it by simulating an async one
// Get the encapsulated data
struct ServerResponseBody_Redirect *rd;
rd = (struct ServerResponseBody_Redirect *)fREQWaitRespData->respdata;
// Explicit redirection request
if (rd && (strlen(rd->host) > 0)) {
Info(XrdClientDebug::kUSERDEBUG,
"ProcessAsynResp", "Requested sync redir (via async response) to " << rd->host <<
":" << ntohl(rd->port));
SetRequestedDestHost(rd->host, ntohl(rd->port));
// And then we disconnect only this logical conn
// The subsequent retry will bounce to the requested host
Disconnect(FALSE);
}
// We also have to fake a regular answer. kxr_wait is ok to make the thing retry!
fREQWaitRespData = (ServerResponseBody_Attn_asynresp *)malloc( sizeof(struct ServerResponseBody_Attn_asynresp) );
memset( fREQWaitRespData, 0, sizeof(struct ServerResponseBody_Attn_asynresp) );
fREQWaitRespData->resphdr.status = kXR_wait;
fREQWaitRespData->resphdr.dlen = sizeof(kXR_int32);
kXR_int32 i = htonl(1);
memcpy(&fREQWaitRespData->respdata, &i, sizeof(i));
free(unsolmsg->DonateData());
break;
}
}
unsolmsg->DonateData(); // The data blk is released from the orig message
// Signal the waiting condvar. Waiting is no more needed
// Note: the message's data will be freed by the waiting process!
fREQWaitResp->Signal();
fREQWaitResp->UnLock();
// The message is to be destroyed, its data has been saved
return kUNSOL_DISPOSE;
}
//_____________________________________________________________________________
int XrdClientConn::GetParallelStreamToUse(int reqsperstream) {
// Gets a parallel stream id to use to set the return path for a req
XrdClientLogConnection *lgc = 0;
XrdClientPhyConnection *phyc = 0;
lgc = ConnectionManager->GetConnection(fLogConnID);
if (!lgc) {
Error("GetParallelStreamToUse",
"Unknown logical conn " << fLogConnID);
return kWRITE;
}
phyc = lgc->GetPhyConnection();
if (!phyc) {
Error("GetParallelStreamToUse",
"Cannot find physical conn for logid " << fLogConnID);
return kWRITE;
}
return phyc->GetSockIdHint(reqsperstream);
}
//_____________________________________________________________________________
bool XrdClientConn::IsPhyConnConnected() {
// Tells if this instance seems correctly connected to a server
XrdClientLogConnection *lgc = 0;
XrdClientPhyConnection *phyc = 0;
lgc = ConnectionManager->GetConnection(fLogConnID);
if (!lgc) return false;
phyc = lgc->GetPhyConnection();
if (!phyc) return false;
return phyc->IsValid();
}
//_____________________________________________________________________________
int XrdClientConn:: GetParallelStreamCount() {
XrdClientLogConnection *lgc = 0;
XrdClientPhyConnection *phyc = 0;
lgc = ConnectionManager->GetConnection(fLogConnID);
if (!lgc) {
Error("GetParallelStreamCount",
"Unknown logical conn " << fLogConnID);
return 0;
}
phyc = lgc->GetPhyConnection();
if (!phyc) {
Error("GetParallelStreamCount",
"Cannot find physical conn for logid " << fLogConnID);
return 0;
}
return phyc->GetSockIdCount();
}
//_____________________________________________________________________________
XrdClientPhyConnection *XrdClientConn::GetPhyConn(int LogConnID) {
// Protected way to get the underlying physical connection
XrdClientLogConnection *log;
log = ConnectionManager->GetConnection(LogConnID);
if (log) return log->GetPhyConnection();
return 0;
}
bool XrdClientConn::DoWriteSoftCheckPoint() {
// Cycle trough the outstanding write requests,
// If some of them are expired, cancel them
// and retry in the sync way, one by one
// If some of them got a negative response of some kind... the same
// Exit at the first sync error
// Get the failed write reqs, and put them in a safe place.
// This call has to be done also just before disconnecting a logical conn,
// in order to collect all the outstanding write requests before they are forgotten
ConnectionManager->SidManager()->GetFailedOutstandingWriteRequests(fPrimaryStreamid, fWriteReqsToRetry);
for (int it = 0; it < fWriteReqsToRetry.GetSize(); it++) {
ClientRequest req;
req = fWriteReqsToRetry[it];
// Get the mem blk to write, directly from the cache, where it should be
// a unique blk. If it's not there, then this is an internal error.
void *data = fMainReadCache->FindBlk(req.write.offset, req.write.offset+req.write.dlen-1);
// Now we have the req and the data, we let the things go almost normally
// No big troubles, this func is called by the main requesting thread
if (data) {
// The recoveries go always through the main stream
req.write.pathid = 0;
bool ok = SendGenCommand(&req, data, 0, 0,
false, (char *)"Write_checkpoint");
UnPinCacheBlk(req.write.offset, req.write.offset+req.write.dlen-1);
// A total sync failure means that there is no more hope and that the destination file is
// surely incomplete.
if (!ok) return false;
}
else {
Error("DoWriteSoftCheckPoint", "Checkpoint data disappeared.");
return false;
}
}
// If we are here, all the requests were successful
fWriteReqsToRetry.Clear();
return true;
}
bool XrdClientConn::DoWriteHardCheckPoint() {
// Do almost the same as the soft checkpoint,
// But don't exit if there are still pending write reqs
// This has to guarantee that either a fatal error or a full success has happened
// Acts like a client-and-network-side flush
while(1) {
if (ConnectionManager->SidManager()->GetOutstandingWriteRequestCnt(fPrimaryStreamid) == 0) return true;
if (!DoWriteSoftCheckPoint()) return false;
if (ConnectionManager->SidManager()->GetOutstandingWriteRequestCnt(fPrimaryStreamid) == 0) return true;
//ConnectionManager->SidManager()->PrintoutOutstandingRequests();
fWriteWaitAck->Wait(1);
}
}
//_____________________________________________________________________________
void XrdClientConn::SetOpTimeLimit(int delta_secs) {
fOpTimeLimit = time(0)+delta_secs;
}
bool XrdClientConn::IsOpTimeLimitElapsed(time_t timenow) {
return (timenow > fOpTimeLimit);
}