/******************************************************************************/ /* */ /* X r d X r o o t d A i o . c c */ /* */ /* (c) 2004 by the Board of Trustees of the Leland Stanford, Jr., University */ /* All Rights Reserved */ /* Produced by Andrew Hanushevsky for Stanford University under contract */ /* DE-AC02-76-SFO0515 with the Department of Energy */ /* */ /* This file is part of the XRootD software suite. */ /* */ /* XRootD is free software: you can redistribute it and/or modify it under */ /* the terms of the GNU Lesser General Public License as published by the */ /* Free Software Foundation, either version 3 of the License, or (at your */ /* option) any later version. */ /* */ /* XRootD is distributed in the hope that it will be useful, but WITHOUT */ /* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or */ /* FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public */ /* License for more details. */ /* */ /* You should have received a copy of the GNU Lesser General Public License */ /* along with XRootD in a file called COPYING.LESSER (LGPL license) and file */ /* COPYING (GPL license). If not, see <http://www.gnu.org/licenses/>. */ /* */ /* The copyright holder's institutional names and contributor's names may not */ /* be used to endorse or promote products derived from this software without */ /* specific prior written permission of the institution or contributor.
*/
/******************************************************************************/

// NOTE(review): the header name of the first #include below is missing in this
// copy of the file (an angle-bracket include appears to have been stripped).
// Restore it from the reference source before building.
//
#include
#include "Xrd/XrdBuffer.hh"
#include "Xrd/XrdLink.hh"
#include "XProtocol/XProtocol.hh"
#include "XrdSys/XrdSysError.hh"
#include "XrdSys/XrdSysPthread.hh"
#include "XrdSfs/XrdSfsInterface.hh"
#include "XrdXrootd/XrdXrootdAio.hh"
#include "XrdXrootd/XrdXrootdFile.hh"
#include "XrdXrootd/XrdXrootdProtocol.hh"
#include "XrdXrootd/XrdXrootdStats.hh"
#include "XrdXrootd/XrdXrootdTrace.hh"

/******************************************************************************/
/*                        S t a t i c   O b j e c t s                         */
/******************************************************************************/

// Aio object statics: BPool, Sched and SI are assigned in
// XrdXrootdAioReq::Init(); fqFirst heads the free list of aio objects and is
// manipulated under fqMutex; maxAio is the remaining number of aio objects that
// addBlock() may still create.
//
XrdBuffManager           *XrdXrootdAio::BPool;
XrdScheduler             *XrdXrootdAio::Sched;
XrdXrootdStats           *XrdXrootdAio::SI;

XrdSysMutex               XrdXrootdAio::fqMutex;
XrdXrootdAio             *XrdXrootdAio::fqFirst = 0;
const char               *XrdXrootdAio::TraceID = "Aio";
int                       XrdXrootdAio::maxAio;

// AioReq object statics: rqFirst heads the free list of request objects and is
// manipulated under rqMutex. The Quantum values and maxAioPR/maxAioPR2 are
// (re)computed in XrdXrootdAioReq::Init().
//
XrdSysError              *XrdXrootdAioReq::eDest;
XrdSysMutex               XrdXrootdAioReq::rqMutex;
XrdXrootdAioReq          *XrdXrootdAioReq::rqFirst = 0;
const char               *XrdXrootdAioReq::TraceID = "AioReq";

int                       XrdXrootdAioReq::QuantumMin;
int                       XrdXrootdAioReq::Quantum;
int                       XrdXrootdAioReq::QuantumMax;
int                       XrdXrootdAioReq::maxAioPR  = 8;
int                       XrdXrootdAioReq::maxAioPR2 =16;

extern XrdOucTrace       *XrdXrootdTrace;

/******************************************************************************/
/*                   X r d X r o o t d A i o : : A l l o c                    */
/******************************************************************************/

// Obtain an aio object with an attached buffer of bsize bytes and bind it to
// the request arp.
//
// arp   - the request that will own the aio object (its Link->ID becomes the
//         trace identifier of the aio object).
// bsize - the size of the buffer to obtain from the buffer pool.
//
// Returns a pointer to the aio object, or 0 when no aio object is available,
// when bsize is zero, or when the buffer pool could not supply a buffer (in the
// latter two cases the aio object is recycled before returning).

XrdXrootdAio *XrdXrootdAio::Alloc(XrdXrootdAioReq *arp, int bsize)
{
   XrdXrootdAio *aiop;

// Obtain an aio object: take one from the free queue or, if the queue is empty
// and we are still allowed to create more, add a new block of objects. The
// in-use counter and its high-water mark are maintained under the same lock.
//
   fqMutex.Lock();
   if ((aiop = fqFirst)) fqFirst = aiop->Next;
      else if (maxAio) aiop = addBlock();
   if (aiop && (++(SI->AsyncNow) > SI->AsyncMax)) SI->AsyncMax = SI->AsyncNow;
   fqMutex.UnLock();

// Allocate a buffer for this object. On failure the object goes back to the
// free queue (Recycle() also undoes the AsyncNow increment done above).
//
   if (aiop)
      {if (bsize && (aiop->buffp = BPool->Obtain(bsize)))
          {aiop->sfsAio.aio_buf = (void *)(aiop->buffp->buff);
           aiop->aioReq = arp;
           aiop->TIdent = arp->Link->ID;
          }
          else {aiop->Recycle(); aiop = 0;}
      }

// Return what we have
//
   return aiop;
}

/******************************************************************************/
/*                X r d X r o o t d A i o : : d o n e R e a d                 */
/******************************************************************************/

// Aio read requests are double buffered. So, there is only one aiocb active
// at a time. This is done for two reasons:
// 1) Provide a serial stream to the client, and
// 2) avoid swamping the network adapter.
// Additionally, double buffering requires minimal locking and simplifies the
// redrive logic. While this knowledge violates OO design, it substantially
// speeds up async I/O handling. This method is called out of the async event
// handler so it does very little work.

void XrdXrootdAio::doneRead()
{
// Place this aio request on the completed queue
//
   aioReq->aioDone = this;

// Extract out any error conditions (keep only the first one). A non-negative
// Result is added to the request's running byte total.
//
   if (Result >= 0) aioReq->aioTotal += Result;
      else if (!aioReq->aioError) aioReq->aioError = Result;

// Schedule the associated arp to redrive the I/O
//
   Sched->Schedule((XrdJob *)aioReq);
}

/******************************************************************************/
/*               X r d X r o o t d A i o : : d o n e W r i t e                */
/******************************************************************************/

// Writes are more complicated because there may be several in transit. This
// is done to keep the client from swamping the network adapter. We try
// to optimize the handling of the aio object for the common cases. This method
// is called out of the async event handler so it does very little work.

void XrdXrootdAio::doneWrite()
{
   char recycle = 0;

// Lock the aioreq object against competition
//
   aioReq->Lock();
   aioReq->numActive--;

// Extract out any error conditions (keep only the first one). On success the
// bytes written are deducted from what remains and added to the running total.
//
   if (Result >= 0) {aioReq->myIOLen  -= Result;
                     aioReq->aioTotal += Result;
                    }
      else if (!aioReq->aioError) aioReq->aioError = Result;

// Redrive the protocol if so requested. It is impossible to have a protocol
// redrive and completed all of the I/O at the same time.
//
   if (aioReq->reDrive)
      {Sched->Schedule((XrdJob *)aioReq->Link);
       aioReq->reDrive = 0;
      }

// If more aio objects are needed, place this one on the free queue. Otherwise,
// schedule the AioReq object to complete handling the request if no more
// requests are outstanding. It is impossible to have a zero length with more
// requests outstanding.
//
// NOTE(review): the next line is damaged in this copy of the file. It looks
// like a commented-out debug statement fused with the remains of the condition
// that selects between the two branches below (the trailing "myIOLen > 0)"
// suggests a test on the remaining length). Restore it from the reference
// source before building.
//
//cerr <<"doneWrite left " <numActive <<' ' <myIOLen <myIOLen > 0)
      {Next = aioReq->aioFree; aioReq->aioFree = this;}
      else {if (!(aioReq->numActive)) Sched->Schedule((XrdJob *)aioReq);
            recycle = 1;
           }

// All done, perform early recycling if possible. The recycle happens after the
// request lock is dropped.
//
   aioReq->UnLock();
   if (recycle) Recycle();
}

/******************************************************************************/
/*                 X r d X r o o t d A i o : : R e c y c l e                  */
/******************************************************************************/

// Return this aio object to the global free queue, releasing its buffer (if
// any) back to the buffer pool first.

void XrdXrootdAio::Recycle()
{

// Recycle the buffer
//
   if (buffp) {BPool->Release(buffp); buffp = 0;}

// Add this object to the free queue. The in-use counter is decremented under
// the same lock and is never allowed to go negative.
//
   fqMutex.Lock();
   Next = fqFirst;
   fqFirst = this;
   if (--(SI->AsyncNow) < 0) SI->AsyncNow=0;
   fqMutex.UnLock();
}

/******************************************************************************/
/*                       P r i v a t e   M e t h o d s                        */
/******************************************************************************/
/******************************************************************************/
/*                X r d X r o o t d A i o : : a d d B l o c k                 */
/******************************************************************************/

// Add a block of aio objects. The block size is the number of objects that fit
// in 4096 bytes, capped by maxAio. Alloc() calls this while holding fqMutex.

XrdXrootdAio *XrdXrootdAio::addBlock()
{
   const int numalloc = 4096/sizeof(XrdXrootdAio);
   int i = (numalloc <= maxAio ? numalloc : maxAio);
   XrdXrootdAio *aiop;

// NOTE(review): the statement below is damaged in this copy of the file. The
// remainder of the trace message and the code that allocates the block and
// starts chaining its elements onto the free queue are missing; only the tail
// of the chaining loop survives. Restore it from the reference source before
// building.
//
   TRACE(DEBUG, "Adding " <Next = fqFirst; fqFirst = aiop; aiop++;}
      }

   return aiop;
}

/******************************************************************************/
/*                       X r d X r o o t d A i o R e q                        */
/******************************************************************************/
/******************************************************************************/
/*                X r d X r o o t d A i o R e q : : A l l o c                 */
/******************************************************************************/

// Obtain and initialize a request object along with its set of aio objects.
//
// prot   - the protocol object on whose behalf the I/O is done (see below).
// iotype - the type of I/O; recorded in aioType. Any value other than 'w'
//          causes a reference to be set on the link.
// numaio - NOTE(review): defaulted to maxAioPR when zero but not referenced
//          again in this function; confirm whether it was meant to cap the
//          number of aio objects.
//
// Implicit Parameters: prot->myIOLen   // Length of i/o request
//                      prot->myOffset  // Starting offset
//                      prot->myFile    // Target file
//                      prot->Link      // Link object
//                      prot->Response  // Response object
//
// Returns a pointer to the request object, or 0 if no request object could be
// obtained or too few aio objects were available.

XrdXrootdAioReq *XrdXrootdAioReq::Alloc(XrdXrootdProtocol *prot,
                                        char iotype, int numaio)
{
   int i, cntaio, myQuantum, iolen = prot->myIOLen;
   XrdXrootdAioReq *arp;
   XrdXrootdAio    *aiop;

// Obtain an aioreq object
//
   rqMutex.Lock();
   if ((arp = rqFirst)) rqFirst = arp->Next;
      else arp = addBlock();
   rqMutex.UnLock();

// Make sure we have one, fully reset it if we do
//
   if (!arp) return arp;
   arp->Clear(prot->Link);
   if (!numaio) numaio = maxAioPR;

// Compute the number of aio objects we should get and the Quantum size we
// should use. This is a delicate balancing act. We don't want too many segments
// but neither do we want too large of an i/o size. So, if the i/o size is less
// than the quantum then use half a quantum. If the number of segments is
// greater than twice what we would like, then use a larger quantum size.
//
   if (iolen < Quantum)
      {myQuantum = QuantumMin;
       if (!(cntaio = iolen / myQuantum)) cntaio = 1;
          else if (iolen % myQuantum) cntaio++;
      } else {cntaio = iolen / Quantum;
              if (cntaio <= maxAioPR2) myQuantum = Quantum;
                 else {myQuantum = QuantumMax;
                       cntaio = iolen / myQuantum;
                      }
              if (iolen % myQuantum) cntaio++;
             }

// Get appropriate number of aio objects. On exit from the loop, i holds the
// number of objects we wanted but could not obtain.
//
   i = (maxAioPR < cntaio ? maxAioPR : cntaio);
   while(i && (aiop = XrdXrootdAio::Alloc(arp, myQuantum)))
        {aiop->Next = arp->aioFree; arp->aioFree = aiop; i--;}

// Make sure we have at least the minimum number of aio objects
//
// NOTE(review): (maxAioPR - i) equals the number of objects obtained only when
// the target above was maxAioPR. When cntaio < maxAioPR it overstates that
// number, and single-segment requests (cntaio == 1) are never rejected here.
// Confirm this is intended.
//
   if (i && (maxAioPR - i) < 2 && cntaio > 1)
      {arp->Recycle(0); return (XrdXrootdAioReq *)0;}

// Complete the request information
//
   if (iotype != 'w') prot->Link->setRef(1);
   arp->Instance   = prot->Link->Inst();
   arp->myIOLen    = iolen;  // Amount that is left to send
   arp->myOffset   = prot->myOffset;
   arp->myFile     = prot->myFile;
   arp->Response   = prot->Response;
   arp->aioType    = iotype;

// Return what we have
//
   return arp;
}

/******************************************************************************/
/*               X r d X r o o t d A i o R e q : : g e t A i o                */
/******************************************************************************/

// Take the next aio object off this request's free list. Returns 0, and sets
// the reDrive flag, when the list is empty.

XrdXrootdAio *XrdXrootdAioReq::getAio()
{
   XrdXrootdAio *aiop;

// Grab the next free aio object. If none, we return a null pointer. While this
// is a classic consumer/producer problem, normally handled by a semaphore,
// doing so would cause more threads to be tied up as the load increases. We
// want the opposite effect for scaling purposes. So, we use a redrive scheme.
//
   Lock();
   if ((aiop = aioFree)) {aioFree = aiop->Next; aiop->Next = 0;}
      else reDrive = 1;
   UnLock();
   return aiop;
}

/******************************************************************************/
/*                 X r d X r o o t d A i o R e q : : I n i t                  */
/******************************************************************************/

// One-time setup of the aio subsystem limits and shared pointers.
//
// iosize   - the base i/o quantum; half and double of it become the minimum
//            and maximum quantum (the maximum is capped at the protocol's
//            maxBuffsz).
// maxaiopr - maximum aio objects per request; values below 1 select 8.
// maxaio   - maximum aio objects overall; raised to maxAioPR if smaller.

void XrdXrootdAioReq::Init(int iosize, int maxaiopr, int maxaio)
{
   XrdXrootdAio    *aiop;
   XrdXrootdAioReq *arp;

// Set the pointer to the buffer pool, scheduler and statistical area, these are
// only used by the Aio object
//
   XrdXrootdAio::Sched = XrdXrootdProtocol::Sched;
   XrdXrootdAio::BPool = XrdXrootdProtocol::BPool;
   XrdXrootdAio::SI    = XrdXrootdProtocol::SI;

// Set the pointer to the error object and compute the limits
//
// NOTE(review): the target type of the static_cast below is missing in this
// copy of the file; restore it from the reference source before building.
//
   eDest = &XrdXrootdProtocol::eDest;
   Quantum    = static_cast(iosize);
   QuantumMin = Quantum / 2;
   QuantumMax = Quantum * 2;
   if (QuantumMax > XrdXrootdProtocol::maxBuffsz)
       QuantumMax = XrdXrootdProtocol::maxBuffsz;

// Set the maximum number of aio objects we can have (used by Aio object only)
// Note that sysconf(_SC_AIO_MAX) usually provides an unreliable number if it
// provides a number at all.
//
   maxAioPR  = (maxaiopr < 1 ? 8 : maxaiopr);
   maxAioPR2 = maxAioPR * 2;
   XrdXrootdAio::maxAio = (maxaio < maxAioPR ? maxAioPR : maxaio);

// Do some debugging
//
// NOTE(review): this copy of the file is damaged from here on. The rest of the
// trace statement, the remainder of Init(), and any definitions that followed
// it are missing. What survives below looks like the tail of a routine that
// chains request objects onto the rqFirst free list (presumably
// XrdXrootdAioReq::addBlock() -- confirm). Restore the missing text from the
// reference source before building.
//
   TRACE(DEBUG, "Max aio/req=" <Next = rqFirst; rqFirst = arp; arp++;}

   return arp;
}

/******************************************************************************/
/*                                 C l e a r                                  */
/******************************************************************************/

// Reset every field of the request object to zero and bind it to link lnkp.

void XrdXrootdAioReq::Clear(XrdLink *lnkp)
{
   Next      = 0;
   myOffset  = 0;
   myIOLen   = 0;
   Instance  = 0;
   Link      = lnkp;
   myFile    = 0;
   aioDone   = 0;
   aioFree   = 0;
   numActive = 0;
   aioTotal  = 0;
   aioError  = 0;
   aioType   = 0;
   respDone  = 0;
   isLocked  = 0;
   reDrive   = 0;
}

/******************************************************************************/
/*              X r d X r o o t d A i o R e q : : e n d R e a d               */
/******************************************************************************/

// Handle a completed read: start the next read when more data is expected and
// send the data just read to the client.
//
// The request lock is taken on entry. It is released with UnLock() only on the
// path where another read is still in flight; every other path ends in
// Scuttle() or Recycle(), which are presumably responsible for releasing it
// (Recycle() is not visible in this copy -- confirm).

void XrdXrootdAioReq::endRead()
{
   XrdXrootdAio *aiop;
   int rc;

// For read requests, schedule the next read request and send the data we
// already have. Since we don't know if that read will complete before we
// can send the data of the just completed read, we must lock the AioReq.
// We do know that if we have the lock, absolutely nothing is in transit.
//
   Lock();
   numActive--;

// Do a sanity check. The link should not have changed hands but stranger
// things have happened.
//
   if (!(Link->isInstance(Instance))) {Scuttle("aio read"); return;}

// Dequeue the completed request (we know we're just double buffered but the
// queueing is structured so this works even if we're n-buffered).
//
   aiop = aioDone;
   aioDone = aiop->Next;

// If we encountered an error, send off the error message now and terminate.
// Otherwise, if data remains and the completed read filled its whole buffer,
// start the next read; a non-zero return from Read() is treated as an error.
//
   if (aioError
   || (myIOLen > 0 && aiop->Result == aiop->buffp->bsize && (aioError=Read())))
      {sendError((char *)aiop->TIdent);
       Recycle(1, aiop);
       return;
      }

// We may or may not have an I/O request in flight. However, send off
// whatever data we have at this point. A partial (kXR_oksofar) response is
// used when another read is still active; otherwise this is the final response.
//
   rc = (numActive ?
         Response.Send(kXR_oksofar, aiop->buffp->buff, aiop->Result) :
         Response.Send(             aiop->buffp->buff, aiop->Result));

// Stop the operation if no I/O is in flight. Make the request stop-pending if
// we could not send the data to the client.
//
   if (!numActive)
      {myFile->Stats.rdOps(aioTotal);
       Recycle(1, aiop);
      }
      else {aiop->Next = aioFree, aioFree = aiop;
            if (rc < 0) {aioError = -1; respDone = 1;}
            UnLock();
           }
}

/******************************************************************************/
/*             X r d X r o o t d A i o R e q : : e n d W r i t e              */
/******************************************************************************/

// Complete a write request: send the final response (or the error), record the
// bytes written in the file statistics, and recycle this request object.

void XrdXrootdAioReq::endWrite()
{

// For write requests, this method is called when all of the I/O has completed.
// There is no need to lock this object since nothing is pending. In any case,
// do a sanity check. The link should not have changed hands but stranger
// things have happened.
//
   if (!(Link->isInstance(Instance))) {Scuttle("aio write"); return;}

// If we encountered an error, send off the error message else indicate all OK
//
   if (aioError) sendError(Link->ID);
      else Response.Send();

// Add in the bytes written. This is approximate because it is done without
// obtaining any kind of lock. Fortunately, it is only statistical in nature.
//
   myFile->Stats.wrOps(aioTotal);

// We are done, simply recycle ourselves.
//
   Recycle();
}

/******************************************************************************/
/*               X r d X r o o t d A i o R e q : : S c u t t l e              */
/******************************************************************************/

// Abandon the request after finding that the link no longer belongs to the
// instance that started it.
//
// opname - name of the operation, used only in the logged message.

void XrdXrootdAioReq::Scuttle(const char *opname)
{

// Log this event. We can't trust much of anything at this point.
//
   eDest->Emsg("scuttle",opname,"failed; link reassigned to",Link->ID);

// We can just recycle ourselves at this point since we know we are in a
// transition window where nothing is active w.r.t. this request.
//
   Recycle(0);
}

/******************************************************************************/
/*             X r d X r o o t d A i o R e q : : s e n d E r r o r            */
/******************************************************************************/

// Log the first recorded aio error and send it to the client, at most once per
// request (guarded by respDone).
//
// tident - trace identifier used when logging the message.
//
// Warning! The caller must have appropriately serialized the use of this method

void XrdXrootdAioReq::sendError(char *tident)
{
   char mbuff[4096];
   int rc;

// If a response was sent, don't send one again
//
   if (respDone) return;
   respDone = 1;

// Generate message text. We can't rely on the sfs interface to do this since
// that interface is synchronous.
//
   snprintf(mbuff, sizeof(mbuff)-1, "XrdXrootdAio: Unable to %s %s; %s",
           (aioType == 'r' ? "read" : "write"), myFile->XrdSfsp->FName(),
           eDest->ec2text(aioError));

// Place the error message in the log
//
   eDest->Emsg("aio", tident, mbuff);

// Remap the error from the filesystem
//
   rc = XProtocol::mapError(aioError);

// Send the error back to the client (ignore any errors)
//
   Response.Send((XErrorCode)rc, mbuff);
}