/******************************************************************************/ /* */ /* X r d S s i F i l e S e s s . c c */ /* */ /* (c) 2016 by the Board of Trustees of the Leland Stanford, Jr., University */ /* Produced by Andrew Hanushevsky for Stanford University under contract */ /* DE-AC02-76-SFO0515 with the Department of Energy */ /* */ /* This file is part of the XRootD software suite. */ /* */ /* XRootD is free software: you can redistribute it and/or modify it under */ /* the terms of the GNU Lesser General Public License as published by the */ /* Free Software Foundation, either version 3 of the License, or (at your */ /* option) any later version. */ /* */ /* XRootD is distributed in the hope that it will be useful, but WITHOUT */ /* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or */ /* FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public */ /* License for more details. */ /* */ /* You should have received a copy of the GNU Lesser General Public License */ /* along with XRootD in a file called COPYING.LESSER (LGPL license) and file */ /* COPYING (GPL license). If not, see . */ /* */ /* The copyright holder's institutional names and contributor's names may not */ /* be used to endorse or promote products derived from this software without */ /* specific prior written permission of the institution or contributor. */ /******************************************************************************/ #include #include #include #include #include #include #include #include #include #include "XrdNet/XrdNetAddrInfo.hh" #include "XrdOuc/XrdOucBuffer.hh" #include "XrdOuc/XrdOucEnv.hh" #include "XrdOuc/XrdOucERoute.hh" #include "XrdOuc/XrdOucPList.hh" #include "XrdSec/XrdSecEntity.hh" #include "XrdSfs/XrdSfsAio.hh" #include "XrdSfs/XrdSfsXio.hh" #include "XrdSsi/XrdSsiEntity.hh" #include "XrdSsi/XrdSsiFileSess.hh" #include "XrdSsi/XrdSsiProvider.hh" #include "XrdSsi/XrdSsiRRInfo.hh" #include "XrdSsi/XrdSsiService.hh" #include "XrdSsi/XrdSsiSfs.hh" #include "XrdSsi/XrdSsiStats.hh" #include "XrdSsi/XrdSsiStream.hh" #include "XrdSsi/XrdSsiTrace.hh" #include "XrdSsi/XrdSsiUtils.hh" #include "XrdSys/XrdSysError.hh" /******************************************************************************/ /* G l o b a l s */ /******************************************************************************/ namespace XrdSsi { extern XrdOucBuffPool *BuffPool; extern XrdSsiProvider *Provider; extern XrdSsiService *Service; extern XrdSsiStats Stats; extern XrdSysError Log; extern int respWT; }; using namespace XrdSsi; /******************************************************************************/ /* L o c a l M a c r o s */ /******************************************************************************/ #define DUMPIT(x,y) XrdSsiUtils::b2x(x,y,hexBuff,sizeof(hexBuff),dotBuff)<nextFree; arMutex.UnLock(); fsP->Init(einfo, user, true); } else { freeNew++; if (freeMax <= freeAbs && freeNew >= freeMax/2) {freeMax += freeMax/2; freeNew = 0; } arMutex.UnLock(); fsP = new XrdSsiFileSess(einfo, user); } // Return the object // return fsP; } /******************************************************************************/ /* A t t n I n f o */ /******************************************************************************/ bool XrdSsiFileSess::AttnInfo(XrdOucErrInfo &eInfo, const XrdSsiRespInfo *respP, unsigned int reqID) // Called with the request mutex locked! { EPNAME("AttnInfo"); struct AttnResp {struct iovec ioV[4]; XrdSsiRRInfoAttn aHdr;}; AttnResp *attnResp; char *mBuff; int n, ioN = 2; bool doFin; // If there is no data we can send back to the client in the attn response, // then simply reply with a short message to make the client come back. // if (!respP->mdlen) {if (respP->rType != XrdSsiRespInfo::isData || respP->blen > XrdSsiResponder::MaxDirectXfr) {eInfo.setErrInfo(0, ""); return false; } } // We will be constructing the response in the message buffer. This is // gauranteed to be big enough for our purposes so no need to check the size. // mBuff = eInfo.getMsgBuff(n); // Initialize the response // attnResp = (AttnResp *)mBuff; memset(attnResp, 0, sizeof(AttnResp)); attnResp->aHdr.pfxLen = htons(sizeof(XrdSsiRRInfoAttn)); // Fill out iovec to point to our header // //?attnResp->ioV[0].iov_len = sizeof(XrdSsiRRInfoAttn) + respP->mdlen; attnResp->ioV[1].iov_base = mBuff+offsetof(struct AttnResp, aHdr); attnResp->ioV[1].iov_len = sizeof(XrdSsiRRInfoAttn); // Fill out the iovec for the metadata if we have some // if (respP->mdlen) {attnResp->ioV[2].iov_base = (void *)respP->mdata; attnResp->ioV[2].iov_len = respP->mdlen; ioN = 3; attnResp->aHdr.mdLen = htonl(respP->mdlen); Stats.Bump(Stats.RspMDBytes, respP->mdlen); if (QTRACE(Debug)) {char hexBuff[16],dotBuff[4]; DEBUG(reqID <<':' <mdlen <<" byte metadata (0x" <mdata,respP->mdlen) <<") sent."); } } // Check if we have actual data here as well and can send it along // if (respP->rType == XrdSsiRespInfo::isData && respP->blen+respP->mdlen <= XrdSsiResponder::MaxDirectXfr) {if (respP->blen) {attnResp->ioV[ioN].iov_base = (void *)respP->buff; attnResp->ioV[ioN].iov_len = respP->blen; ioN++; } attnResp->aHdr.tag = XrdSsiRRInfoAttn::fullResp; doFin = true; } else {attnResp->aHdr.tag = XrdSsiRRInfoAttn::pendResp; doFin = false;} // If we sent the full response we must remove the request from the request // table as it will get finished off when the response is actually sent. // if (doFin) rTab.Del(reqID, false); // Setup to have metadata actually sent to the requestor // eInfo.setErrCode(ioN); return doFin; } /******************************************************************************/ /* c l o s e */ /******************************************************************************/ int XrdSsiFileSess::close(bool viaDel) /* Function: Close the file object. Input: None Output: Always returns SFS_OK */ { const char *epname = "close"; // Do some debugging // DEBUG((gigID ? gigID : "???") <<" del=" <Recycle(); oucBuff = 0;} inProg = false; } // Clean up storage // isOpen = false; return SFS_OK; } /******************************************************************************/ /* f c t l */ /******************************************************************************/ int XrdSsiFileSess::fctl(const int cmd, int alen, const char *args, const XrdSecEntity *client) { static const char *epname = "fctl"; XrdSsiRRInfo *rInfo; XrdSsiFileReq *rqstP; unsigned int reqID; // If this isn't the special query, then return an error // if (cmd != SFS_FCTL_SPEC1) return XrdSsiUtils::Emsg(epname, ENOTSUP, "fctl", gigID, *eInfo); // Caller wishes to find out if a request is ready and wait if it is not // if (!args || alen < (int)sizeof(XrdSsiRRInfo)) return XrdSsiUtils::Emsg(epname, EINVAL, "fctl", gigID, *eInfo); // Grab the request identifier // rInfo = (XrdSsiRRInfo *)args; reqID = rInfo->Id(); // Do some debugging // DEBUG(reqID <<':' <WantResponse(*eInfo)) {DEBUG(reqID <<':' <setErrCB((XrdOucEICB *)rqstP); eInfo->setErrInfo(respWT, ""); Stats.Bump(Stats.RspUnRdy); return SFS_STARTED; } /******************************************************************************/ /* Private: I n i t */ /******************************************************************************/ void XrdSsiFileSess::Init(XrdOucErrInfo &einfo, const char *user, bool forReuse) { tident = (user ? strdup(user) : strdup("")); eInfo = &einfo; gigID = 0; fsUser = 0; xioP = 0; oucBuff = 0; reqSize = 0; reqLeft = 0; isOpen = false; inProg = false; if (forReuse) {eofVec.Reset(); rTab.Clear(); } } /******************************************************************************/ /* Private: N e w R e q u e s t */ /******************************************************************************/ bool XrdSsiFileSess::NewRequest(unsigned int reqid, XrdOucBuffer *oP, XrdSfsXioHandle *bR, int rSz) { XrdSsiFileReq *reqP; // Allocate a new request object // if (!(reqP=XrdSsiFileReq::Alloc(eInfo,&fileResource,this,gigID,tident,reqid))) return false; // Add it to the table // rTab.Add(reqP, reqid); // Activate the request // inProg = false; reqP->Activate(oP, bR, rSz); return true; } /******************************************************************************/ /* o p e n */ /******************************************************************************/ int XrdSsiFileSess::open(const char *path, // In XrdOucEnv &theEnv, // In XrdSfsFileOpenMode open_mode) // In /* Function: Open the file `path' in the mode indicated by `open_mode'. Input: path - The fully qualified name of the resource. theEnv - Environmental information. open_mode - It must contain only SFS_O_RDWR. Output: Returns SFS_OK upon success, otherwise SFS_ERROR is returned. */ { static const char *epname = "open"; XrdSsiErrInfo errInfo; const char *eText; int eNum; // Verify that this object is not already associated with an open file // if (isOpen) return XrdSsiUtils::Emsg(epname, EADDRINUSE, "open session", path, *eInfo); // Make sure the open flag is correct (we now open this R/O so don't check) // // if (open_mode != SFS_O_RDWR) // return XrdSsiUtils::Emsg(epname, EPROTOTYPE, "open session", path, *eInfo); // Setup the file resource object // fileResource.Init(path, theEnv, authDNS); // Notify the provider that we will be executing a request // if (Service->Prepare(errInfo, fileResource)) {const char *usr = fileResource.rUser.c_str(); if (!(*usr)) gigID = strdup(path); else {char gBuff[2048]; snprintf(gBuff, sizeof(gBuff), "%s:%s", usr, path); gigID = strdup(gBuff); } DEBUG(gigID <<" prepared."); isOpen = true; return SFS_OK; } // Get error information // eText = errInfo.Get(eNum).c_str(); if (!eNum) {eNum = ENOMSG; eText = "Provider returned invalid prepare response.";} // Decode the error // switch(eNum) {case EAGAIN: if (!eText || !(*eText)) break; eNum = errInfo.GetArg(); DEBUG(path <<" --> " <setErrInfo(eNum, eText); Stats.Bump(Stats.ReqRedir); return SFS_REDIRECT; break; case EBUSY: eNum = errInfo.GetArg(); if (!eText || !(*eText)) eText = "Provider is busy."; DEBUG(path <<" dly " <setErrInfo(eNum, eText); Stats.Bump(Stats.ReqStalls); return eNum; break; default: if (!eText || !(*eText)) eText = strerror(eNum); DEBUG(path <<" err " <setErrInfo(eNum, eText); Stats.Bump(Stats.ReqPrepErrs); return SFS_ERROR; break; }; // Something is quite wrong here // Log.Emsg(epname, "Provider redirect returned no target host name!"); eInfo->setErrInfo(ENOMSG, "Server logic error"); Stats.Bump(Stats.ReqPrepErrs); return SFS_ERROR; } /******************************************************************************/ /* r e a d */ /******************************************************************************/ XrdSfsXferSize XrdSsiFileSess::read(XrdSfsFileOffset offset, // In char *buff, // Out XrdSfsXferSize blen) // In /* Function: Read `blen' bytes at `offset' into 'buff' and return the actual number of bytes read. Input: offset - Contains request information. buff - Address of the buffer in which to place the data. blen - The size of the buffer. This is the maximum number of bytes that will be returned. Output: Returns the number of bytes read upon success and SFS_ERROR o/w. */ { static const char *epname = "read"; XrdSsiRRInfo rInfo(offset); XrdSsiFileReq *rqstP; XrdSfsXferSize retval; unsigned int reqID = rInfo.Id(); bool noMore = false; // Find the request object. If not there we may have encountered an eof // if (!(rqstP = rTab.LookUp(reqID))) {if (eofVec.IsSet(reqID)) {eofVec.UnSet(reqID); return 0; } return XrdSsiUtils::Emsg(epname, ESRCH, "read", gigID, *eInfo); } // Simply effect the read via the request object // retval = rqstP->Read(noMore, buff, blen); // See if we just completed this request // if (noMore) {rqstP->Finalize(); rTab.Del(reqID); eofVec.Set(reqID); } // All done // return retval; } /******************************************************************************/ /* R e c y c l e */ /******************************************************************************/ void XrdSsiFileSess::Recycle() { // Do an immediate reset on ourselves to avoid getting too many locks // Reset(); // Get a lock // arMutex.Lock(); // Check if we should place this on the free list or simply delete it // if (freeNum < freeMax) {nextFree = freeList; freeList = this; freeNum++; arMutex.UnLock(); } else { arMutex.UnLock(); delete this; } } /******************************************************************************/ /* Private: R e s e t */ /******************************************************************************/ void XrdSsiFileSess::Reset() { // Close this session // if (isOpen) close(true); // Release other buffers // if (tident) free(tident); if (fsUser) free(fsUser); if (gigID) free(gigID); } /******************************************************************************/ /* S e n d D a t a */ /******************************************************************************/ int XrdSsiFileSess::SendData(XrdSfsDio *sfDio, XrdSfsFileOffset offset, XrdSfsXferSize size) { static const char *epname = "SendData"; XrdSsiRRInfo rInfo(offset); XrdSsiFileReq *rqstP; unsigned int reqID = rInfo.Id(); int rc; // Find the request object // if (!(rqstP = rTab.LookUp(reqID))) return XrdSsiUtils::Emsg(epname, ESRCH, "send", gigID, *eInfo); // Simply effect the send via the request object // rc = rqstP->Send(sfDio, size); // Determine how this ended // if (rc > 0) rc = SFS_OK; else {rqstP->Finalize(); rTab.Del(reqID); } return rc; } /******************************************************************************/ /* t r u n c a t e */ /******************************************************************************/ int XrdSsiFileSess::truncate(XrdSfsFileOffset flen) // In /* Function: Set the length of the file object to 'flen' bytes. Input: flen - The new size of the file. Output: Returns SFS_ERROR a this function is not supported. */ { static const char *epname = "trunc"; XrdSsiFileReq *rqstP; XrdSsiRRInfo rInfo(flen); XrdSsiRRInfo::Opc reqXQ = rInfo.Cmd(); unsigned int reqID = rInfo.Id(); // Find the request object. If not there we may have encountered an eof // if (!(rqstP = rTab.LookUp(reqID))) {if (eofVec.IsSet(reqID)) {eofVec.UnSet(reqID); return 0; } return XrdSsiUtils::Emsg(epname, ESRCH, "cancel", gigID, *eInfo); } // Process request (this can only be a cancel request) // if (reqXQ != XrdSsiRRInfo::Can) return XrdSsiUtils::Emsg(epname, ENOSYS, "trunc", gigID, *eInfo); // Perform the cancellation // DEBUG(reqID <<':' <Finalize(); rTab.Del(reqID); return SFS_OK; } /******************************************************************************/ /* w r i t e */ /******************************************************************************/ XrdSfsXferSize XrdSsiFileSess::write(XrdSfsFileOffset offset, // In const char *buff, // In XrdSfsXferSize blen) // In /* Function: Write `blen' bytes at `offset' from 'buff' and return the actual number of bytes written. Input: offset - The absolute byte offset at which to start the write. buff - Address of the buffer from which to get the data. blen - The size of the buffer. This is the maximum number of bytes that will be written to 'fd'. Output: Returns the number of bytes written upon success and SFS_ERROR o/w. Notes: An error return may be delayed until the next write(), close(), or sync() call. */ { static const char *epname = "write"; XrdSsiRRInfo rInfo(offset); unsigned int reqID = rInfo.Id(); int reqPass; // Check if we are reading a request segment and handle that. This assumes that // writes to different requests cannot be interleaved (which they can't be). // if (inProg) return writeAdd(buff, blen, reqID); // Make sure this request does not refer to an active request // if (rTab.LookUp(reqID)) return XrdSsiUtils::Emsg(epname, EADDRINUSE, "write", gigID, *eInfo); // The offset contains the actual size of the request, make sure it's OK. Note // that it can be zero and by convention the blen must be one if so. // reqPass = reqSize = rInfo.Size(); if (reqSize < blen) {if (reqSize || blen != 1) return XrdSsiUtils::Emsg(epname, EPROTO, "write", gigID, *eInfo); reqSize = 1; } else if (reqSize < 0 || reqSize > maxRSZ) return XrdSsiUtils::Emsg(epname, EFBIG, "write", gigID, *eInfo); // Indicate we are in the progress of collecting the request arguments // inProg = true; eofVec.UnSet(reqID); // Do some debugging // DEBUG(reqID <<':' <Alloc(reqSize))) return XrdSsiUtils::Emsg(epname, ENOMEM, "write", gigID, *eInfo); // Setup to buffer this // reqLeft = reqSize - blen; memcpy(oucBuff->Data(), buff, blen); if (!reqLeft) {oucBuff->SetLen(reqSize); if (!NewRequest(reqID, oucBuff, 0, reqPass)) return XrdSsiUtils::Emsg(epname, ENOMEM, "write", gigID, *eInfo); oucBuff = 0; } else oucBuff->SetLen(blen, blen); return blen; } /******************************************************************************/ /* Private: w r i t e A d d */ /******************************************************************************/ XrdSfsXferSize XrdSsiFileSess::writeAdd(const char *buff, // In XrdSfsXferSize blen, // In unsigned int rid) /* Function: Add `blen' bytes from 'buff' to request and return the actual number of bytes added. Input: buff - Address of the buffer from which to get the data. blen - The size of the buffer. This is the maximum number of bytes that will be added. Output: Returns the number of bytes added upon success and SFS_ERROR o/w. Notes: An error return may be delayed until the next write(), close(), or sync() call. */ { static const char *epname = "writeAdd"; int dlen; // Make sure the caller is not exceeding the size stated on the first write // if (blen > reqLeft) return XrdSsiUtils::Emsg(epname, EFBIG, "writeAdd", gigID, *eInfo); // Append the bytes // memcpy(oucBuff->Data(dlen), buff, blen); // Adjust how much we have left // reqLeft -= blen; DEBUG(rid <<':' <SetLen(dlen, dlen); return blen; }