#ifndef XRD_CONN_H
#define XRD_CONN_H
/******************************************************************************/
/* */
/* X r d C l i e n t C o n n . h h */
/* */
/* Author: Fabrizio Furano (INFN Padova, 2004) */
/* Adapted from TXNetFile (root.cern.ch) originally done by */
/* Alvise Dorigo, Fabrizio Furano */
/* INFN Padova, 2003 */
/* */
/* This file is part of the XRootD software suite. */
/* */
/* XRootD is free software: you can redistribute it and/or modify it under */
/* the terms of the GNU Lesser General Public License as published by the */
/* Free Software Foundation, either version 3 of the License, or (at your */
/* option) any later version. */
/* */
/* XRootD is distributed in the hope that it will be useful, but WITHOUT */
/* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or */
/* FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public */
/* License for more details. */
/* */
/* You should have received a copy of the GNU Lesser General Public License */
/* along with XRootD in a file called COPYING.LESSER (LGPL license) and file */
/* COPYING (GPL license). If not, see <http://www.gnu.org/licenses/>. */
/* */
/* The copyright holder's institutional names and contributor's names may not */
/* be used to endorse or promote products derived from this software without */
/* specific prior written permission of the institution or contributor. */
/******************************************************************************/
//////////////////////////////////////////////////////////////////////////
// //
// High level handler of connections to xrootd. //
// //
//////////////////////////////////////////////////////////////////////////
#include "XrdClient/XrdClientConst.hh"
#include "time.h"
#include "XrdClient/XrdClientConnMgr.hh"
#include "XrdClient/XrdClientMessage.hh"
#include "XrdClient/XrdClientUrlInfo.hh"
#include "XrdClient/XrdClientReadCache.hh"
#include "XrdOuc/XrdOucHash.hh"
#include "XrdSys/XrdSysPthread.hh"
// Convenience shorthand: expands to a call of the static accessor
// XrdClientConn::GetConnectionMgr(), which returns the connection manager
// instance stored in the static member fgConnectionMgr.
#define ConnectionManager XrdClientConn::GetConnectionMgr()
// Forward declarations: this header only uses pointers to these types
// (XrdClientAbs* for the redirection handler, XrdSecProtocol* as the
// return type of DoAuthentication()).
class XrdClientAbs;
class XrdSecProtocol;
// High level handler of a connection to an xrootd server.
//
// Most member functions are only declared here and implemented elsewhere.
// The ones defined inline are thin wrappers around the optional read cache
// (fMainReadCache may be null, see SetCacheSize()) and simple accessors for
// the connection / redirection bookkeeping fields.
class XrdClientConn {

public:

   // Values returned by HandleServerError()
   enum ESrvErrorHandlerRetval {
      kSEHRReturnMsgToCaller   = 0,
      kSEHRBreakLoop           = 1,
      kSEHRContinue            = 2,
      kSEHRReturnNoMsgToCaller = 3,
      kSEHRRedirLimitReached   = 4
   };

   // Three-state value handed back by reference from ReadPartialAnswer()
   enum EThreeStateReadHandler {
      kTSRHReturnMex     = 0,
      kTSRHReturnNullMex = 1,
      kTSRHContinue      = 2
   };

   // To keep info about an open session
   struct SessionIDInfo {
      char id[16];
   };

   int                        fLastDataBytesRecv;
   int                        fLastDataBytesSent;
   XErrorCode                 fOpenError;

   XrdOucString               fRedirOpaque;        // Opaque info returned by the server when
                                                   // redirecting. To be used in the next opens

   XrdClientConn();
   virtual ~XrdClientConn();

   // Asks the read cache whether 'bytes' bytes would fit.
   // Returns false when no cache has been created.
   inline bool                CacheWillFit(long long bytes) {
      if (!fMainReadCache)
         return false;
      return fMainReadCache->WillFit(bytes);
   }

   bool                       CheckHostDomain(XrdOucString hostToCheck);
   short                      Connect(XrdClientUrlInfo Host2Conn,
                                      XrdClientAbsUnsolMsgHandler *unsolhandler);
   void                       Disconnect(bool ForcePhysicalDisc);
   virtual bool               GetAccessToSrv();
   XReqErrorType              GoBackToRedirector();

   // Returns a copy of the client's domain name (shared, static storage)
   XrdOucString               GetClientHostDomain() { return fgClientHostDomain; }

   static XrdClientPhyConnection *GetPhyConn(int LogConnID);

   // --------- Cache related stuff
   // All the inline wrappers below are no-ops (or report failure) when the
   // cache has not been created.

   long                       GetDataFromCache(const void *buffer,
                                               long long begin_offs,
                                               long long end_offs,
                                               bool PerfCalc,
                                               XrdClientIntvList &missingblks,
                                               long &outstandingblks );
   bool                       SubmitDataToCache(XrdClientMessage *xmsg,
                                                long long begin_offs,
                                                long long end_offs);
   bool                       SubmitRawDataToCache(const void *buffer,
                                                   long long begin_offs,
                                                   long long end_offs);

   // Forwards to XrdClientReadCache::PutPlaceholder() for the given interval
   void                       SubmitPlaceholderToCache(long long begin_offs,
                                                       long long end_offs) {
      if (fMainReadCache)
         fMainReadCache->PutPlaceholder(begin_offs, end_offs);
   }

   // Forwards to XrdClientReadCache::RemoveItems(keepwriteblocks)
   void                       RemoveAllDataFromCache(bool keepwriteblocks=true) {
      if (fMainReadCache)
         fMainReadCache->RemoveItems(keepwriteblocks);
   }

   // Forwards to XrdClientReadCache::RemoveItems() for the given interval
   void                       RemoveDataFromCache(long long begin_offs,
                                                  long long end_offs,
                                                  bool remove_overlapped = false) {
      if (fMainReadCache)
         fMainReadCache->RemoveItems(begin_offs, end_offs, remove_overlapped);
   }

   // Forwards to XrdClientReadCache::RemovePlaceholders()
   void                       RemovePlaceholdersFromCache() {
      if (fMainReadCache)
         fMainReadCache->RemovePlaceholders();
   }

   // Forwards to XrdClientReadCache::PrintCache()
   void                       PrintCache() {
      if (fMainReadCache)
         fMainReadCache->PrintCache();
   }

   // Fills the output parameters with the cache statistics.
   // Returns false, leaving the outputs untouched, when there is no cache.
   bool                       GetCacheInfo(
                                           // The actual cache size
                                           int &size,

                                           // The number of bytes submitted since the beginning
                                           long long &bytessubmitted,

                                           // The number of bytes found in the cache (estimate)
                                           long long &byteshit,

                                           // The number of reads which did not find their data
                                           // (estimate)
                                           long long &misscount,

                                           // miss/totalreads ratio (estimate)
                                           float &missrate,

                                           // number of read requests towards the cache
                                           long long &readreqcnt,

                                           // ratio between bytes found / bytes submitted
                                           float &bytesusefulness
                                           ) {
      if (!fMainReadCache) return false;

      fMainReadCache->GetInfo(size,
                              bytessubmitted,
                              byteshit,
                              misscount,
                              missrate,
                              readreqcnt,
                              bytesusefulness);
      return true;
   }

   // Sets the cache size. The cache object is created on demand, the first
   // time a non-zero size is requested; a zero size on a non-existing cache
   // does nothing.
   void                       SetCacheSize(int CacheSize) {
      if (!fMainReadCache && CacheSize)
         fMainReadCache = new XrdClientReadCache();

      if (fMainReadCache)
         fMainReadCache->SetSize(CacheSize);
   }

   // Forwards to XrdClientReadCache::SetBlkRemovalPolicy()
   void                       SetCacheRmPolicy(int RmPolicy) {
      if (fMainReadCache)
         fMainReadCache->SetBlkRemovalPolicy(RmPolicy);
   }

   // Unpins the given interval in the cache (if any) and wakes up whoever
   // is waiting on fWriteWaitAck.
   void                       UnPinCacheBlk(long long begin_offs, long long end_offs) {
      // The cache is optional: guard the access like every other wrapper
      // does, instead of dereferencing a possibly null pointer.
      if (fMainReadCache)
         fMainReadCache->UnPinCacheBlk(begin_offs, end_offs);

      // Also use this to signal the possibility to proceed for a hard checkpoint
      fWriteWaitAck->Broadcast();
   }

   // -------------------

   int                        GetLogConnID() const { return fLogConnID; }

   ERemoteServerType          GetServerType() const { return fServerType; }

   kXR_unt16                  GetStreamID() const { return fPrimaryStreamid; }

   inline XrdClientUrlInfo    *GetLBSUrl() { return fLBSUrl; }
   inline XrdClientUrlInfo    *GetMetaUrl() { return fMetaUrl; }
   inline XrdClientUrlInfo    GetCurrentUrl() { return fUrl; }
   inline XrdClientUrlInfo    GetRedirUrl() { return fREQUrl; }

   XErrorCode                 GetOpenError() const { return fOpenError; }
   virtual XReqErrorType      GoToAnotherServer(XrdClientUrlInfo &newdest);
   virtual XReqErrorType      GoToMetaManager();
   bool                       IsConnected() const { return fConnected; }
   bool                       IsPhyConnConnected();

   struct ServerResponseHeader
                              LastServerResp;

   struct ServerResponseBody_Error
                              LastServerError;

   // Zeroes LastServerError and marks it as "no error yet"
   void                       ClearLastServerError() {
      memset(&LastServerError, 0, sizeof(LastServerError));
      LastServerError.errnum = kXR_noErrorYet;
   }

   UnsolRespProcResult        ProcessAsynResp(XrdClientMessage *unsolmsg);

   virtual bool               SendGenCommand(ClientRequest *req,
                                             const void *reqMoreData,
                                             void **answMoreDataAllocated,
                                             void *answMoreData, bool HasToAlloc,
                                             char *CmdName, int substreamid = 0);

   int                        GetOpenSockFD() const { return fOpenSockFD; }

   void                       SetClientHostDomain(const char *src) { fgClientHostDomain = src; }
   void                       SetConnected(bool conn) { fConnected = conn; }

   void                       SetOpenError(XErrorCode err) { fOpenError = err; }

   // Gets a parallel stream id to use to set the return path for a request
   int                        GetParallelStreamToUse(int reqsperstream);
   int                        GetParallelStreamCount(); // Returns the total number of connected streams

   void                       SetRedirHandler(XrdClientAbs *rh) { fRedirHandler = rh; }

   // Records an explicitly requested destination: starts from a copy of the
   // current URL, then overrides host and port and refreshes the address.
   void                       SetRequestedDestHost(char *newh, kXR_int32 port) {
      fREQUrl = fUrl;
      fREQUrl.Host = newh;
      fREQUrl.Port = port;
      fREQUrl.SetAddrFromHost();
   }

   // Puts this instance in pause state for wsec seconds.
   // A value <= 0 revokes immediately the pause state
   void                       SetREQPauseState(kXR_int32 wsec) {
      // Lock mutex
      fREQWait->Lock();

      if (wsec > 0) {
         fREQWaitTimeLimit = time(0) + wsec;
      } else {
         // Pause revoked: clear the deadline and wake up the waiters
         fREQWaitTimeLimit = 0;
         fREQWait->Broadcast();
      }

      // UnLock mutex
      fREQWait->UnLock();
   }

   // Puts this instance in connect-pause state for wsec seconds.
   // Any future connection attempt will not happen before wsec
   // and the first one will be towards the given host
   void                       SetREQDelayedConnectState(kXR_int32 wsec) {
      // Lock mutex
      fREQConnectWait->Lock();

      if (wsec > 0) {
         fREQConnectWaitTimeLimit = time(0) + wsec;
      } else {
         // Delay revoked: clear the deadline and wake up the waiters
         fREQConnectWaitTimeLimit = 0;
         fREQConnectWait->Broadcast();
      }

      // UnLock mutex
      fREQConnectWait->UnLock();
   }

   void                       SetSID(kXR_char *sid);
   inline void                SetUrl(XrdClientUrlInfo thisUrl) { fUrl = thisUrl; }

   // Sends the request to the server, through logconn with ID LogConnID
   // The request is sent with a streamid 'child' of the current one, then marked as pending
   // Its answer will be caught asynchronously
   XReqErrorType              WriteToServer_Async(ClientRequest *req,
                                                  const void* reqMoreData,
                                                  int substreamid = 0);

   static XrdClientConnectionMgr *GetConnectionMgr()
   { return fgConnectionMgr; } // Instance of the conn manager

   // Empties the shared session ID repository, under its mutex
   static void                DelSessionIDRepo() {
      fSessionIDRMutex.Lock();
      fSessionIDRepo.Purge();
      fSessionIDRMutex.UnLock();
   }

   // Copies the login session ID of this instance into 'sess'
   void                       GetSessionID(SessionIDInfo &sess) { sess = mySessionID; }

   long                       GetServerProtocol() { return fServerProto; }

   short                      GetMaxRedirCnt() const { return fMaxGlobalRedirCnt; }
   void                       SetMaxRedirCnt(short mx) { fMaxGlobalRedirCnt = mx; }
   short                      GetRedirCnt() const { return fGlobalRedirCnt; }

   bool                       DoWriteSoftCheckPoint();
   bool                       DoWriteHardCheckPoint();
   void                       UnPinCacheBlk();

   // To give a max number of seconds for an operation to complete, no matter what happens inside
   // e.g. redirections, sleeps, failed connection attempts etc.
   void                       SetOpTimeLimit(int delta_secs);
   bool                       IsOpTimeLimitElapsed(time_t timenow);

protected:
   void                       SetLogConnID(int cid) { fLogConnID = cid; }
   void                       SetStreamID(kXR_unt16 sid) { fPrimaryStreamid = sid; }

   // The handler which first tried to connect somewhere
   XrdClientAbsUnsolMsgHandler *fUnsolMsgHandler;

   XrdClientUrlInfo           fUrl;                // The current URL
   XrdClientUrlInfo           *fLBSUrl;            // Needed to save the load balancer url
   XrdClientUrlInfo           fREQUrl;             // For explicitly requested redirs

   short                      fGlobalRedirCnt;     // Number of redirections

private:

   static XrdOucString        fgClientHostDomain;  // Save the client's domain name
   bool                       fConnected;
   bool                       fGettingAccessToSrv; // To avoid recursion in desperate situations
   time_t                     fGlobalRedirLastUpdateTimestamp; // Timestamp of last redirection

   int                        fLogConnID;          // Logical connection ID used
   kXR_unt16                  fPrimaryStreamid;    // Streamid used for normal communication
                                                   // NB it's a copy of the one contained in
                                                   // the logconn

   short                      fMaxGlobalRedirCnt;
   XrdClientReadCache         *fMainReadCache;     // Optional: null until SetCacheSize()
                                                   // is called with a non-zero size

   // The time limit for a transaction
   time_t                     fOpTimeLimit;

   XrdClientAbs               *fRedirHandler;      // Pointer to a class inheriting from
                                                   // XrdClientAbs providing methods
                                                   // to handle a redir at higher level

   XrdOucString               fRedirInternalToken; // Token returned by the server when
                                                   // redirecting. To be used in the next logins

   XrdSysCondVar              *fREQWaitResp;       // For explicitly requested delayed async responses
   ServerResponseBody_Attn_asynresp *
                              fREQWaitRespData;    // For explicitly requested delayed async responses

   time_t                     fREQWaitTimeLimit;   // For explicitly requested pause state
   XrdSysCondVar              *fREQWait;           // For explicitly requested pause state
   time_t                     fREQConnectWaitTimeLimit; // For explicitly requested delayed reconnect
   XrdSysCondVar              *fREQConnectWait;    // For explicitly requested delayed reconnect

   long                       fServerProto;        // The server protocol
   ERemoteServerType          fServerType;         // Server type as returned by doHandShake()
   SessionIDInfo              mySessionID;         // Login session ID

   static XrdSysMutex         fSessionIDRMutex;    // Mutex for the Repo
   static XrdOucHash<SessionIDInfo>
                              fSessionIDRepo;      // The repository of session IDs, shared.
                                                   // Association between a server identification
                                                   // string and a SessionIDInfo struct
                                                   // NOTE(review): key presumably of the form
                                                   // <hostname>:<port>.<randomidentifier> -
                                                   // confirm against the implementation file

   int                        fOpenSockFD;         // Descriptor of the underlying socket
   static XrdClientConnectionMgr *fgConnectionMgr; // Instance of the Connection Manager

   XrdSysCondVar              *fWriteWaitAck;
   // NOTE(review): element type restored as ClientRequest - confirm against
   // the implementation file
   XrdClientVector<ClientRequest> fWriteReqsToRetry; // To store the write reqs to retry in case of a disconnection

   bool                       CheckErrorStatus(XrdClientMessage *, short &, char *);
   void                       CheckPort(int &port);
   void                       CheckREQPauseState();
   void                       CheckREQConnectWaitState();
   bool                       CheckResp(struct ServerResponseHeader *resp, const char *method);
   XrdClientMessage           *ClientServerCmd(ClientRequest *req,
                                               const void *reqMoreData,
                                               void **answMoreDataAllocated,
                                               void *answMoreData,
                                               bool HasToAlloc,
                                               int substreamid = 0);
   XrdSecProtocol             *DoAuthentication(char *plist, int plsiz);

   ERemoteServerType          DoHandShake(short log);

   bool                       DoLogin();
   bool                       DomainMatcher(XrdOucString dom, XrdOucString domlist);

   XrdOucString               GetDomainToMatch(XrdOucString hostname);

   ESrvErrorHandlerRetval     HandleServerError(XReqErrorType &, XrdClientMessage *,
                                                ClientRequest *);
   bool                       MatchStreamid(struct ServerResponseHeader *ServerResponse);

   // Sends a close request, without waiting for an answer
   // useful (?) to be sent just before closing a badly working stream
   bool                       PanicClose();

   XrdOucString               ParseDomainFromHostname(XrdOucString hostname);

   XrdClientMessage           *ReadPartialAnswer(XReqErrorType &, size_t &,
                                                 ClientRequest *, bool, void**,
                                                 EThreeStateReadHandler &);

   // void ClearSessionID();

   XReqErrorType              WriteToServer(ClientRequest *req,
                                            const void* reqMoreData,
                                            short LogConnID,
                                            int substreamid = 0);

   bool                       WaitResp(int secsmax);

   XrdClientUrlInfo           *fMetaUrl;           // Meta manager url
   bool                       fLBSIsMeta;          // Is current redirector a meta manager?

public:
   XrdOucString               fRedirCGI;           // Same as fRedirOpaque but persistent

};
#endif