/******************************************************************************/
/* */
/* X r d X r o o t d X e q A i o . c c */
/* */
/* (c) 2004 by the Board of Trustees of the Leland Stanford, Jr., University */
/* All Rights Reserved */
/* Produced by Andrew Hanushevsky for Stanford University under contract */
/* DE-AC02-76-SFO0515 with the Department of Energy */
/* */
/* This file is part of the XRootD software suite. */
/* */
/* XRootD is free software: you can redistribute it and/or modify it under */
/* the terms of the GNU Lesser General Public License as published by the */
/* Free Software Foundation, either version 3 of the License, or (at your */
/* option) any later version. */
/* */
/* XRootD is distributed in the hope that it will be useful, but WITHOUT */
/* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or */
/* FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public */
/* License for more details. */
/* */
/* You should have received a copy of the GNU Lesser General Public License */
/* along with XRootD in a file called COPYING.LESSER (LGPL license) and file */
/* COPYING (GPL license). If not, see . */
/* */
/* The copyright holder's institutional names and contributor's names may not */
/* be used to endorse or promote products derived from this software without */
/* specific prior written permission of the institution or contributor. */
/******************************************************************************/
#include
#include "Xrd/XrdBuffer.hh"
#include "Xrd/XrdLink.hh"
#include "XrdSys/XrdSysError.hh"
#include "XrdOuc/XrdOucErrInfo.hh"
#include "XrdSfs/XrdSfsInterface.hh"
#include "XrdXrootd/XrdXrootdAio.hh"
#include "XrdXrootd/XrdXrootdFile.hh"
#include "XrdXrootd/XrdXrootdProtocol.hh"
#include "XrdXrootd/XrdXrootdTrace.hh"
/******************************************************************************/
/* G l o b a l s */
/******************************************************************************/
extern XrdOucTrace *XrdXrootdTrace;
/******************************************************************************/
/* a i o _ E r r o r */
/******************************************************************************/
int XrdXrootdProtocol::aio_Error(const char *op, int ecode)
{
char *etext, buffer[MAXPATHLEN+80], unkbuff[64];
// Get the reason for the error
//
if (!(etext = eDest.ec2text(ecode)))
{sprintf(unkbuff, "reason unknown (%d)", ecode); etext = unkbuff;}
// Format the error message
//
snprintf(buffer,sizeof(buffer),"Unable to %s %s; %s",
op, myFile->XrdSfsp->FName(), etext);
// Print it out if debugging is enabled
//
#ifndef NODEBUG
eDest.Emsg("aio_Error", Link->ID, buffer);
#endif
// Place the error message in the error object and return
//
myFile->XrdSfsp->error.setErrInfo(ecode, buffer);
// Prepare for recovery
//
myAioReq = 0;
return -EIO;
}
/******************************************************************************/
/* a i o _ R e a d */
/******************************************************************************/
// Implied Arguments:
// myFile = file to be read
// myOffset = Offset at which to read
// myIOLen = Number of bytes to read from file and write to socket
// Returns:
// >0 -> n/a
// =0 -> OK to continue with next operation.
// -EAGAIN -> Revert to synchronous I/O
// <0 -> Error, close link.
int XrdXrootdProtocol::aio_Read()
{
XrdXrootdAioReq *arp;
// Allocate a request object to handle this request and fire off the first
// i/o (they are self-sustaining after that). Any errors at this point will
// force us to revert to synchronous i/o.
//
if (!(arp=XrdXrootdAioReq::Alloc(this,'r',2)) || arp->Read()) return -EAGAIN;
// All done
//
return 0;
}
/******************************************************************************/
/* a i o _ W r i t e */
/******************************************************************************/
// Implied Arguments:
// myFile = file to be read
// myOffset = Offset at which to read
// myIOLen = Number of bytes to read from file and write to socket
// myStalls = Number of stalls encountered last time we did I/O
// Returns:
// >0 -> Slow link, enable link and wait for more data.
// =0 -> OK to continue with next operation.
// -EAGAIN -> Revert to synchronous I/O
// -EINPROGRESS -> Ran out of aio objects, leave link disabled
// -EIO -> File system error, flush link.
// <0 -> Error, close link.
int XrdXrootdProtocol::aio_Write()
{
// Allocate a request object to handle this request
//
if (!(myAioReq = XrdXrootdAioReq::Alloc(this, 'w'))) return -EAGAIN;
// Since the socket is synchronous in delivering data to write; only one
// write async request can occur at one time, though several may be in-flight
// after we drain the socket of data. While draining, we remember the AioReq
// object in case we must suspend operations and start the flow.
//
return aio_WriteAll();
}
/******************************************************************************/
/* a i o _ W r i t e A l l */
/******************************************************************************/
// myFile = file to be read
// myOffset = Offset at which to read
// myIOLen = Number of bytes to read from file and write to socket
// myAioReq = -> Aio Request
// The steps taken are:
// 1) Obtain an aio object. If none available, a redrive will be scheduled for
// the protocol and we return -EINPROGRESS which will keep the link disabled.
// 2) Read the data from the link into the buffer using getData().
// 3) If the link is slow, return a 1 which will re-enable the link and
// redrive the protocol when data is available. We will resume in
// aio_WriteCont() when the buffer has the required amount of data.
// 4) If the read from the link indicated an error then abort the operation
// by recycling the AioReq object which will synchronize in-flight i/o.
// 5) Schedule the aio write. Errors will scuttle the operation and proceed to
// flush the socket. The write() call will appropriately recycle the AioReq
// object. We note that no error should be returned if aio resources are
// exhausted, the underlying implementation must revert to synchronous
// handling. That's a lot of overhead but we'll back off.
int XrdXrootdProtocol::aio_WriteAll()
{
XrdXrootdAio *aiop;
size_t Quantum;
int rc = 0;
if (myStalls) myStalls--;
while (myIOLen > 0)
/*1*/ {if (!(aiop = myAioReq->getAio()))
{Resume = &XrdXrootdProtocol::aio_WriteAll;
myBlen = 0;
return -EINPROGRESS;
}
/*2*/ Quantum = (aiop->buffp->bsize > myIOLen ? myIOLen
: aiop->buffp->bsize);
if ((rc = getData("aiodata", aiop->buffp->buff, Quantum)))
/*3*/ {if (rc > 0)
{Resume = &XrdXrootdProtocol::aio_WriteCont;
myBlast = Quantum;
myAioReq->Push(aiop);
myStalls++;
return 1;
}
/*4*/ myAioReq->Recycle(-1, aiop);
break;
}
/*5*/ aiop->sfsAio.aio_nbytes = Quantum;
aiop->sfsAio.aio_offset = myOffset;
myIOLen -= Quantum; myOffset += Quantum;
if ((rc = myAioReq->Write(aiop))) return aio_Error("write", rc);
}
// We have completed
//
if (myStalls <= as_maxstalls) myStalls = 0;
myAioReq = 0;
Resume = 0;
return rc;
}
/******************************************************************************/
/* a i o _ W r i t e C o n t */
/******************************************************************************/
// myFile = file to be written
// myOffset = Offset at which to write
// myIOLen = Number of bytes to read from socket and write to file
// myBlast = Number of bytes already read from the socket
// myAio = Pointer to the XrdXrootdAioReq object.
int XrdXrootdProtocol::aio_WriteCont()
{
XrdXrootdAio *aiop = myAioReq->Pop();
int rc;
// Write data that was finaly finished comming in. Note that we could simply
// pick up the current aio object without locks since this is synchronized
// via protocol object scheduling (only one can occur at a time).
//
if ((rc = myAioReq->Write(aiop)))
{myIOLen = myIOLen-myBlast;
return aio_Error("write", rc);
}
myOffset += myBlast; myIOLen -= myBlast;
// Either continue the request or return to enable the link
//
if (myIOLen > 0) return aio_WriteAll();
myAioReq = 0;
return 0;
}