/******************************************************************************/
/* */
/* X r d S y s I O E v e n t s . c c */
/* */
/* (c) 2012 by the Board of Trustees of the Leland Stanford, Jr., University */
/* All Rights Reserved */
/* Produced by Andrew Hanushevsky for Stanford University under contract */
/* DE-AC02-76-SFO0515 with the Department of Energy */
/* */
/* This file is part of the XRootD software suite. */
/* */
/* XRootD is free software: you can redistribute it and/or modify it under */
/* the terms of the GNU Lesser General Public License as published by the */
/* Free Software Foundation, either version 3 of the License, or (at your */
/* option) any later version. */
/* */
/* XRootD is distributed in the hope that it will be useful, but WITHOUT */
/* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or */
/* FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public */
/* License for more details. */
/* */
/* You should have received a copy of the GNU Lesser General Public License */
/* along with XRootD in a file called COPYING.LESSER (LGPL license) and file */
/* COPYING (GPL license). If not, see <http://www.gnu.org/licenses/>. */
/* */
/* The copyright holder's institutional names and contributor's names may not */
/* be used to endorse or promote products derived from this software without */
/* specific prior written permission of the institution or contributor. */
/******************************************************************************/
#include
#include
#include
#include "XrdSys/XrdSysFD.hh"
#include "XrdSys/XrdSysIOEvents.hh"
#include "XrdSys/XrdSysHeaders.hh"
#include "XrdSys/XrdSysPlatform.hh"
#include "XrdSys/XrdSysPthread.hh"
/******************************************************************************/
/* L o c a l D e f i n e s */
/******************************************************************************/
// Helpers for the intrusive, circular, doubly-linked lists used in this file.
// "dlvar" names the link member inside each item (it must have "next" and
// "prev" pointers); being a member name it cannot be parenthesized. Every other
// argument is parenthesized so that any expression may be passed, and every
// expression macro is wrapped as a whole so that it composes with surrounding
// operators (e.g. !SINGLETON(...)). Arguments may be evaluated more than once,
// so do not pass expressions with side effects.
//
// SINGLETON: true when the item is linked only to itself.
//
#define SINGLETON(dlvar, theitem)\
               ((theitem)->dlvar .next == (theitem))

// INSERT: link newitem into the list immediately in front of curitem. The
// body is a single statement so it is safe under an unbraced if/else.
//
#define INSERT(dlvar, curitem, newitem) \
        do {(newitem)->dlvar .next = (curitem); \
            (newitem)->dlvar .prev = (curitem)->dlvar .prev; \
            (curitem)->dlvar .prev-> dlvar .next = (newitem); \
            (curitem)->dlvar .prev = (newitem); \
           } while(0)

// REMOVE: unlink curitem from its list and leave it self-linked. When the
// list anchor (dlbase) points at curitem the anchor is advanced to the next
// item, or zeroed when curitem was the only member. The body is a single
// statement so it is safe under an unbraced if/else.
//
#define REMOVE(dlbase, dlvar, curitem) \
        do {if ((dlbase) == (curitem)) \
               (dlbase) = (SINGLETON(dlvar,curitem) \
                        ? 0 : (curitem)->dlvar .next);\
            (curitem)->dlvar .prev-> dlvar .next = (curitem)->dlvar .next;\
            (curitem)->dlvar .next-> dlvar .prev = (curitem)->dlvar .prev;\
            (curitem)->dlvar .next = (curitem);\
            (curitem)->dlvar .prev = (curitem);\
           } while(0)

// REVENTS/WEVENTS: the subset of event mask x that are read (write) events.
//
#define REVENTS(x) ((x) & Channel:: readEvents)

#define WEVENTS(x) ((x) & Channel::writeEvents)

// ISPOLLER: true when the calling thread is the one recorded in pollTid.
//
#define ISPOLLER (XrdSysThread::Same(XrdSysThread::ID(),pollTid))

// BOOLNAME: printable name of a boolean expression (used for tracing).
//
#define BOOLNAME(x) ((x) ? "true" : "false")
#define DO_TRACE(x,fd,y) \
{PollerInit::traceMTX.Lock(); \
cerr <<"IOE fd " <pollP;
XrdSysSemaphore *theSem = pollArg->pollSync;
thePoller->pollTid = XrdSysThread::ID();
thePoller->Begin(theSem, pollArg->retCode, &(pollArg->retMsg));
delete theSem;
return (void *)0;
}
/******************************************************************************/
/* P o l l e r E r r 1 */
/******************************************************************************/
// This class is set in the channel when an error occurs but cannot be reflected
// immediately because either there is no callback function or all events are
// disabled. We need to do this because error events can be physically presented
// by the kernel even when logical events are disabled. Note that the error
// number and text will have been set and remain set as the channel was actually
// disabled preventing any new operation on the channel.
//
class PollerErr1 : public Poller
{
public:

// Built with -1 for both base-class arguments (presumably the command and
// request pipe descriptors -- confirm against the Poller constructor).
//
    PollerErr1() : Poller(-1, -1) {}
   ~PollerErr1() {}

protected:

// This poller never runs a polling loop, so starting it does nothing.
//
void Begin(XrdSysSemaphore *syncp, int &rc, const char **eTxt) {}

// There is no poll set behind this poller, so exclusion does nothing.
//
void Exclude(Channel *cP, bool &isLocked, bool dover=1) {}

// Always refuses the request. The error number is the fault obtained via
// GetFault() for the channel, or EPROTO when that is zero.
//
bool Include(Channel *cP, int &eNum, const char **eTxt, bool &isLocked)
{
   eNum = GetFault(cP);
   if (eNum == 0) eNum = EPROTO;
   if (eTxt != 0) *eTxt = "initializing channel";
   return false;
}

// Always refuses the request, reporting the error exactly as Include() does
// but with text that identifies the modify operation.
//
bool Modify (Channel *cP, int &eNum, const char **eTxt, bool &isLocked)
{
   eNum = GetFault(cP);
   if (eNum == 0) eNum = EPROTO;
   if (eTxt != 0) *eTxt = "modifying channel";
   return false;
}

// Nothing was acquired, so there is nothing to shut down.
//
void Shutdown() {}
};
/******************************************************************************/
/* P o l l e r I n i t */
/******************************************************************************/
// This class is used as the initial poller on a channel. It is responsible
// for adding the file descriptor to the poll set upon the initial enable. This
// avoids enabling a channel prior to it receiving a call back function.
//
class PollerInit : public Poller
{
public:
PollerInit() : Poller(-1, -1) {}
~PollerInit() {}
static XrdSysMutex traceMTX;
static bool doTrace;
protected:
void Begin(XrdSysSemaphore *syncp, int &rc, const char **eTxt) {}
void Exclude(Channel *cP, bool &isLocked, bool dover=1) {}
bool Include(Channel *cP, int &eNum, const char **eTxt, bool &isLocked)
{eNum = EPROTO;
if (eTxt) *eTxt = "initializing channel";
return false;
}
bool Modify (Channel *cP, int &eNum, const char **eTxt, bool &isLocked)
{bool rc = Init(cP, eNum, eTxt, isLocked);
IF_TRACE(Modify,cP->GetFD(), "Init() returned " <Attach(this);
}
/******************************************************************************/
/* D e l e t e */
/******************************************************************************/
// Destroy this channel. The channel is first detached from its poller; if a
// callback is in progress on another thread, destruction is delayed until that
// callback signals completion. The object deletes itself, so the caller must
// not touch it after this method returns.
//
void XrdSys::IOEvents::Channel::Delete()
{
   Poller *myPoller;
   bool isLocked = true;

// Lock ourselves during the delete process. If the channel is disassociated
// or the real poller is set to the error poller then this channel is clean
// and can be deleted (i.e. the channel ran through Detach()).
//
   chMutex.Lock();
   if (!chPollXQ || chPollXQ == &pollErr1)
      {chMutex.UnLock();
       delete this;
       return;
      }

// Disable and remove ourselves from all queues. Detach() reports through
// isLocked whether we still hold the channel lock; reacquire it if not.
//
   myPoller = chPollXQ;
   chPollXQ->Detach(this,isLocked,false);
   if (!isLocked) chMutex.Lock();

// If we are in callback mode then we will need to delay the destruction until
// after the callback completes unless this is the poller thread. In that case,
// we need to tell the poller (via chDead) that the channel has been destroyed
// so that it does not touch the channel once the callback returns.
//
   if (chStat)
      {if (XrdSysThread::Same(XrdSysThread::ID(),myPoller->pollTid))
          {myPoller->chDead = true;
           chMutex.UnLock();
          } else {
           // The thread running the callback posts this semaphore when it
           // sees the isDead status; chCBA is reused to hand it over.
           XrdSysSemaphore cbDone(0);
           chStat = isDead;
           chCBA  = (void *)&cbDone;
           chMutex.UnLock();
           cbDone.Wait();
          }
      }
      else chMutex.UnLock(); // Never destroy the mutex while it is still held

// It is now safe to release the storage
//
   delete this;
}
/******************************************************************************/
/* D i s a b l e */
/******************************************************************************/
bool XrdSys::IOEvents::Channel::Disable(int events, const char **eText)
{
int eNum = 0, newev, curev;
bool retval = true, isLocked = true;
// Lock this channel
//
chMutex.Lock();
// Get correct current events; depending on the state of the channel
//
if (chPoller == &pollWait) curev = static_cast(reMod);
else curev = static_cast(chEvents);
// Trace this entry
//
IF_TRACE(Disable,chFD,"->Disable(" <Enable("<chPoller == &pollInit ? "init" :
(cP->chPoller == &pollWait ? "wait" : "err")));
DO_TRACE(CbkXeq,cP->chFD,"callback events=" <chCB ? "present" : "missing")
<<" poller=" <inTOQ)
{TmoDel(cP);
cP->dlType |= (events & CallBack::ValidEvents) << 4;
isRead = events & (CallBack::ReadyToRead | CallBack:: ReadTimeOut);
if (isRead) cP->rdDL = maxTime;
isWrite= events & (CallBack::ReadyToWrite | CallBack::WriteTimeOut);
if (isWrite) cP->wrDL = maxTime;
} else {
cP->dlType &= CallBack::ValidEvents;
isRead = isWrite = false;
}
// Verify that there is a callback here and the channel is ready. If not,
// disable this channel for the events being reflected unless the event is a
// fatal error. In this case we need to abandon the channel since error events
// may continue to be generated as we can't always disable them.
//
if (!(cP->chCB) || cP->chPoller != cP->chPollXQ)
{if (eNum)
{cP->chPoller = &pollErr1; cP->chFault = eNum;
cP->inPSet = 0;
return false;
}
oldEvents = cP->chEvents;
cP->chEvents = 0;
retval = cP->chPoller->Modify(cP, eNum, 0, isLocked);
TRACE_MOD(CbkXeq,cP->chFD,0);
if (!isLocked) cP->chMutex.Lock();
cP->chEvents = oldEvents;
return true;
}
// Resolve the problem where we get an error event but the channel wants them
// presented as a read or write event. If neither is possible then defer the
// error until the channel is enabled again.
//
if (eNum)
{if (cP->chEvents & Channel::errorEvents)
{cP->chPoller = &pollErr1; cP->chFault = eNum;
cP->chStat = Channel::isCBMode;
chDead = false;
cbkMHelp.UnLock();
cP->chCB->Fatal(cP,cP->chCBA, eNum, eTxt);
if (chDead) return true;
cbkMHelp.Lock(&(cP->chMutex));
cP->inPSet = 0;
return false;
}
if (REVENTS(cP->chEvents)) events = CallBack::ReadyToRead;
else if (WEVENTS(cP->chEvents)) events = CallBack::ReadyToWrite;
else {cP->chPoller = &pollErr1; cP->chFault = eNum; cP->inPSet = 0;
return false;
}
}
// Indicate that we are in callback mode then drop the channel lock and effect
// the callback. This allows the callback to freely manage locks.
//
cP->chStat = Channel::isCBMode;
chDead = false;
cbkMHelp.UnLock();
IF_TRACE(CbkXeq,cP->chFD,"invoking callback; events=" <chCB->Event(cP,cP->chCBA, events);
IF_TRACE(CbkXeq,cP->chFD,"callback returned " <chMutex));
// If the channel is being destroyed; then another thread must have done so.
// Tell it the callback has finished and just return.
//
if (cP->chStat != Channel::isCBMode)
{if (cP->chStat == Channel::isDead)
((XrdSysSemaphore *)cP->chCBA)->Post();
return true;
}
cP->chStat = Channel::isClear;
// Handle enable or disable here. If we keep the channel enabled then reset
// the timeout if it hasn't been handled via a call from the callback.
//
if (!cbok) Detach(cP,isLocked,false);
else if ((isRead || isWrite) && !(cP->inTOQ) && (cP->chRTO || cP->chWTO))
TmoAdd(cP, 0);
// All done. While the mutex should not have been unlocked, we relock it if
// it has to keep the mutex helper from croaking.
//
if (!isLocked) cP->chMutex.Lock();
return true;
}
/******************************************************************************/
/* Static: C r e a t e */
/******************************************************************************/
// Create a poller together with the thread that drives it.
//
// eNum   - set to zero on success, otherwise to the error number.
// eTxt   - when not nil, receives a short description of the failing step
//          (an empty string on success).
// crOpts - creation options; only optTOM is examined here.
//
// Returns the new poller, or nil upon failure.
//
XrdSys::IOEvents::Poller *XrdSys::IOEvents::Poller::Create(int         &eNum,
                                                           const char **eTxt,
                                                           int          crOpts)
{
   struct pollArg bootArg;
   pthread_t      pollThread;
   int            pipeFD[2];

// The pipe is what lets other threads break the poll wait loop
//
   if (XrdSysFD_Pipe(pipeFD) != 0)
   {
      eNum = errno;
      if (eTxt) *eTxt = "creating poll pipe";
      return 0;
   }

// Obtain the actual poller implementation; it reports its own error details
//
   bootArg.pollP = newPoller(pipeFD, eNum, eTxt);
   if (bootArg.pollP == 0)
   {
      close(pipeFD[0]);
      close(pipeFD[1]);
      return 0;
   }

// Start the thread that will run this poller object.
// NOTE(review): this failure path returns without releasing the poller or the
// pipe descriptors -- confirm whether that is intended.
//
   eNum = XrdSysThread::Run(&pollThread, XrdSys::IOEvents::BootStrap::Start,
                            (void *)&bootArg, XRDSYSTHREAD_BIND, "Poller");
   if (eNum != 0)
   {
      if (eTxt) *eTxt = "creating poller thread";
      return 0;
   }

// Hold off until the thread has finished initializing. Note that the bootstrap
// takes ownership of the semaphore and deletes it once the thread posting the
// semaphore actually ends. This is to avoid semaphore bugs present in certain
// (e.g. Linux) kernels.
//
   bootArg.pollSync->Wait();

// A non-zero return code means the poller thread could not get going
//
   if (bootArg.retCode != 0)
   {
      if (eTxt)
      {
         if (bootArg.retMsg) *eTxt = bootArg.retMsg;
            else             *eTxt = "starting poller";
      }
      eNum = bootArg.retCode;
      delete bootArg.pollP;
      return 0;
   }

// Apply the creation options to the new poller
//
   Poller *newPoll = bootArg.pollP;
   if (crOpts & optTOM)
      newPoll->tmoMask = ~(CallBack::ReadTimeOut|CallBack::WriteTimeOut);

// Report success
//
   eNum = 0;
   if (eTxt) *eTxt = "";
   return newPoll;
}
/******************************************************************************/
/* D e t a c h */
/******************************************************************************/
// Detach a channel from this poller. The caller must hold the channel lock!
//
// cP       - the channel being detached.
// isLocked - handed to Exclude(), which may update it.
// keep     - when false the channel is reset onto the error poller, loses its
//            callback, and is taken off the attach list.
//
void XrdSys::IOEvents::Poller::Detach(XrdSys::IOEvents::Channel *cP,
                                      bool &isLocked, bool keep)
{
// Remember whether the channel is in the poll set before anything changes
//
   const bool wasInPollSet = (cP->inPSet != 0);

// Take the channel off the timeout queue first
//
   if (cP->inTOQ)
   {
      toMutex.Lock();
      REMOVE(tmoBase, tmoList, cP);
      toMutex.UnLock();
   }

// Only one attach-list update may be in progress at a time
//
   adMutex.Lock();

// When the channel is not being kept, preset it so no callback can occur
//
   if (!keep)
   {
      cP->Reset(&pollErr1, cP->chFD);
      cP->chPollXQ = &pollErr1;
      cP->chCB     = 0;
      cP->chCBA    = 0;
      if (SINGLETON(attList, cP))
      {
         if (attBase == cP) attBase = 0;
      }
      else
      {
         REMOVE(attBase, attList, cP);
      }
   }

// The attach lock is dropped before touching the poll set
//
   adMutex.UnLock();

// Nothing more to do unless the channel was part of the poll set
//
   if (!wasInPollSet) return;

// Exclude the channel unless the poller has already been stopped
//
   cP->inPSet = 0;
   if (cmdFD >= 0)
   {
      const bool notPoller = !XrdSysThread::Same(XrdSysThread::ID(), pollTid);
      Exclude(cP, isLocked, notPoller);
   }
}
/******************************************************************************/
/* Protected: G e t R e q u e s t */
/******************************************************************************/
// Warning: This method runs unlocked. The caller must have exclusive use of
// the reqBuff otherwise unpredictable results will occur.
int XrdSys::IOEvents::Poller::GetRequest()
{
ssize_t rlen;
int rc;
// See if we are to resume a read or start a fresh one
//
if (!pipeBlen)
{pipeBuff = (char *)&reqBuff; pipeBlen = sizeof(reqBuff);}
// Wait for the next request. Some OS's (like Linux) don't support non-blocking
// pipes. So, we must front the read with a poll.
//
do {rc = poll(&pipePoll, 1, 0);}
while(rc < 0 && (errno == EAGAIN || errno == EINTR));
if (rc < 1) return 0;
// Now we can put up a read without a delay. Normally a full command will be
// present. Under some heavy conditions, this may not be the case.
//
do {rlen = read(reqFD, pipeBuff, pipeBlen);}
while(rlen < 0 && errno == EINTR);
if (rlen <= 0)
{cerr <<"Poll: " <chPoller == &pollWait)
{cP->reMod = cP->chEvents;
cP->chEvents = 0;
IF_TRACE(Init,cP->chFD,"defer events=" <reMod);
return true;
}
// Trace this entry
//
IF_TRACE(Init,cP->chFD,"begin events=" <chEvents));
// If no events are enabled at this point, just return
//
if (!(cP->chEvents)) return true;
// Refuse to enable a channel without a callback function
//
if (!(cP->chCB))
{eNum = EDESTADDRREQ;
if (eTxt) *eTxt = "enabling without a callback";
return false;
}
// So, now we can include the channel in the poll set. We will include it
// with no events enabled to prevent callbacks prior to completion here.
//
cP->chPoller = &pollWait; cP->reMod = cP->chEvents; cP->chEvents = 0;
retval = cP->chPollXQ->Include(cP, eNum, eTxt, isLocked);
IF_TRACE(Init,cP->chFD,"Include() returned " <chMutex.Lock(); isLocked = true;}
// Determine what future poller to use. If we can use the regular poller then
// set the correct event mask for the channel. Note that we could have lost
// control but the correct events will be reflected in the "reMod" member.
//
if (!retval) {cP->chPoller = &pollErr1; cP->chFault = eNum;}
else {cP->chPoller = cP->chPollXQ;
cP->inPSet = 1;
if (cP->reMod)
{cP->chEvents = cP->reMod;
retval = cP->chPoller->Modify(cP, eNum, eTxt, isLocked);
TRACE_MOD(Init,cP->chFD,int(cP->reMod));
if (!isLocked) {cP->chMutex.Lock(); isLocked = true;}
} else {
TRACE_NOD(Init,cP->chFD,0);
}
}
// All done
//
cP->reMod = 0;
return retval;
}
/******************************************************************************/
/* P o l l 2 E n u m */
/******************************************************************************/
// Translate poll() error flags into an errno value. The flags are tested in
// the order POLLERR, POLLHUP, POLLNVAL and the first one present decides the
// result; EOPNOTSUPP is returned when none of them is set.
//
int XrdSys::IOEvents::Poller::Poll2Enum(short events)
{
   static const struct {short flag; int eNum;} xlate[] =
                {{POLLERR,  EPIPE},
                 {POLLHUP,  ECONNRESET},
                 {POLLNVAL, EBADF}
                };
   static const int xlateNum = sizeof(xlate)/sizeof(xlate[0]);

   for (int i = 0; i < xlateNum; i++)
       {if (events & xlate[i].flag) return xlate[i].eNum;}

   return EOPNOTSUPP;
}
/******************************************************************************/
/* S e n d C m d */
/******************************************************************************/
// Send a command to the poller thread through the command pipe.
//
// cmd - the request to send. For requests numbered PipeData::Post or higher
//       its theSem member is pointed at a local semaphore and this method
//       does not return until that semaphore has been posted.
//
// Returns zero on success, otherwise the errno value of the failed write.
//
int XrdSys::IOEvents::Poller::SendCmd(PipeData &cmd)
{
   int wlen;

// Pipe writes are atomic so we don't need locks. Note that pipes guarantee
// that all of the data will be written or we will block. Commands that need
// no confirmation are simply written, retrying if a signal interrupts us.
//
   if (cmd.req < PipeData::Post)
   {
      while ((wlen = write(cmdFD, (char *)&cmd, sizeof(PipeData))) < 0
         &&  errno == EINTR) {}
      return (wlen >= 0 ? 0 : errno);
   }

// The remaining commands require confirmation: hand the receiver a semaphore
// and wait on it, but only if the command was actually written.
//
   XrdSysSemaphore ackSem(0);
   cmd.theSem = &ackSem;
   while ((wlen = write(cmdFD, (char *)&cmd, sizeof(PipeData))) < 0
      &&  errno == EINTR) {}
   if (wlen > 0) ackSem.Wait();

// All done
//
   return (wlen >= 0 ? 0 : errno);
}
/******************************************************************************/
/* Protected: S e t P o l l E n t */
/******************************************************************************/
// Store pe in the channel's pollEnt member. This method only records the
// value; presumably it is an index or slot number that a poller implementation
// uses to find the channel -- confirm against the implementations.
//
void XrdSys::IOEvents::Poller::SetPollEnt(XrdSys::IOEvents::Channel *cP, int pe)
{
   cP->pollEnt = pe;
}
/******************************************************************************/
/* S t o p */
/******************************************************************************/
// Stop this poller: tell the poller thread to stop, close the command pipe,
// move every attached channel onto the error poller (fault EIDRM), and finally
// run the implementation specific Shutdown(). Channels that have a callback
// and have stop events enabled get their Stop() callback invoked. Calling this
// on a poller that is already stopped does nothing.
//
void XrdSys::IOEvents::Poller::Stop()
{
   PipeData  cmdbuff;
   CallBack *theCB;
   Channel  *cP;
   void     *cbArg;
   int       doCB;

// Initialize the pipe data structure. The cast to void * makes clearing the
// raw storage of a class type explicit.
//
   memset(static_cast<void *>( &cmdbuff ), 0, sizeof(cmdbuff));
   cmdbuff.req = PipeData::Stop;

// Lock all of this
//
   adMutex.Lock();

// If we are already shutdown then we are done
//
   if (cmdFD == -1) {adMutex.UnLock(); return;}

// First we must stop the poller thread in an orderly fashion.
//
   SendCmd(cmdbuff);

// Close the pipe communication mechanism
//
   close(cmdFD); cmdFD = -1;
   close(reqFD); reqFD = -1;

// Run through cleaning up the channels. While there should not be any other
// operations happening on this poller, we take the conservative approach. The
// attach lock is dropped while a channel is processed so that the channel lock
// is never acquired while the attach lock is held.
//
   while((cP = attBase))
        {REMOVE(attBase, attList, cP);
         adMutex.UnLock();
         cP->chMutex.Lock();
         // Decide on the callback before Reset() is given the chance to alter
         // the channel's event mask.
         doCB = cP->chCB != 0 && (cP->chEvents & Channel::stopEvent);
         if (cP->inTOQ) TmoDel(cP);
         cP->Reset(&pollErr1, cP->chFD, EIDRM);
         cP->chPollXQ = &pollErr1;
         if (doCB)
            {cP->chStat = Channel::isClear;
             theCB = cP->chCB; cbArg = cP->chCBA;
             // The callback runs without the channel lock being held
             cP->chMutex.UnLock();
             theCB->Stop(cP, cbArg);
            } else cP->chMutex.UnLock();
         adMutex.Lock();
        }

// Now invoke the poller specific shutdown
//
   Shutdown();
   adMutex.UnLock();
}
/******************************************************************************/
/* T m o A d d */
/******************************************************************************/
bool XrdSys::IOEvents::Poller::TmoAdd(XrdSys::IOEvents::Channel *cP, int tmoSet)
{
XrdSysMutexHelper mHelper(toMutex);
time_t tNow;
Channel *ncP;
bool setRTO, setWTO;
// Remove element from timeout queue if it is there
//
if (cP->inTOQ)
{REMOVE(tmoBase, tmoList, cP);
cP->inTOQ = 0;
}
// Determine which timeouts need to be reset
//
tmoSet|= cP->dlType >> 4;
setRTO = (tmoSet&tmoMask) & (CallBack::ReadyToRead |CallBack:: ReadTimeOut);
setWTO = (tmoSet&tmoMask) & (CallBack::ReadyToWrite|CallBack::WriteTimeOut);
// Reset the required deadlines
//
tNow = time(0);
if (setRTO && REVENTS(cP->chEvents) && cP->chRTO)
cP->rdDL = cP->chRTO + tNow;
if (setWTO && WEVENTS(cP->chEvents) && cP->chWTO)
cP->wrDL = cP->chWTO + tNow;
// Calculate the closest enabled deadline
//
if (cP->rdDL < cP->wrDL)
{cP->deadLine = cP->rdDL; cP->dlType = CallBack:: ReadTimeOut;
} else {
cP->deadLine = cP->wrDL; cP->dlType = CallBack::WriteTimeOut;
if (cP->rdDL == cP->wrDL) cP->dlType |= CallBack:: ReadTimeOut;
}
IF_TRACE(TmoAdd, cP->chFD, "t=" <