/* */
/* X r d C l i e n t P h y C o n n e c t i o n . c c */
/* */
/* Author: Fabrizio Furano (INFN Padova, 2004) */
/* Adapted from TXNetFile (root.cern.ch) originally done by */
/* Alvise Dorigo, Fabrizio Furano */
/* INFN Padova, 2003 */
/* */
/* This file is part of the XRootD software suite. */
/* */
/* XRootD is free software: you can redistribute it and/or modify it under */
/* the terms of the GNU Lesser General Public License as published by the */
/* Free Software Foundation, either version 3 of the License, or (at your */
/* option) any later version. */
/* */
/* XRootD is distributed in the hope that it will be useful, but WITHOUT */
/* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or */
/* FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public */
/* License for more details. */
/* */
/* You should have received a copy of the GNU Lesser General Public License */
/* along with XRootD in a file called COPYING.LESSER (LGPL license) and file */
/* COPYING (GPL license). If not, see . */
/* */
/* The copyright holder's institutional names and contributor's names may not */
/* be used to endorse or promote products derived from this software without */
/* specific prior written permission of the institution or contributor. */
// //
// Class handling physical connections to xrootd servers //
// //
#include "XrdClient/XrdClientPhyConnection.hh"
#include "XrdClient/XrdClientDebug.hh"
#include "XrdClient/XrdClientMessage.hh"
#include "XrdClient/XrdClientEnv.hh"
#include "XrdClient/XrdClientSid.hh"
#include "XrdClient/XrdClientPSock.hh"
#include "XrdClient/XrdClientThread.hh"
#include "XrdSec/XrdSecInterface.hh"
#ifndef WIN32
#define READERCOUNT (xrdmin(50, EnvGetLong(NAME_MULTISTREAMCNT)+1))
void *SocketReaderThread(void * arg, XrdClientThread *thr)
// This thread is the base for the async capabilities of XrdClientPhyConnection
// It repeatedly keeps reading from the socket, while feeding the
// MsqQ with a stream of XrdClientMessages containing what's happening
// at the socket level
// Mask all allowed signals
if (thr->MaskSignal(0) != 0)
Error("SocketReaderThread", "Warning: problems masking signals");
XrdClientPhyConnection *thisObj;
"Reader Thread starting.");
thisObj = (XrdClientPhyConnection *)arg;
while (1) {
thisObj->BuildMessage(TRUE, TRUE);
if (thisObj->CheckAutoTerm())
"Reader Thread exiting.");
return 0;
XrdClientPhyConnection::XrdClientPhyConnection(XrdClientAbsUnsolMsgHandler *h,
XrdClientSid *sid):
fMStreamsGoing(false), fReaderCV(0), fLogConnCnt(0), fSidManager(sid),
fServerProto(0) {
// Constructor
fServerType = kSTNone;
// Immediate destruction of this object is always a bad idea
fTTLsec = 30;
fRequestTimeout = EnvGetLong(NAME_REQUESTTIMEOUT);
UnsolicitedMsgHandler = h;
for (int i = 0; i < READERCOUNT; i++)
fReaderthreadhandler[i] = 0;
fReaderthreadrunning = 0;
fSecProtocol = 0;
// Destructor
"Destroying. [" << fServer.Host << ":" << fServer.Port << "]");
for (int i = 0; i < READERCOUNT; i++)
delete fReaderthreadhandler[i];
if (fSocket) {
delete fSocket;
fSocket = 0;
if (fSecProtocol) {
// This insures that the right destructor is called
// (Do not do C++ delete).
fSecProtocol = 0;
bool XrdClientPhyConnection::Connect(XrdClientUrlInfo RemoteHost, bool isUnix)
return Connect( RemoteHost, isUnix, -1 );
bool XrdClientPhyConnection::Connect(XrdClientUrlInfo RemoteHost, bool isUnix, int fd)
// Connect to remote server
XrdSysMutexHelper l(fMutex);
if (isUnix) {
Info(XrdClientDebug::kHIDEBUG, "Connect", "Connecting to " << RemoteHost.File);
} else {
"Connect", "Connecting to [" << RemoteHost.Host << ":" << RemoteHost.Port << "]");
fSocket = new XrdClientPSock(RemoteHost);
fSocket = new XrdClientSock(RemoteHost, 0, fd );
if(!fSocket) {
Error("Connect","Unable to create a client socket. Aborting.");
if (!fSocket->IsConnected()) {
if (isUnix) {
Error("Connect", "can't open UNIX connection to " << RemoteHost.File);
} else {
Error("Connect", "can't open connection to [" <<
RemoteHost.Host << ":" << RemoteHost.Port << "]");
return FALSE;
if (isUnix) {
Info(XrdClientDebug::kHIDEBUG, "Connect", "Connected to " << RemoteHost.File);
} else {
Info(XrdClientDebug::kHIDEBUG, "Connect", "Connected to [" <<
RemoteHost.Host << ":" << RemoteHost.Port << "]");
fServer = RemoteHost;
XrdSysMutexHelper l(fMutex);
fReaderthreadrunning = 0;
return TRUE;
void XrdClientPhyConnection::StartReader() {
bool running;
XrdSysMutexHelper l(fMutex);
running = fReaderthreadrunning;
// Start reader thread
// Parametric asynchronous stuff.
// If we are going Sync, then nothing has to be done,
// otherwise the reader thread must be started
if ( !running ) {
"StartReader", "Starting reader thread...");
int rdcnt = READERCOUNT;
if (fServerType == kSTBaseXrootd) rdcnt = 1;
for (int i = 0; i < rdcnt; i++) {
// Now we launch the reader thread
fReaderthreadhandler[i] = new XrdClientThread(SocketReaderThread);
if (!fReaderthreadhandler[i]) {
"Can't create reader thread: out of system resources");
// HELP: what do we do here
if (fReaderthreadhandler[i]->Run(this)) {
"Can't run reader thread: out of system resources. Critical error.");
// HELP: what do we do here
// sleep until at least one thread starts running, which hopefully
// is not forever.
int maxRetries = 10;
while (--maxRetries >= 0) {
{ XrdSysMutexHelper l(fMutex);
if (fReaderthreadrunning)
void XrdClientPhyConnection::StartedReader() {
XrdSysMutexHelper l(fMutex);
bool XrdClientPhyConnection::ReConnect(XrdClientUrlInfo RemoteHost)
// Re-connection attempt
return Connect(RemoteHost);
void XrdClientPhyConnection::Disconnect()
XrdSysMutexHelper l(fMutex);
// Disconnect from remote server
if (fSocket) {
"PhyConnection", "Disconnecting socket...");
// We do not destroy the socket here. The socket will be destroyed
// in CheckAutoTerm or in the ConnMgr
bool XrdClientPhyConnection::CheckAutoTerm() {
XrdSysMutexHelper l(fMutex);
// Parametric asynchronous stuff
// If we are going async, we might be willing to term ourself
if ( !IsValid() ) {
"CheckAutoTerm", "Self-Cancelling reader thread.");
return true;
return false;
void XrdClientPhyConnection::Touch()
// Set last-use-time to present time
XrdSysMutexHelper l(fMutex);
time_t t = time(0);
// "Touch",
// "Setting last use to current time" << t);
fLastUseTimestamp = t;
int XrdClientPhyConnection::ReadRaw(void *buf, int len, int substreamid,
int *usedsubstreamid) {
// Receive 'len' bytes from the connected server and store them in 'buf'.
// Return 0 if OK.
// If substreamid = -1 then
// gets length bytes from any par socket, and returns the usedsubstreamid
// where it got the bytes from
// Otherwise read bytes from the specified substream. 0 is the main one.
int res;
if (IsValid()) {
"Reading from " <<
fServer.Host << ":" << fServer.Port);
res = fSocket->RecvRaw(buf, len, substreamid, usedsubstreamid);
if ((res < 0) && (res != TXSOCK_ERR_TIMEOUT) && errno ) {
//strerror_r(errno, errbuf, sizeof(buf));
"ReadRaw", "Read error on " <<
fServer.Host << ":" << fServer.Port << ". errno=" << errno );
// If a socket error comes, then we disconnect
// but we have not to disconnect in the case of a timeout
if (((res < 0) && (res == TXSOCK_ERR)) ||
(!fSocket->IsConnected())) {
"Disconnection reported on" <<
fServer.Host << ":" << fServer.Port);
// Let's dump the received bytes
if ((res > 0) && (DebugLevel() > XrdClientDebug::kDUMPDEBUG)) {
XrdOucString s = " ";
char b[256];
for (int i = 0; i < xrdmin(res, 256); i++) {
sprintf(b, "%.2x ", *((unsigned char *)buf + i));
s += b;
if (!((i + 1) % 16)) s += "\n ";
"ReadRaw", "Read " << res << "bytes. Dump:" << endl << s << endl);
return res;
else {
// Socket already destroyed or disconnected
"ReadRaw", "Socket is disconnected.");
return TXSOCK_ERR;
XrdClientMessage *XrdClientPhyConnection::ReadMessage(int streamid) {
// Gets a full loaded XrdClientMessage from this phyconn.
// May be a pure msg pick from a queue
return fMsgQ.GetMsg(streamid, fRequestTimeout );
XrdClientMessage *XrdClientPhyConnection::BuildMessage(bool IgnoreTimeouts, bool Enqueue)
// Builds an XrdClientMessage, and makes it read its header/data from the socket
// Also put automatically the msg into the queue
XrdClientMessage *m;
struct SidInfo *parallelsid = 0;
UnsolRespProcResult res = kUNSOL_KEEP;
m = new XrdClientMessage();
if (!m) {
"Cannot create a new Message. Aborting.");
// fMultireadMutex.Lock();
// fMultireadMutex.UnLock();
parallelsid = (fSidManager) ? fSidManager->GetSidInfo(m->HeaderSID()) : 0;
if ( parallelsid || (m->IsAttn()) || (m->GetStatusCode() == XrdClientMessage::kXrdMSC_readerr)) {
// Here we insert the PhyConn-level support for unsolicited responses
// Some of them will be propagated in some way to the upper levels
// The path should be
// here -> XrdClientConnMgr -> all the involved XrdClientLogConnections ->
// -> all the corresponding XrdClient
if (m->GetStatusCode() == XrdClientMessage::kXrdMSC_readerr) {
"BuildMessage"," propagating a communication error message.");
else {
"BuildMessage"," propagating unsol id " << m->HeaderSID());
res = HandleUnsolicited(m);
if (Enqueue && !parallelsid && !m->IsAttn() && (m->GetStatusCode() != XrdClientMessage::kXrdMSC_readerr)) {
// If we have to ignore the socket timeouts, then we have not to
// feed the queue with them. In this case, the newly created XrdClientMessage
// has to be freed.
//if ( !IgnoreTimeouts || !m->IsError() )
//bool waserror;
if (IgnoreTimeouts) {
if (m->GetStatusCode() != XrdClientMessage::kXrdMSC_timeout) {
//waserror = m->IsError();
"BuildMessage"," posting id "<HeaderSID());
//if (waserror)
// for (int kk=0; kk < 10; kk++) fMsgQ.PutMsg(0);
else {
"BuildMessage"," deleting id "<HeaderSID());
delete m;
m = 0;
} else
else {
// The purpose of this message ends here
if ( (parallelsid) && (res != kUNSOL_KEEP) &&
(m->GetStatusCode() != XrdClientMessage::kXrdMSC_readerr) )
if (fSidManager && (m->HeaderStatus() != kXR_oksofar))
// if (m->GetStatusCode() != XrdClientMessage::kXrdMSC_readerr) {
delete m;
m = 0;
// }
return m;
UnsolRespProcResult XrdClientPhyConnection::HandleUnsolicited(XrdClientMessage *m)
// Local processing of unsolicited responses is done here
bool ProcessingToGo = TRUE;
struct ServerResponseBody_Attn *attnbody;
// Local pre-processing of the unsolicited XrdClientMessage
attnbody = (struct ServerResponseBody_Attn *)m->GetData();
if (attnbody && (m->IsAttn())) {
attnbody->actnum = ntohl(attnbody->actnum);
switch (attnbody->actnum) {
case kXR_asyncms:
// A message arrived from the server. Let's print it.
"Message from " <<
fServer.Host << ":" << fServer.Port << ". '" <<
attnbody->parms << "'");
ProcessingToGo = FALSE;
case kXR_asyncab:
// The server requested to abort the execution!!!!
"******* Abort request received ******* Server: " <<
fServer.Host << ":" << fServer.Port << ". Msg: '" <<
attnbody->parms << "'");
ProcessingToGo = FALSE;
// Now we propagate the message to the interested object, if any
// It could be some sort of upper layer of the architecture
if (ProcessingToGo) {
UnsolRespProcResult retval;
retval = SendUnsolicitedMsg(this, m);
// Request post-processing
if (attnbody && (m->IsAttn())) {
switch (attnbody->actnum) {
case kXR_asyncrd:
// After having set all the belonging object, we disconnect.
// The next commands will redirect-on-error where we want
case kXR_asyncdi:
// After having set all the belonging object, we disconnect.
// The next connection attempt will behave as requested,
// i.e. waiting some time before reconnecting
} // switch
return retval;
int XrdClientPhyConnection::WriteRaw(const void *buf, int len, int substreamid) {
// Send 'len' bytes located at 'buf' to the connected server.
// Return number of bytes sent.
// usesubstreams tells if we have to select a substream to send the data through or
// the main stream is to be used
// substreamid == 0 means to use the main stream
int res;
if (IsValid()) {
"Writing to substreamid " <<
res = fSocket->SendRaw(buf, len, substreamid);
if ((res < 0) && (res != TXSOCK_ERR_TIMEOUT) && errno) {
//strerror_r(errno, errbuf, sizeof(buf));
"WriteRaw", "Write error on " <<
fServer.Host << ":" << fServer.Port << ". errno=" << errno );
// If a socket error comes, then we disconnect (and destroy the fSocket)
if ((res < 0) || (!fSocket) || (!fSocket->IsConnected())) {
"Disconnection reported on" <<
fServer.Host << ":" << fServer.Port);
return( res );
else {
// Socket already destroyed or disconnected
"Socket is disconnected.");
return TXSOCK_ERR;
bool XrdClientPhyConnection::ExpiredTTL()
// Check expiration time
return( (time(0) - fLastUseTimestamp) > fTTLsec );
void XrdClientPhyConnection::LockChannel()
// Lock
void XrdClientPhyConnection::UnlockChannel()
// Unlock
ERemoteServerType XrdClientPhyConnection::DoHandShake(ServerInitHandShake &xbody,
int substreamid)
// Performs initial hand-shake with the server in order to understand which
// kind of server is there at the other side and to make the server know who
// we are. Note that if the substreamid is negative, this is a handshake for
// a parallel stream and we can do a short handshake.
struct ClientInitHandShake initHS;
ServerResponseType type;
ERemoteServerType typeres = kSTNone;
int isPS = (substreamid < 0);
int writeres, readres, len = 0;
// Set field in network byte order
memset(&initHS, 0, sizeof(initHS));
initHS.fourth = (kXR_int32)htonl(4);
initHS.fifth = (kXR_int32)htonl(2012);
// Create protocol request
ClientRequest req;
memset( &req, 0, sizeof( req ) );
req.header.requestid = kXR_protocol;
req.protocol.clientpv = kXR_PROTOCOLVERSION;
// Send the handshake and the kXR_protocol request
if( DebugLevel() >= XrdClientDebug::kDUMPDEBUG )
smartPrintClientHeader( &req );
// For parallel streams we only send the handshake. For normal streams we
// piggy-back a protocol request (i.e., extended handshake). This is for
// historical reasons to keep backward compatability.
if (isPS)
{Info( XrdClientDebug::kHIDEBUG, "DoHandShake",
"HandShake step 1: Sending handshake for a parallel stream" );
writeres = WriteRaw( &initHS, sizeof(initHS), substreamid );
} else {
Info( XrdClientDebug::kHIDEBUG, "DoHandShake",
"HandShake step 1: Sending handshake with a piggy-backed protocol request" );
clientMarshall( &req );
len = sizeof( req ) + sizeof( initHS );
char buffer[sizeof(req)+sizeof(initHS)];
memcpy( buffer, &initHS, sizeof( initHS ) );
memcpy( buffer+sizeof( initHS ), &req, sizeof( req ) );
writeres = WriteRaw( buffer, len, substreamid );
if( writeres < 0 )
Info( XrdClientDebug::kNODEBUG,"DoHandShake", "Failed to send " << len <<
" bytes of protocol info request. Retrying ...");
return kSTError;
// Read from server the first 4 bytes
len = sizeof(type);
"HandShake step 2: Reading " << len <<
" bytes.");
// Read returns the return value of TSocket->RecvRaw... that returns the
// return value of recv (unix low level syscall)
readres = ReadRaw(&type,
len, substreamid); // Reads 4(2+2) bytes
if (readres < 0) {
Info(XrdClientDebug::kNODEBUG, "DoHandShake", "Failed to read " << len <<
" bytes. Retrying ...");
return kSTError;
// to host byte order
type = ntohl(type);
// Check if the server is the eXtended rootd or not, checking the value
// of type
if (type == 0) { // ok, eXtended!
len = sizeof(xbody);
"HandShake step 3: Reading " << len <<
" bytes.");
readres = ReadRaw(&xbody, len, substreamid); // Read 12(4+4+4) bytes
if (readres < 0) {
Error("DoHandShake", "Error reading " << len <<
" bytes.");
return kSTError;
"Server protocol: " << xbody.protover << " type: " << xbody.msgval);
// For parallel streams we never sent a protocol request. Otherwise, we
// need to get the protocol request response. Ideally, we would continue
// doing this using the unlocked ReadRaw() as the handshake is technically
// atomic. The added code went through the message queue making it not
// atomic which made it impossible to setup a parallel stream. the ideal
// fix would cause too much code to change, so we do a quick and dirty.
if (isPS)
{typeres = kSTDataXrootd;
if (xbody.msgval & kXR_DataServer) fServerType = kSTDataXrootd;
else fServerType = kSTBaseXrootd;
fServerProto = xbody.protover;
return fServerType;
// Get the response to the protocol message
XrdClientMessage *msg = new XrdClientMessage();
msg->ReadRaw( this );
if( DebugLevel() >= XrdClientDebug::kDUMPDEBUG )
smartPrintServerHeader( &msg->fHdr );
// Got correct response from the server
if( !msg->IsError() && msg->HeaderStatus() == kXR_ok )
ServerResponseBody_Protocol *resp = (ServerResponseBody_Protocol*)msg->GetData();
resp->pval = ntohl( resp->pval );
resp->flags = ntohl( resp->flags );
Info( XrdClientDebug::kHIDEBUG, "DoHandShake",
"Server protocol (kXR_protocol): " << resp->pval << " flags: " << resp->flags );
// Get the server type
if( resp->pval >= 0x297 )
if( resp->flags & kXR_isManager )
if( resp->flags & kXR_attrMeta ) typeres = kSTMetaXrootd;
else typeres = kSTBaseXrootd;
else if( resp->flags & kXR_isServer )
typeres = kSTDataXrootd;
fServerType = typeres;
fServerProto = resp->pval;
delete msg;
return typeres;
// Protocol not supported
Info( XrdClientDebug::kHIDEBUG, "DoHandShake",
"No valid response to the protocol request" );
delete msg;
msg = 0;
// check if the eXtended rootd is a data server
switch (xbody.msgval) {
case kXR_DataServer:
// This is a data server
typeres = kSTDataXrootd;
case kXR_LBalServer:
typeres = kSTBaseXrootd;
} else {
// We are here if it wasn't an XRootd
// and we need to complete the reading
if (type == 8)
typeres = kSTRootd;
// We dunno the server type
typeres = kSTNone;
fServerType = typeres;
fServerProto = xbody.protover;
return typeres;
void XrdClientPhyConnection::CountLogConn(int d)
// Modify countre of logical connections using this phyconn
fLogConnCnt += d;
bool XrdClientPhyConnection::TestAndSetMStreamsGoing() {
XrdSysMutexHelper mtx(fMutex);
bool retval = fMStreamsGoing;
fMStreamsGoing = true;
return retval;
bool XrdClientPhyConnection::IsValid() {
XrdSysMutexHelper l(fMutex);
return ( (fSocket != 0) && fSocket->IsConnected());
ELoginState XrdClientPhyConnection::IsLogged() {
const XrdSysMutexHelper l(fMutex);
return fLogged;