#ifndef _XrdClientPhyConnection
#define _XrdClientPhyConnection
/******************************************************************************/
/* */
/* X r d C l i e n t P h y C o n n e c t i o n . h h */
/* */
/* Author: Fabrizio Furano (INFN Padova, 2004) */
/* Adapted from TXNetFile (root.cern.ch) originally done by */
/* Alvise Dorigo, Fabrizio Furano */
/* INFN Padova, 2003 */
/* */
/* This file is part of the XRootD software suite. */
/* */
/* XRootD is free software: you can redistribute it and/or modify it under */
/* the terms of the GNU Lesser General Public License as published by the */
/* Free Software Foundation, either version 3 of the License, or (at your */
/* option) any later version. */
/* */
/* XRootD is distributed in the hope that it will be useful, but WITHOUT */
/* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or */
/* FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public */
/* License for more details. */
/* */
/* You should have received a copy of the GNU Lesser General Public License */
/* along with XRootD in a file called COPYING.LESSER (LGPL license) and file */
/* COPYING (GPL license). If not, see <http://www.gnu.org/licenses/>. */
/* */
/* The copyright holder's institutional names and contributor's names may not */
/* be used to endorse or promote products derived from this software without */
/* specific prior written permission of the institution or contributor. */
/******************************************************************************/
//////////////////////////////////////////////////////////////////////////
// //
// Class handling physical connections to xrootd servers //
// //
//////////////////////////////////////////////////////////////////////////
#include "XrdClient/XrdClientSock.hh"
#include "XrdClient/XrdClientMessage.hh"
#include "XrdClient/XrdClientUnsolMsg.hh"
#include "XrdClient/XrdClientInputBuffer.hh"
#include "XrdClient/XrdClientUrlInfo.hh"
#include "XrdSys/XrdSysPthread.hh"
#include "XrdSys/XrdSysSemWait.hh"
#include <time.h> // for time_t data type
//! Login state of a physical connection; this is the type of
//! XrdClientPhyConnection::fLogged and of IsLogged() / SetLogged().
//! NOTE(review): the per-value meanings below are read off the enumerator
//! names -- confirm against the implementation.
enum ELoginState {
   kNo      = 0,   //!< presumably: not logged in
   kYes     = 1,   //!< presumably: logged in
   kPending = 2    //!< presumably: login still in progress
};
//! Kind of server found at the remote end of a connection; this is the
//! return type of XrdClientPhyConnection::DoHandShake() and the type of
//! XrdClientPhyConnection::fServerType.
enum ERemoteServerType {
   kSTError      = -1,   //!< an error occurred, the server type is undetermined
   kSTNone       =  0,   //!< the remote server type was not recognized
   kSTRootd      =  1,   //!< old rootd server
   kSTBaseXrootd =  2,   //!< xrootd dynamic load balancer
   kSTDataXrootd =  3,   //!< xrootd data server
   kSTMetaXrootd =  4    //!< xrootd meta manager
};
class XrdClientSid;
class XrdClientThread;
class XrdSecProtocol;
//------------------------------------------------------------------------------
//! Physical connection to an xrootd server.
//!
//! Wraps an XrdClientSock (fSocket) together with the queue of incoming
//! messages (fMsgQ), the login state and the locks used to arbitrate access
//! to the channel. Most of the small accessors below simply forward to the
//! socket object when one exists and return a fallback value otherwise.
//!
//! Objects of this class cannot be copied (see the end of the class).
//------------------------------------------------------------------------------
class XrdClientPhyConnection: public XrdClientUnsolMsgSender {

private:
   // NOTE: the declaration order of the data members is kept as it was;
   // constructors initialise members in this order.
   time_t               fLastUseTimestamp;
   enum ELoginState     fLogged;        // only 1 login/auth is needed for physical
   XrdSecProtocol      *fSecProtocol;   // authentication protocol
   XrdClientInputBuffer
                        fMsgQ;          // The queue used to hold incoming messages
   int                  fRequestTimeout;
   bool                 fMStreamsGoing; // presumably the flag behind
                                        // TestAndSetMStreamsGoing() -- confirm
   XrdSysRecMutex       fRwMutex;       // Lock before using the physical channel
                                        // (for reading and/or writing)
   XrdSysRecMutex       fMutex;         // Held while reading fReaderthreadrunning
                                        // in GetReaderThreadsCnt()
   XrdSysRecMutex       fMultireadMutex; // Used to arbitrate between multiple
                                         // threads reading msgs from the same conn
   XrdClientThread     *fReaderthreadhandler[64]; // The thread which is going to pump
                                                  // out the data from the socket
   int                  fReaderthreadrunning;     // Value reported by
                                                  // GetReaderThreadsCnt()
   XrdClientUrlInfo     fServer;        // Compared against by IsAddress(),
                                        // IsPort() and IsUser()
   XrdClientSock       *fSocket;        // May be null; every inline forwarder
                                        // below checks it before use

   UnsolRespProcResult  HandleUnsolicited(XrdClientMessage *m);

   XrdSysSemWait        fReaderCV;
   short                fLogConnCnt;    // Number of logical connections using this phyconn
   XrdClientSid        *fSidManager;

public:
   long                 fServerProto;   // The server protocol
   ERemoteServerType    fServerType;
   long                 fTTLsec;        // Read/written by GetTTL()/SetTTL();
                                        // reset to 0 by SaveSocket()

   XrdClientPhyConnection(XrdClientAbsUnsolMsgHandler *h, XrdClientSid *sid);
   ~XrdClientPhyConnection();

   // The members declared without a body are implemented out of line; see the
   // implementation file for their exact semantics.
   XrdClientMessage *BuildMessage(bool IgnoreTimeouts, bool Enqueue);
   bool              CheckAutoTerm();

   //--------------------------------------------------------------------------
   //! Connect to a remote location
   //!
   //! @param RemoteHost address descriptor
   //! @param isUnix     true if the address points to a Unix socket
   //!                   (defaults to false)
   //--------------------------------------------------------------------------
   bool Connect(XrdClientUrlInfo RemoteHost, bool isUnix = false);

   //--------------------------------------------------------------------------
   //! Connect to a remote location
   //!
   //! @param RemoteHost address descriptor
   //! @param isUnix     true if the address points to a Unix socket
   //! @param fd         a descriptor pointing to a connected socket
   //!                   if the subroutine is supposed to reuse an existing
   //!                   connection, -1 otherwise
   //--------------------------------------------------------------------------
   bool Connect( XrdClientUrlInfo RemoteHost, bool isUnix , int fd );

   void CountLogConn(int d = 1);
   void Disconnect();

   ERemoteServerType
        DoHandShake(ServerInitHandShake &xbody,
                    int substreamid = 0);

   bool ExpiredTTL();

   //! @return the number of logical connections using this phyconn
   short GetLogConnCnt() const { return fLogConnCnt; }

   //! @return fReaderthreadrunning, read while holding fMutex
   int GetReaderThreadsCnt() {
      XrdSysMutexHelper l(fMutex);
      return fReaderthreadrunning;
   }

   //! @return the current value of fTTLsec
   long GetTTL() { return fTTLsec; }

   //! @return the authentication protocol pointer stored by SetSecProtocol()
   XrdSecProtocol *GetSecProtocol() const { return fSecProtocol; }

   //! @return the fSocket member of the socket object, or -1 when there is
   //!         no socket object
   int GetSocket() { return fSocket ? fSocket->fSocket : -1; }

   // Tells to the sock to rebuild the list of interesting selectors
   void ReinitFDTable() { if (fSocket) fSocket->ReinitFDTable(); }

   //! Resets fTTLsec to 0, then forwards to XrdClientSock::SaveSocket().
   //! @return whatever the socket object returns, or -1 when there is no
   //!         socket object
   int SaveSocket() {
      fTTLsec = 0;
      return fSocket ? (fSocket->SaveSocket()) : -1;
   }

   //! Forwards to XrdClientSock::SetInterrupt() when a socket object exists
   void SetInterrupt() { if (fSocket) fSocket->SetInterrupt(); }

   //! Stores the authentication protocol pointer (no other action is taken)
   void SetSecProtocol(XrdSecProtocol *sp) { fSecProtocol = sp; }

   void StartedReader();

   //! @return true if addr equals either fServer.Host or fServer.HostAddr
   bool IsAddress(const XrdOucString &addr) {
      return ( (fServer.Host == addr) ||
               (fServer.HostAddr == addr) );
   }

   ELoginState IsLogged();

   //! @return true if port equals fServer.Port
   bool IsPort(int port) { return (fServer.Port == port); }

   //! @return true if usr equals fServer.User
   bool IsUser(const XrdOucString &usr) { return (fServer.User == usr); }

   bool IsValid();
   void LockChannel();

   // see XrdClientSock for the meaning of the parameters
   int ReadRaw(void *buffer, int BufferLength, int substreamid = -1,
               int *usedsubstreamid = 0);

   XrdClientMessage *ReadMessage(int streamid);
   bool ReConnect(XrdClientUrlInfo RemoteHost);

   //! Stores the login state (no other action is taken)
   void SetLogged(ELoginState status) { fLogged = status; }

   //! Stores ttl in fTTLsec
   void SetTTL(long ttl) { fTTLsec = ttl; }

   void StartReader();
   void Touch();
   void UnlockChannel();
   int WriteRaw(const void *buffer, int BufferLength, int substreamid = 0);

   //! Forwards to XrdClientSock::TryConnectParallelSock(); -1 without a
   //! socket object
   int TryConnectParallelStream(int port, int windowsz, int sockid) {
      return ( fSocket ? fSocket->TryConnectParallelSock(port, windowsz, sockid) : -1);
   }

   //! Forwards to XrdClientSock::EstablishParallelSock(); -1 without a
   //! socket object
   int EstablishPendingParallelStream(int tmpid, int newid) {
      return ( fSocket ? fSocket->EstablishParallelSock(tmpid, newid) : -1);
   }

   //! Forwards to XrdClientSock::RemoveParallelSock() when a socket object
   //! exists
   void RemoveParallelStream(int substreamid) {
      if (fSocket) fSocket->RemoveParallelSock(substreamid);
   }

   // Tells if the attempt to establish the parallel streams is ongoing or was done
   // and mark it as ongoing or done
   bool TestAndSetMStreamsGoing();

   //! Forwards to XrdClientSock::GetSockIdHint(); 0 without a socket object
   int GetSockIdHint(int reqsperstream) {
      return ( fSocket ? fSocket->GetSockIdHint(reqsperstream) : 0);
   }

   //! Forwards to XrdClientSock::GetSockIdCount(); 0 without a socket object
   int GetSockIdCount() {
      return ( fSocket ? fSocket->GetSockIdCount() : 0);
   }

   //! Forwards to XrdClientSock::PauseSelectOnSubstream() when a socket
   //! object exists
   void PauseSelectOnSubstream(int substreamid) {
      if (fSocket) fSocket->PauseSelectOnSubstream(substreamid);
   }

   //! Forwards to XrdClientSock::RestartSelectOnSubstream() when a socket
   //! object exists
   void RestartSelectOnSubstream(int substreamid) {
      if (fSocket) fSocket->RestartSelectOnSubstream(substreamid);
   }

   // To prohibit/re-enable a socket descriptor from being looked at by the reader threads
   virtual void BanSockDescr(int sockdescr, int sockid) {
      if (fSocket) fSocket->BanSockDescr(sockdescr, sockid);
   }
   virtual void UnBanSockDescr(int sockdescr) {
      if (fSocket) fSocket->UnBanSockDescr(sockdescr);
   }

   //! Acquire / release fMultireadMutex
   void ReadLock() { fMultireadMutex.Lock(); }
   void ReadUnLock() { fMultireadMutex.UnLock(); }

   //! Forwards to XrdClientInputBuffer::WipeStreamid() on the message queue
   int WipeStreamid(int streamid) { return fMsgQ.WipeStreamid(streamid); }

private:
   // Copying is disabled: these two are declared but intentionally never
   // defined. The object holds raw pointers (socket, security protocol, SID
   // manager, reader threads) and mutexes, so a member-wise copy would make
   // two objects refer to the same resources.
   // NOTE(review): what the destructor releases is not visible in this
   // header -- confirm against the implementation.
   XrdClientPhyConnection(const XrdClientPhyConnection &);
   XrdClientPhyConnection &operator=(const XrdClientPhyConnection &);
};
//
// Class implementing a trick to automatically unlock an XrdClientPhyConnection
//
class XrdClientPhyConnLocker {
   // Scope guard: the constructor calls LockChannel() on the given
   // connection and the destructor calls UnlockChannel() on the same one,
   // so the channel is released on every exit path of the enclosing scope.
   // The pointer is dereferenced unconditionally, so it must not be null.

private:
   XrdClientPhyConnection *phyconn;   // connection locked by this guard

   // Not copyable (declared, intentionally never defined): a copy would call
   // UnlockChannel() a second time for a single LockChannel().
   XrdClientPhyConnLocker(const XrdClientPhyConnLocker &);
   XrdClientPhyConnLocker &operator=(const XrdClientPhyConnLocker &);

public:
   // Constructor: remembers the connection and locks its channel.
   // 'explicit' keeps a bare connection pointer from silently converting
   // into a temporary locker.
   explicit XrdClientPhyConnLocker(XrdClientPhyConnection *phyc): phyconn(phyc) {
      phyconn->LockChannel();
   }

   // Destructor: unlocks the channel locked by the constructor.
   ~XrdClientPhyConnLocker() {
      phyconn->UnlockChannel();
   }
};
#endif