/******************************************************************************/
/* */
/* X r d X r o o t d T r a n s i t . c c */
/* */
/* (c) 2012 by the Board of Trustees of the Leland Stanford, Jr., University */
/* All Rights Reserved */
/* Produced by Andrew Hanushevsky for Stanford University under contract */
/* DE-AC02-76-SFO0515 with the Department of Energy */
/* */
/* This file is part of the XRootD software suite. */
/* */
/* XRootD is free software: you can redistribute it and/or modify it under */
/* the terms of the GNU Lesser General Public License as published by the */
/* Free Software Foundation, either version 3 of the License, or (at your */
/* option) any later version. */
/* */
/* XRootD is distributed in the hope that it will be useful, but WITHOUT */
/* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or */
/* FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public */
/* License for more details. */
/* */
/* You should have received a copy of the GNU Lesser General Public License */
/* along with XRootD in a file called COPYING.LESSER (LGPL license) and file */
/* COPYING (GPL license). If not, see <http://www.gnu.org/licenses/>. */
/* */
/* The copyright holder's institutional names and contributor's names may not */
/* be used to endorse or promote products derived from this software without */
/* specific prior written permission of the institution or contributor. */
/******************************************************************************/
#include
#include
#include
#include "XProtocol/XProtocol.hh"
#include "XrdSec/XrdSecEntity.hh"
#include "Xrd/XrdBuffer.hh"
#include "Xrd/XrdLink.hh"
#include "XrdOuc/XrdOucErrInfo.hh"
#include "XrdSys/XrdSysAtomics.hh"
#include "XrdXrootd/XrdXrootdStats.hh"
#include "XrdXrootd/XrdXrootdTrace.hh"
#include "XrdXrootd/XrdXrootdTransit.hh"
#include "XrdXrootd/XrdXrootdTransPend.hh"
#include "XrdXrootd/XrdXrootdTransSend.hh"
/******************************************************************************/
/* G l o b a l S y m b o l s */
/******************************************************************************/
extern XrdOucTrace *XrdXrootdTrace;

// Trace macros in this file report against the link bound to this object.
//
#undef  TRACELINK
#define TRACELINK Link

// XRD_GETNUM(x): read the 32-bit integer stored in network byte order at
// address x (e.g. the iov_base of a response segment) and return it in host
// byte order.
//
#define XRD_GETNUM(x)\
        ntohl(*(static_cast<const unsigned int *>(static_cast<const void *>(x))))
/******************************************************************************/
/*                        S t a t i c   M e m b e r s                         */
/******************************************************************************/

// Request acceptance table, built once during static initialization.
//
const char *XrdXrootdTransit::reqTab = XrdXrootdTransit::ReqTable();

// Pool of reusable transit objects: Alloc() pops from it, while Disc() and
// Recycle() push objects back onto it.
//
XrdObjectQ<XrdXrootdTransit>
           XrdXrootdTransit::TranStack("TranStack",
                                       "transit protocol anchor");
/******************************************************************************/
/* A l l o c */
/******************************************************************************/
XrdXrootdTransit *XrdXrootdTransit::Alloc(XrdXrootd::Bridge::Result *rsltP,
                                          XrdLink                   *linkP,
                                          XrdSecEntity              *seceP,
                                          const char                *nameP,
                                          const char                *protP
                                         )
{
// Reuse a pooled transit object when one is available, otherwise build a
// fresh one. Either way it is (re)initialized for this link and handed back
// to the caller as the bridge.
//
   XrdXrootdTransit *transP = TranStack.Pop();
   if (!transP) transP = new XrdXrootdTransit();

   transP->Init(rsltP, linkP, seceP, nameP, protP);
   return transP;
}
/******************************************************************************/
/* A t t n */
/******************************************************************************/
int XrdXrootdTransit::Attn(XrdLink *lP, short *theSID, int rcode,
                           const struct iovec *ioV, int ioN, int ioL)
{
   XrdXrootdTransPend *tP;

// Locate (and remove) the pending request that matches this link and stream
// id. If there is none, there is nothing to resume; trace it and return 0.
//
   if (!(tP = XrdXrootdTransPend::Remove(lP, *theSID)))
      {TRACE(REQ, "Unable to find request for " <<lP->ID <<" sid=" <<*theSID);
       return 0;
      }

// Resume the request on the bridge that has been waiting for this response.
//
   return tP->bridge->AttnCont(tP, rcode, ioV, ioN, ioL);
}
/******************************************************************************/
/* A t t n C o n t */
/******************************************************************************/
int XrdXrootdTransit::AttnCont(XrdXrootdTransPend *tP, int rcode,
                               const struct iovec *ioV, int ioN, int ioL)
{
// Restore the request that was saved when the wait began. The pending entry
// is no longer needed once its copy has been taken.
//
   memcpy(&Request, &(tP->Pend.Request), sizeof(Request));
   delete tP;
   runWait = 0;

// A kXR_wait with no time, or a time of zero, means "re-issue now": schedule
// the request again instead of forwarding a response.
//
   bool retryNow = (rcode == kXR_wait)
                && (ioN == 0 || XRD_GETNUM(ioV[0].iov_base) == 0);
   if (retryNow)
      {Sched->Schedule((XrdJob *)&waitJob);
       return 0;
      }

// Forward the deferred response to the result object.
//
   int result = Send(rcode, ioV, ioN, ioL);

// Nothing more to do if sending failed or Send() started another wait.
//
   if (result < 0 || runWait) return result;

// A final response ends the request, so the bridge becomes idle again.
//
   if (runDone)
      {AtomicBeg(runMutex);
       AtomicZAP(runStatus);
       AtomicEnd(runMutex);
      }

// Continue an interrupted reinvocation, or let the link accept more input.
//
   if (reInvoke) Sched->Schedule((XrdJob *)&respJob);
      else Link->Enable();

   return result;
}
/******************************************************************************/
/* D i s c */
/******************************************************************************/
bool XrdXrootdTransit::Disc()
{
char buff[128];
int rc;
// We do not allow disconnection while we are active
//
AtomicBeg(runMutex);
rc = AtomicInc(runStatus);
AtomicEnd(runMutex);
if (rc) return false;
// Reconnect original protocol to the link
//
Link->setProtocol(realProt);
// Now we need to recycle our xrootd part
//
sprintf(buff, "%s disconnection", pName);
XrdXrootdProtocol::Recycle(Link, time(0)-cTime, buff);
// Now just free up our object.
//
TranStack.Push(&TranLink);
return true;
}
/******************************************************************************/
/* Private: F a i l */
/******************************************************************************/
bool XrdXrootdTransit::Fail(int ecode, const char *etext)
{
// Remember the error for later reporting (Fatal() sends it). The request is
// still treated as accepted by the caller, hence the unconditional true.
//
   runEText = etext;
   runError = ecode;
   return true;
}
/******************************************************************************/
/* F a t a l */
/******************************************************************************/
int XrdXrootdTransit::Fatal(int rc)
{
XrdXrootd::Bridge::Context rInfo(Link, Request.header.streamid,
Request.header.requestid);
return (respObj->Error(rInfo, runError, runEText) ? rc : -1);
}
/******************************************************************************/
/* I n i t */
/******************************************************************************/
void XrdXrootdTransit::Init(XrdScheduler *schedP, int qMax, int qTTL)
{
// Configure the pool of reusable transit objects. The first call supplies
// the scheduler and tracing hooks; the second applies the qMax/qTTL limits.
//
   TranStack.Set(schedP, XrdXrootdTrace, TRACE_MEM);
   TranStack.Set(qMax,   qTTL);
}
/******************************************************************************/
// Private initializer: (re)binds this transit object to a newly bridged
// session. It resets the per-request state, ties the object to linkP,
// assigns a trace identifier derived from nameP, records client address
// properties and the security identity, registers with monitoring when
// enabled, and finally installs this object as the link's protocol (the
// displaced protocol is kept in realProt).
//
// respP - result object that receives responses for bridged requests.
// linkP - link being bridged.
// seceP - security entity to use; when null, the local "host" entity is used.
// nameP - user name used for the link's trace id (truncated to fit).
// protP - protocol name, used for monitoring and the disconnect message.
void XrdXrootdTransit::Init(XrdXrootd::Bridge::Result *respP, // Private
XrdLink *linkP,
XrdSecEntity *seceP,
const char *nameP,
const char *protP
)
{
static XrdSysMutex myMutex;
static int bID = 0;
XrdNetAddrInfo *addrP;
const char *who;
char uname[sizeof(Request.login.username)+1];
int pID, n;
// Set standard stuff
//
// Reset all per-request bridge state. The object may come from TranStack,
// so nothing from a previous session may be assumed.
//
runArgs = 0;
runALen = 0;
runABsz = 0;
runError = 0;
runStatus = 0;
runWait = 0;
runWTot = 0;
runWMax = 3600;
runWCall = false;
runDone = false;
reInvoke = false;
wBuff = 0;
wBLen = 0;
respObj = respP;
pName = protP;
// Bind the protocol to the link
//
SI->Bump(SI->Count);
Link = linkP;
Response.Set(linkP);
Response.Set(this);
strcpy(Entity.prot, "host");
Entity.host = (char *)linkP->Host();
// Develop a trace identifier
//
// Each bridged session gets a unique, mutex-protected sequence number; the
// user name is truncated to the size of the login username field.
//
myMutex.Lock(); pID = ++bID; myMutex.UnLock();
n = strlen(nameP);
if (n >= int(sizeof(uname))) n = sizeof(uname)-1;
strncpy(uname, nameP, sizeof(uname)-1);
uname[n] = 0;
linkP->setID(uname, pID);
// Indicate that this bridge supports asynchronous responses
//
CapVer = kXR_asyncap | kXR_ver002;
// Mark the client as IPv4 if they came in as IPv4 or mapped IPv4
//
// NOTE(review): clientPV is only OR'ed here, never cleared; presumably it is
// reset elsewhere when the object is recycled -- verify.
//
addrP = Link->AddrInfo();
if (addrP->isIPType(XrdNetAddrInfo::IPv4) || addrP->isMapped())
clientPV |= XrdOucEI::uIPv4;
// Mark the client as being on a private net if the address is private
//
if (addrP->isPrivate()) {clientPV |= XrdOucEI::uPrip; rdType = 1;}
else rdType = 0;
// Now tie the security information
//
Client = (seceP ? seceP : &Entity);
// Allocate a monitoring object, if needed for this connection and record login
//
if (Monitor.Ready())
{Monitor.Register(linkP->ID, linkP->Host(), protP);
if (Monitor.Logins())
{if (Monitor.Auths() && seceP) MonAuth();
else Monitor.Report(Monitor.Auths() ? "" : 0);
}
}
// Complete the request ID object
//
ReqID.setID(Request.header.streamid, linkP->FDnum(), linkP->Inst());
// Substitute our protocol for the existing one
//
// setProtocol() returns the protocol it displaced; it is restored by Disc().
//
realProt = linkP->setProtocol(this);
linkP->armBridge();
// Document this login
//
who = (seceP && seceP->name ? seceP->name : "nobody");
eDest.Log(SYS_LOG_01, "Bridge", Link->ID, "login as", who);
// All done, indicate we are logged in
//
Status = XRD_LOGGEDIN;
cTime = time(0);
}
/******************************************************************************/
/* P r o c e e d */
/******************************************************************************/
void XrdXrootdTransit::Proceed()
{
// Only an interrupted reinvocation has work left to do. Otherwise the
// outcome is treated as plain success.
//
   const int rc = (reInvoke ? Process(Link) : 0);

// Success re-enables the link, -EINPROGRESS leaves it disabled, and any
// other negative status closes it.
//
   if (rc >= 0) Link->Enable();
      else if (rc != -EINPROGRESS) Link->Close();
}
/******************************************************************************/
/* P r o c e s s */
/******************************************************************************/
// Process() is driven by the link when data is available (or again on a
// reinvocation). It lets the real protocol handle the input and, if that
// protocol started a bridged request via Run(), executes the request here.
int XrdXrootdTransit::Process(XrdLink *lp)
{
int rc;
// This entry is serialized via link processing and data is now available.
// One of the following will be returned.
//
// < 0 -> Stop getting requests,
// -EINPROGRESS leave link disabled but otherwise all is well
// -n Error, disable and close the link
// = 0 -> OK, get next request, if allowed, o/w enable the link
// > 0 -> Slow link, stop getting requests and enable the link
//
// Reflect data is present to the underlying protocol and if Run() has been
// called we need to dispatch that request. This may be iterative.
//
// On a reinvocation the real protocol is called with a null link. A non-zero
// runStatus after it returns means Run() was called: a recorded error is
// reported via Fatal(); otherwise the request is executed, resuming a
// partially read request through XrdXrootdProtocol::Process() when Resume is
// set, or starting it through Process2(). The loop repeats while the real
// protocol returned 0 (reInvoke) and no error occurred.
//
do{rc = realProt->Process((reInvoke ? 0 : lp));
if (rc >= 0 && runStatus)
{reInvoke = (rc == 0);
if (runError) rc = Fatal(rc);
else {runDone = false;
rc = (Resume ? XrdXrootdProtocol::Process(lp) : Process2());
// A scheduled wait keeps the link disabled (-EINPROGRESS). If no final
// response was produced, runStatus stays set and we return immediately;
// otherwise the bridge is marked idle before iterating.
if (rc >= 0)
{if (runWait) rc = -EINPROGRESS;
if (!runDone) return rc;
AtomicBeg(runMutex);
AtomicZAP(runStatus);
AtomicEnd(runMutex);
}
}
} else reInvoke = false;
} while(rc >= 0 && reInvoke);
// Make sure that we indicate that we are no longer active
//
if (runStatus)
{AtomicBeg(runMutex);
AtomicZAP(runStatus);
AtomicEnd(runMutex);
}
// All done
//
// A zero status is reported as 1, i.e. "stop getting requests and enable the
// link" per the table above.
//
return (rc ? rc : 1);
}
/******************************************************************************/
/* R e c y c l e */
/******************************************************************************/
// Recycle() tears the bridge down for this link. It marks the object busy so
// Run() rejects new requests, cancels a scheduled wait retry, recycles the
// real protocol and the xrootd base part, frees the saved request arguments,
// drops requests still waiting for a response, and returns this object to
// the TranStack pool.
void XrdXrootdTransit::Recycle(XrdLink *lp, int consec, const char *reason)
{
// Set ourselves as active so we can't get more requests
//
AtomicBeg(runMutex);
AtomicInc(runStatus);
AtomicEnd(runMutex);
// If we were active then we will need to quiesce before dismantling ourselves.
// Note that Recycle() can only be called if the link is enabled. So, this bit
// of code is improbable but we check it anyway.
//
if (runWait > 0) Sched->Cancel(&waitJob);
// First we need to recycle the real protocol
//
if (realProt) realProt->Recycle(lp, consec, reason);
// Now we need to recycle our xrootd part
//
XrdXrootdProtocol::Recycle(lp, consec, reason);
// Release the argument buffer
//
if (runArgs) {free(runArgs); runArgs = 0;}
// Delete all pending requests
//
XrdXrootdTransPend::Clear(this);
// Now just free up our object.
//
TranStack.Push(&TranLink);
}
/******************************************************************************/
/* R e d r i v e */
/******************************************************************************/
// Redrive() re-issues the current request after a wait. It is presumably the
// target of the scheduled waitJob -- confirm against the job definition.
// It accumulates the elapsed wait into runWTot, restores the saved request
// arguments (RunCopy), and re-executes the request until it completes, fails,
// or asks to wait again. If it has not deferred again, it marks the bridge
// idle and either closes the link (error) or re-enables it.
void XrdXrootdTransit::Redrive()
{
static int eCode = htonl(kXR_NoMemory);
// NOTE(review): "Insufficent" is misspelled in the message text below; it is
// a runtime string and is left untouched here.
static char eText[] = "Insufficent memory to re-issue request";
static struct iovec ioV[] = {{(char *)&eCode,sizeof(eCode)},
{(char *)&eText,sizeof(eText)}};
int rc;
// Update wait statistics
//
runWTot += runWait;
runWait = 0;
// While we are running asynchronously, there is no way that this object can
// be deleted while a timer is outstanding as the link has been disabled. So,
// we can reissue the request with little worry.
//
// NOTE(review): when RunCopy() fails it records kXR_ArgTooLong via Fail(),
// but this path reports kXR_NoMemory to the client instead.
//
if (!runALen || RunCopy(runArgs, runALen)) {
do{rc = Process2();
if (rc == 0) {
rc = realProt->Process(NULL);
}
} while((rc == 0) && !runError && !runWait);
}
else rc = Send(kXR_error, ioV, 2, 0);
// Defer the request if need be
//
if (rc >= 0 && runWait) return;
runWTot = 0;
// Indicate we are no longer active
//
if (runStatus)
{AtomicBeg(runMutex);
AtomicZAP(runStatus);
AtomicEnd(runMutex);
}
// If the link needs to be terminated, terminate the link. Otherwise, we can
// enable the link for new requests at this point.
//
if (rc < 0) Link->Close();
else Link->Enable();
}
/******************************************************************************/
/* R e q T a b l e */
/******************************************************************************/
// KXR_INDEX(x): slot of request code x in the bridge request table. Both the
// argument and the result are parenthesized so the macro expands correctly
// for expression arguments and inside larger expressions.
#define KXR_INDEX(x) ((x)-kXR_auth)

// Build the table of request codes the bridge accepts, indexed by
// KXR_INDEX(requestid). A zero entry means "unsupported" (Run() rejects such
// requests). Supported requests are marked 1, except read, readv and write,
// which are marked 2 (presumably flagging data-transfer requests -- verify
// against the users of reqTab). The table only extends up to kXR_truncate,
// so every code stored below must not exceed it.
const char *XrdXrootdTransit::ReqTable()
{
   static char rTab[kXR_truncate-kXR_auth+1];

// Initialize the table
//
   memset(rTab, 0, sizeof(rTab));
   rTab[KXR_INDEX(kXR_chmod)]    = 1;
   rTab[KXR_INDEX(kXR_close)]    = 1;
   rTab[KXR_INDEX(kXR_dirlist)]  = 1;
   rTab[KXR_INDEX(kXR_locate)]   = 1;
   rTab[KXR_INDEX(kXR_mkdir)]    = 1;
   rTab[KXR_INDEX(kXR_mv)]       = 1;
   rTab[KXR_INDEX(kXR_open)]     = 1;
   rTab[KXR_INDEX(kXR_prepare)]  = 1;
   rTab[KXR_INDEX(kXR_protocol)] = 1;
   rTab[KXR_INDEX(kXR_query)]    = 1;
   rTab[KXR_INDEX(kXR_read)]     = 2;
   rTab[KXR_INDEX(kXR_readv)]    = 2;
   rTab[KXR_INDEX(kXR_rm)]       = 1;
   rTab[KXR_INDEX(kXR_rmdir)]    = 1;
   rTab[KXR_INDEX(kXR_set)]      = 1;
   rTab[KXR_INDEX(kXR_stat)]     = 1;
   rTab[KXR_INDEX(kXR_statx)]    = 1;
   rTab[KXR_INDEX(kXR_sync)]     = 1;
   rTab[KXR_INDEX(kXR_truncate)] = 1;
   rTab[KXR_INDEX(kXR_write)]    = 2;

// Now return the address
//
   return rTab;
}
/******************************************************************************/
/* Private: R e q W r i t e */
/******************************************************************************/
bool XrdXrootdTransit::ReqWrite(char *xdataP, int xdataL)
{
// Every path out of here transits to the resume point, so clear myBlen
// up front.
//
   myBlen = 0;

// Some payload arrived together with the request. The write may have to be
// split between the data already in hand and data still on the network,
// which do_WriteSpan takes care of.
//
   if (xdataL && xdataP && Request.header.dlen)
      {myBuff  = wBuff = xdataP;
       myBlast = wBLen = xdataL;
       Resume  = &XrdXrootdProtocol::do_WriteSpan;
       return true;
      }

// Nothing was pre-read, so this is a straightforward write.
//
   Resume = 0;
   wBuff  = xdataP;
   wBLen  = xdataL;
   return true;
}
/******************************************************************************/
/* R u n */
/******************************************************************************/
// Run() is the bridge entry point for a single request. xreqP points at a raw
// request header (fields in network byte order); xdataP/xdataL describe any
// request data already read. It returns false only when a request is already
// in progress. Otherwise it returns true, with runError set (via Fail()) when
// the request was rejected.
bool XrdXrootdTransit::Run(const char *xreqP, char *xdataP, int xdataL)
{
int movLen, rc;
// We do not allow re-entry if we are currently processing a request.
// It will be reset, as need, when a response is effected.
//
AtomicBeg(runMutex);
rc = AtomicInc(runStatus);
AtomicEnd(runMutex);
if (rc) return false;
// Copy the request header
//
memcpy((void *)&Request, (void *)xreqP, sizeof(Request));
// Validate that we can actually handle this request
//
// NOTE(review): only the upper bound is checked. A requestid below kXR_auth
// makes "requestid - kXR_auth" negative and reads before reqTab
// (out of bounds); consider also rejecting requestid < kXR_auth.
// NOTE(review): the static_cast below appears to have lost its template
// argument (text damage?) -- restore it before compiling.
//
Request.header.requestid = ntohs(Request.header.requestid);
if (Request.header.requestid & 0x8000
|| Request.header.requestid > static_cast(kXR_truncate)
|| !reqTab[Request.header.requestid - kXR_auth])
return Fail(kXR_Unsupported, "Unsupported bridge request");
// Validate the data length
//
Request.header.dlen = ntohl(Request.header.dlen);
if (Request.header.dlen < 0)
return Fail(kXR_ArgInvalid, "Invalid request data length");
// Copy the stream id and trace this request
//
// NOTE(review): the next line is damaged. The end of the trace statement and
// the code that computes movLen and saves the argument data (including the
// "if" that the later "} else runALen = 0;" closes) are missing. Restore it
// from version control.
//
Response.Set(Request.header.streamid);
TRACEP(REQ, "Bridge req=" <buff + movLen;
Resume = &XrdXrootdProtocol::Process2;
return true;
}
} else runALen = 0;
// If we have all the data, indicate request accepted.
//
runError = 0;
Resume = 0;
return true;
}
/******************************************************************************/
/* Private: R u n C o p y */
/******************************************************************************/
bool XrdXrootdTransit::RunCopy(char *buffP, int buffL)
{
// The argument buffer must hold the copied bytes plus a terminating null.
// Callers normally pass buffL <= dlen, but size for the larger of the two so
// the copy below can never run past the end of the buffer.
//
   int needL = (buffL > Request.header.dlen ? buffL : Request.header.dlen) + 1;

// Allocate a buffer if we do not have one or it is too small
//
   if (!argp || needL > argp->bsize)
      {if (argp) BPool->Release(argp);
       if (!(argp = BPool->Obtain(needL)))
          {Fail(kXR_ArgTooLong, "Request argument too long"); return false;}
       hcNow = hcPrev; halfBSize = argp->bsize >> 1;
      }

// Copy the arguments to the buffer and null-terminate them
//
   memcpy(argp->buff, buffP, buffL);
   argp->buff[buffL] = 0;
   return true;
}
/******************************************************************************/
/* S e n d */
/******************************************************************************/
int XrdXrootdTransit::Send(int rcode, const struct iovec *ioV, int ioN, int ioL)
{
   XrdXrootd::Bridge::Context rInfo(Link, Request.header.streamid,
                                         Request.header.requestid);
   const char *eMsg;
   int rc;
   bool aOK;

// Invoke the result object (we initially assume this is the final result).
// The numeric code in ioV[0] and the text in ioV[1] are only read when ioN
// says those segments are present, so a short response vector cannot cause
// an out-of-bounds read.
//
   runDone = true;
   switch(rcode)
         {case kXR_error:
               rc   = (ioN > 0 ? XRD_GETNUM(ioV[0].iov_base) : kXR_ServerError);
               eMsg = (ioN < 2 ? "" : (const char *)ioV[1].iov_base);
               if (wBuff) respObj->Free(rInfo, wBuff, wBLen);
               aOK = respObj->Error(rInfo, rc, eMsg);
               break;
          case kXR_ok:
               if (wBuff) respObj->Free(rInfo, wBuff, wBLen);
               aOK = (ioN ? respObj->Data(rInfo, ioV, ioN, ioL, true)
                          : respObj->Done(rInfo));
               break;
          case kXR_oksofar:
               // Partial result: not final (runDone=false), so wBuff is not
               // released here.
               aOK = respObj->Data(rInfo, ioV, ioN, ioL, false);
               runDone = false;
               break;
          case kXR_redirect:
               if (wBuff) respObj->Free(rInfo, wBuff, wBLen);
               rc   = (ioN > 0 ? XRD_GETNUM(ioV[0].iov_base) : 0);
               eMsg = (ioN < 2 ? "" : (const char *)ioV[1].iov_base);
               aOK = respObj->Redir(rInfo, rc, eMsg);
               break;
          case kXR_wait:
               return Wait(rInfo, ioV, ioN, ioL);
          case kXR_waitresp:
               runDone = false;
               return WaitResp(rInfo, ioV, ioN, ioL);
          default:
               if (wBuff) respObj->Free(rInfo, wBuff, wBLen);
               aOK = respObj->Error(rInfo, kXR_ServerError,
                                    "internal logic error");
               break;
         }

// All done
//
   return (aOK ? 0 : -1);
}
/******************************************************************************/
int XrdXrootdTransit::Send(long long offset, int dlen, int fdnum)
{
// Wrap the offset, length and file descriptor number for the current
// request so the result object can deliver the data.
//
   XrdXrootdTransSend sfInfo(Link, Request.header.streamid,
                             Request.header.requestid,
                             offset, dlen, fdnum);

// This kind of response always completes the request.
//
   runDone = true;

// 0 when the result object accepted the data, -1 otherwise.
//
   if (respObj->File(sfInfo, dlen)) return 0;
   return -1;
}
/******************************************************************************/
int XrdXrootdTransit::Send(XrdOucSFVec *sfvec, int sfvnum, int dlen)
{
// Wrap the sfvnum-element segment vector (dlen bytes in total) for the
// current request so the result object can deliver the data.
//
   XrdXrootdTransSend sfInfo(Link, Request.header.streamid,
                             Request.header.requestid,
                             sfvec, sfvnum, dlen);

// This kind of response always completes the request.
//
   runDone = true;

// 0 when the result object accepted the data, -1 otherwise.
//
   if (respObj->File(sfInfo, dlen)) return 0;
   return -1;
}
/******************************************************************************/
/* Private: W a i t */
/******************************************************************************/
// Wait() handles a kXR_wait response for the current request. The wait time
// is taken from ioV[0] (network byte order; presumably seconds, given the
// time(0)+runWait schedule below) and the reason from ioV[1], if present.
// - runWMax <= 0: the wait is passed straight to the result object.
// - runWTot >= runWMax: the request is ended with a kXR_Cancelled error.
// - otherwise the wait is capped at runWMax, the result object is optionally
//   notified (runWCall), and the request is scheduled for re-issue.
// Returns 0 on success, -1 if the result object rejected a callback.
// NOTE(review): ioV[0] is read without checking ioN > 0.
int XrdXrootdTransit::Wait(XrdXrootd::Bridge::Context &rInfo,
const struct iovec *ioV, int ioN, int ioL)
{
const char *eMsg;
// Extract the requested wait time and the reason text
//
runWait = XRD_GETNUM(ioV[0].iov_base);
eMsg = (ioN < 2 ? "reason unknown" : (const char *)ioV[1].iov_base);
// Check if the protocol wants to handle all waits
//
if (runWMax <= 0)
{int wtime = runWait;
runWait = 0;
return (respObj->Wait(rInfo, wtime, eMsg) ? 0 : -1);
}
// Check if we have exceeded the maximum wait time
//
if (runWTot >= runWMax)
{runDone = true;
runWait = 0;
return (respObj->Error(rInfo, kXR_Cancelled, eMsg) ? 0 : -1);
}
// Readjust wait time
//
if (runWait > runWMax) runWait = runWMax;
// Check if the protocol wants a wait notification
//
if (runWCall && !(respObj->Wait(rInfo, runWait, eMsg))) return -1;
// All done, schedule the wait
//
// NOTE(review): the next line is damaged. The trace arguments and the start
// of the "Sched->Schedule(...)" call have been fused together; it should be a
// TRACEP statement followed by a separate scheduling call. Restore it from
// version control.
//
TRACEP(REQ, "Bridge delaying request " <Schedule((XrdJob *)&waitJob, time(0)+runWait);
return 0;
}
/******************************************************************************/
/* Private: W a i t R e s p */
/******************************************************************************/
// WaitResp() handles kXR_waitresp: the actual response arrives later. The
// current request is saved in an XrdXrootdTransPend entry queued for this
// link (Attn() later removes it by link and stream id and resumes via
// AttnCont()), and runWait is set to -1 to mark the bridge as waiting.
// Always returns 0 in the visible code.
// NOTE(review): ioV[0] is read without checking ioN > 0.
int XrdXrootdTransit::WaitResp(XrdXrootd::Bridge::Context &rInfo,
const struct iovec *ioV, int ioN, int ioL)
{
XrdXrootdTransPend *trP;
const char *eMsg;
int wTime;
// Trace this request if need be
//
wTime = XRD_GETNUM(ioV[0].iov_base);
eMsg = (ioN < 2 ? "reason unknown" : (const char *)ioV[1].iov_base);
// NOTE(review): the next line is damaged. The trace arguments and the start
// of a respObj->WaitResp(...) call have been fused. Note also that the
// visible call passes runWait rather than wTime -- confirm which is intended
// when restoring it from version control.
//
TRACEP(REQ, "Bridge waiting for resp; sid=" <WaitResp(rInfo, runWait, eMsg);
// Save the current state
//
trP = new XrdXrootdTransPend(Link, this, &Request);
trP->Queue();
// Effect a wait
//
runWait = -1;
return 0;
}