/******************************************************************************/ /* */ /* X r d S y s I O E v e n t s . c c */ /* */ /* (c) 2012 by the Board of Trustees of the Leland Stanford, Jr., University */ /* All Rights Reserved */ /* Produced by Andrew Hanushevsky for Stanford University under contract */ /* DE-AC02-76-SFO0515 with the Department of Energy */ /* */ /* This file is part of the XRootD software suite. */ /* */ /* XRootD is free software: you can redistribute it and/or modify it under */ /* the terms of the GNU Lesser General Public License as published by the */ /* Free Software Foundation, either version 3 of the License, or (at your */ /* option) any later version. */ /* */ /* XRootD is distributed in the hope that it will be useful, but WITHOUT */ /* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or */ /* FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public */ /* License for more details. */ /* */ /* You should have received a copy of the GNU Lesser General Public License */ /* along with XRootD in a file called COPYING.LESSER (LGPL license) and file */ /* COPYING (GPL license). If not, see <http://www.gnu.org/licenses/>. */ /* */ /* The copyright holder's institutional names and contributor's names may not */ /* be used to endorse or promote products derived from this software without */ /* specific prior written permission of the institution or contributor. 
*/ /******************************************************************************/ #include #include #include #include "XrdSys/XrdSysFD.hh" #include "XrdSys/XrdSysIOEvents.hh" #include "XrdSys/XrdSysHeaders.hh" #include "XrdSys/XrdSysPlatform.hh" #include "XrdSys/XrdSysPthread.hh" /******************************************************************************/ /* L o c a l D e f i n e s */ /******************************************************************************/ #define SINGLETON(dlvar, theitem)\ theitem ->dlvar .next == theitem #define INSERT(dlvar, curitem, newitem) \ newitem ->dlvar .next = curitem; \ newitem ->dlvar .prev = curitem ->dlvar .prev; \ curitem ->dlvar .prev-> dlvar .next = newitem; \ curitem ->dlvar .prev = newitem #define REMOVE(dlbase, dlvar, curitem) \ if (dlbase == curitem) dlbase = (SINGLETON(dlvar,curitem) \ ? 0 : curitem ->dlvar .next);\ curitem ->dlvar .prev-> dlvar .next = curitem ->dlvar .next;\ curitem ->dlvar .next-> dlvar .prev = curitem ->dlvar .prev;\ curitem ->dlvar .next = curitem;\ curitem ->dlvar .prev = curitem #define REVENTS(x) x & Channel:: readEvents #define WEVENTS(x) x & Channel::writeEvents #define ISPOLLER XrdSysThread::Same(XrdSysThread::ID(),pollTid) #define BOOLNAME(x) (x ? "true" : "false") #define DO_TRACE(x,fd,y) \ {PollerInit::traceMTX.Lock(); \ cerr <<"IOE fd " <pollP; XrdSysSemaphore *theSem = pollArg->pollSync; thePoller->pollTid = XrdSysThread::ID(); thePoller->Begin(theSem, pollArg->retCode, &(pollArg->retMsg)); delete theSem; return (void *)0; } /******************************************************************************/ /* P o l l e r E r r 1 */ /******************************************************************************/ // This class is set in the channel when an error occurs but cannot be reflected // immediately because either there is no callback function or all events are // disabled. 
We need to do this because error events can be physically presented // by the kernel even when logical events are disabled. Note that the error // number and text will have been set and remain set as the channel was actually // disabled preventing any new operation on the channel. // class PollerErr1 : public Poller { public: PollerErr1() : Poller(-1, -1) {} ~PollerErr1() {} protected: void Begin(XrdSysSemaphore *syncp, int &rc, const char **eTxt) {} void Exclude(Channel *cP, bool &isLocked, bool dover=1) {} bool Include(Channel *cP, int &eNum, const char **eTxt, bool &isLocked) {if (!(eNum = GetFault(cP))) eNum = EPROTO; if (eTxt) *eTxt = "initializing channel"; return false; } bool Modify (Channel *cP, int &eNum, const char **eTxt, bool &isLocked) {if (!(eNum = GetFault(cP))) eNum = EPROTO; if (eTxt) *eTxt = "modifying channel"; return false; } void Shutdown() {} }; /******************************************************************************/ /* P o l l e r I n i t */ /******************************************************************************/ // This class is used as the initial poller on a channel. It is responsible // for adding the file descriptor to the poll set upon the initial enable. This // avoids enabling a channel prior to it receiving a call back function. 
// class PollerInit : public Poller { public: PollerInit() : Poller(-1, -1) {} ~PollerInit() {} static XrdSysMutex traceMTX; static bool doTrace; protected: void Begin(XrdSysSemaphore *syncp, int &rc, const char **eTxt) {} void Exclude(Channel *cP, bool &isLocked, bool dover=1) {} bool Include(Channel *cP, int &eNum, const char **eTxt, bool &isLocked) {eNum = EPROTO; if (eTxt) *eTxt = "initializing channel"; return false; } bool Modify (Channel *cP, int &eNum, const char **eTxt, bool &isLocked) {bool rc = Init(cP, eNum, eTxt, isLocked); IF_TRACE(Modify,cP->GetFD(), "Init() returned " <Attach(this); } /******************************************************************************/ /* D e l e t e */ /******************************************************************************/ void XrdSys::IOEvents::Channel::Delete() { Poller *myPoller; bool isLocked = true; // Lock ourselves during the delete process. If the channel is disassociated // or the real poller is set to the error poller then this channel is clean // and can be deleted (i.e. the channel ran through Detach()). // chMutex.Lock(); if (!chPollXQ || chPollXQ == &pollErr1) {chMutex.UnLock(); delete this; return; } // Disable and remove ourselves from all queues // myPoller = chPollXQ; chPollXQ->Detach(this,isLocked,false); if (!isLocked) chMutex.Lock(); // If we are in callback mode then we will need to delay the destruction until // after the callback completes unless this is the poller thread. In that case, // we need to tell the poller that we have been destroyed in a shelf-stable way. 
// if (chStat) {if (XrdSysThread::Same(XrdSysThread::ID(),myPoller->pollTid)) {myPoller->chDead = true; chMutex.UnLock(); } else { XrdSysSemaphore cbDone(0); chStat = isDead; chCBA = (void *)&cbDone; chMutex.UnLock(); cbDone.Wait(); } } // It is now safe to release the storage // delete this; } /******************************************************************************/ /* D i s a b l e */ /******************************************************************************/ bool XrdSys::IOEvents::Channel::Disable(int events, const char **eText) { int eNum = 0, newev, curev; bool retval = true, isLocked = true; // Lock this channel // chMutex.Lock(); // Get correct current events; depending on the state of the channel // if (chPoller == &pollWait) curev = static_cast(reMod); else curev = static_cast(chEvents); // Trace this entry // IF_TRACE(Disable,chFD,"->Disable(" <Enable("<chPoller == &pollInit ? "init" : (cP->chPoller == &pollWait ? "wait" : "err"))); DO_TRACE(CbkXeq,cP->chFD,"callback events=" <chCB ? "present" : "missing") <<" poller=" <inTOQ) {TmoDel(cP); cP->dlType |= (events & CallBack::ValidEvents) << 4; isRead = events & (CallBack::ReadyToRead | CallBack:: ReadTimeOut); if (isRead) cP->rdDL = maxTime; isWrite= events & (CallBack::ReadyToWrite | CallBack::WriteTimeOut); if (isWrite) cP->wrDL = maxTime; } else { cP->dlType &= CallBack::ValidEvents; isRead = isWrite = false; } // Verify that there is a callback here and the channel is ready. If not, // disable this channel for the events being refelcted unless the event is a // fatal error. In this case we need to abandon the channel since error events // may continue to be generated as we can't always disable them. 
// if (!(cP->chCB) || cP->chPoller != cP->chPollXQ) {if (eNum) {cP->chPoller = &pollErr1; cP->chFault = eNum; cP->inPSet = 0; return false; } oldEvents = cP->chEvents; cP->chEvents = 0; retval = cP->chPoller->Modify(cP, eNum, 0, isLocked); TRACE_MOD(CbkXeq,cP->chFD,0); if (!isLocked) cP->chMutex.Lock(); cP->chEvents = oldEvents; return true; } // Resolve the problem where we get an error event but the channel wants them // presented as a read or write event. If neither is possible then defer the // error until the channel is enabled again. // if (eNum) {if (cP->chEvents & Channel::errorEvents) {cP->chPoller = &pollErr1; cP->chFault = eNum; cP->chStat = Channel::isCBMode; chDead = false; cbkMHelp.UnLock(); cP->chCB->Fatal(cP,cP->chCBA, eNum, eTxt); if (chDead) return true; cbkMHelp.Lock(&(cP->chMutex)); cP->inPSet = 0; return false; } if (REVENTS(cP->chEvents)) events = CallBack::ReadyToRead; else if (WEVENTS(cP->chEvents)) events = CallBack::ReadyToWrite; else {cP->chPoller = &pollErr1; cP->chFault = eNum; cP->inPSet = 0; return false; } } // Indicate that we are in callback mode then drop the channel lock and effect // the callback. This allows the callback to freely manage locks. // cP->chStat = Channel::isCBMode; chDead = false; cbkMHelp.UnLock(); IF_TRACE(CbkXeq,cP->chFD,"invoking callback; events=" <chCB->Event(cP,cP->chCBA, events); IF_TRACE(CbkXeq,cP->chFD,"callback returned " <chMutex)); // If the channel is being destroyed; then another thread must have done so. // Tell it the callback has finished and just return. // if (cP->chStat != Channel::isCBMode) {if (cP->chStat == Channel::isDead) ((XrdSysSemaphore *)cP->chCBA)->Post(); return true; } cP->chStat = Channel::isClear; // Handle enable or disable here. If we keep the channel enabled then reset // the timeout if it hasn't been handled via a call from the callback. 
//
   if (!cbok) Detach(cP,isLocked,false);
      else if ((isRead || isWrite) && !(cP->inTOQ) && (cP->chRTO || cP->chWTO))
              TmoAdd(cP, 0);

// All done. While the mutex should not have been unlocked, we relock it if
// it has to keep the mutex helper from croaking.
//
   if (!isLocked) cP->chMutex.Lock();
   return true;
}

/******************************************************************************/
/*                        Static:   C r e a t e                               */
/******************************************************************************/

// Create() builds a poller: it creates the command pipe, obtains a poller
// object from newPoller(), starts a thread running BootStrap::Start on it and
// waits on pArg.pollSync for that thread to report its start-up result.
//
// eNum    receives 0 on success, otherwise an error number: errno from the
//         pipe creation, the return code of XrdSysThread::Run(), or the
//         bootstrap's retCode. eNum and eTxt are also handed to newPoller();
//         presumably it sets them when it fails - confirm.
// eTxt    when not nil, receives a short description of the failing step, or
//         "" on success.
// crOpts  creation options; only optTOM is examined here.
//
// Returns the new poller upon success and 0 upon failure.

XrdSys::IOEvents::Poller *XrdSys::IOEvents::Poller::Create(int         &eNum,
                                                           const char **eTxt,
                                                           int          crOpts)
{
   int fildes[2];
   struct pollArg pArg;
   pthread_t tid;

// Create a pipe used to break the poll wait loop
//
   if (XrdSysFD_Pipe(fildes))
      {eNum = errno;
       if (eTxt) *eTxt = "creating poll pipe";
       return 0;
      }

// Create an actual implementation of a poller. The pipe is closed here when
// no poller object could be obtained.
//
   if (!(pArg.pollP = newPoller(fildes, eNum, eTxt)))
      {close(fildes[0]);
       close(fildes[1]);
       return 0;
      }

// Now start a thread to handle this poller object
//
// NOTE(review): this failure path returns without releasing pArg.pollP (the
// start-up failure path further down does delete it) and nothing here deletes
// pArg.pollSync, which is otherwise deleted by the bootstrap thread. This
// looks like a leak; confirm what the poller destructor does when no poller
// thread exists before adding cleanup here.
//
   if ((eNum = XrdSysThread::Run(&tid, XrdSys::IOEvents::BootStrap::Start,
                                 (void *)&pArg, XRDSYSTHREAD_BIND, "Poller")))
      {if (eTxt) *eTxt = "creating poller thread"; return 0;}

// Now wait for the thread to finish initializing before we allow use
// Note that the bootstrap takes ownership of the semaphore and will delete it
// once the thread posting the semaphore actually ends. This is to avoid
// semaphore bugs present in certain (e.g. Linux) kernels.
//
   pArg.pollSync->Wait();

// Check if all went well
//
   if (pArg.retCode)
      {if (eTxt) *eTxt = (pArg.retMsg ? pArg.retMsg : "starting poller");
       eNum = pArg.retCode;
       delete pArg.pollP;
       return 0;
      }

// Set creation options in the new poller
//
   if (crOpts & optTOM)
      pArg.pollP->tmoMask = ~(CallBack::ReadTimeOut|CallBack::WriteTimeOut);

// All done
//
   eNum = 0;
   if (eTxt) *eTxt = "";
   return pArg.pollP;
}

/******************************************************************************/
/*                                D e t a c h                                 */
/******************************************************************************/

// Detach() removes a channel from this poller's timeout queue and, when it
// was in the poll set, from the poll set as well.
//
// cP        the channel to detach.
// isLocked  passed through to Exclude(); callers in this file reacquire the
//           channel lock when it comes back false.
// keep      when false the channel is additionally reset onto the error
//           poller (pollErr1), its callback and callback argument are
//           cleared, and it is unlinked from the attach list.

void XrdSys::IOEvents::Poller::Detach(XrdSys::IOEvents::Channel *cP,
                                      bool &isLocked, bool keep)
{
// The caller must hold the channel lock!
//
   bool detFD = (cP->inPSet != 0);

// First remove the channel from the timeout queue
//
// NOTE(review): unlike TmoAdd(), cP->inTOQ is not cleared here after the
// REMOVE. Reset() may take care of that when keep is false; confirm for the
// keep == true case.
//
   if (cP->inTOQ)
      {toMutex.Lock();
       REMOVE(tmoBase, tmoList, cP);
       toMutex.UnLock();
      }

// Allow only one detach at a time
//
   adMutex.Lock();

// Preset channel to prohibit callback if we are not keeping this channel
//
   if (!keep)
      {cP->Reset(&pollErr1, cP->chFD);
       cP->chPollXQ = &pollErr1;
       cP->chCB     = 0;
       cP->chCBA    = 0;
       if (cP->attList.next != cP) {REMOVE(attBase, attList, cP);}
          else if (attBase == cP) attBase = 0;
      }

// Exclude this channel from the associated poll set, don't hold the ad lock
//
   adMutex.UnLock();
   if (detFD)
      {cP->inPSet = 0;
       if (cmdFD >= 0) Exclude(cP, isLocked, !ISPOLLER);
      }
}

/******************************************************************************/
/*                  Protected:   G e t R e q u e s t                          */
/******************************************************************************/

// Warning: This method runs unlocked. The caller must have exclusive use of
//          the reqBuff otherwise unpredictable results will occur.

int XrdSys::IOEvents::Poller::GetRequest()
{
   ssize_t rlen;
   int rc;

// See if we are to resume a read or start a fresh one
//
   if (!pipeBlen)
      {pipeBuff = (char *)&reqBuff; pipeBlen = sizeof(reqBuff);}

// Wait for the next request. Some OS's (like Linux) don't support non-blocking
// pipes. So, we must front the read with a poll.
// do {rc = poll(&pipePoll, 1, 0);} while(rc < 0 && (errno == EAGAIN || errno == EINTR)); if (rc < 1) return 0; // Now we can put up a read without a delay. Normally a full command will be // present. Under some heavy conditions, this may not be the case. // do {rlen = read(reqFD, pipeBuff, pipeBlen);} while(rlen < 0 && errno == EINTR); if (rlen <= 0) {cerr <<"Poll: " <chPoller == &pollWait) {cP->reMod = cP->chEvents; cP->chEvents = 0; IF_TRACE(Init,cP->chFD,"defer events=" <reMod); return true; } // Trace this entry // IF_TRACE(Init,cP->chFD,"begin events=" <chEvents)); // If no events are enabled at this point, just return // if (!(cP->chEvents)) return true; // Refuse to enable a channel without a callback function // if (!(cP->chCB)) {eNum = EDESTADDRREQ; if (eTxt) *eTxt = "enabling without a callback"; return false; } // So, now we can include the channel in the poll set. We will include it // with no events enabled to prevent callbacks prior to completion here. // cP->chPoller = &pollWait; cP->reMod = cP->chEvents; cP->chEvents = 0; retval = cP->chPollXQ->Include(cP, eNum, eTxt, isLocked); IF_TRACE(Init,cP->chFD,"Include() returned " <chMutex.Lock(); isLocked = true;} // Determine what future poller to use. If we can use the regular poller then // set the correct event mask for the channel. Note that we could have lost // control but the correct events will be reflected in the "reMod" member. 
// if (!retval) {cP->chPoller = &pollErr1; cP->chFault = eNum;} else {cP->chPoller = cP->chPollXQ; cP->inPSet = 1; if (cP->reMod) {cP->chEvents = cP->reMod; retval = cP->chPoller->Modify(cP, eNum, eTxt, isLocked); TRACE_MOD(Init,cP->chFD,int(cP->reMod)); if (!isLocked) {cP->chMutex.Lock(); isLocked = true;} } else { TRACE_NOD(Init,cP->chFD,0); } } // All done // cP->reMod = 0; return retval; } /******************************************************************************/ /* P o l l 2 E n u m */ /******************************************************************************/ int XrdSys::IOEvents::Poller::Poll2Enum(short events) { if (events & POLLERR) return EPIPE; if (events & POLLHUP) return ECONNRESET; if (events & POLLNVAL) return EBADF; return EOPNOTSUPP; } /******************************************************************************/ /* S e n d C m d */ /******************************************************************************/ int XrdSys::IOEvents::Poller::SendCmd(PipeData &cmd) { int wlen; // Pipe writes are atomic so we don't need locks. Some commands require // confirmation. We handle that here based on the command. Note that pipes // gaurantee that all of the data will be written or we will block. // if (cmd.req >= PipeData::Post) {XrdSysSemaphore mySem(0); cmd.theSem = &mySem; do {wlen = write(cmdFD, (char *)&cmd, sizeof(PipeData));} while (wlen < 0 && errno == EINTR); if (wlen > 0) mySem.Wait(); } else { do {wlen = write(cmdFD, (char *)&cmd, sizeof(PipeData));} while (wlen < 0 && errno == EINTR); } // All done // return (wlen >= 0 ? 
0 : errno); } /******************************************************************************/ /* Protected: S e t P o l l E n t */ /******************************************************************************/ void XrdSys::IOEvents::Poller::SetPollEnt(XrdSys::IOEvents::Channel *cP, int pe) { cP->pollEnt = pe; } /******************************************************************************/ /* S t o p */ /******************************************************************************/ void XrdSys::IOEvents::Poller::Stop() { PipeData cmdbuff; CallBack *theCB; Channel *cP; void *cbArg; int doCB; // Initialize the pipdata structure // memset(static_cast( &cmdbuff ), 0, sizeof(cmdbuff)); cmdbuff.req = PipeData::Stop; // Lock all of this // adMutex.Lock(); // If we are already shutdown then we are done // if (cmdFD == -1) {adMutex.UnLock(); return;} // First we must stop the poller thread in an orderly fashion. // SendCmd(cmdbuff); // Close the pipe communication mechanism // close(cmdFD); cmdFD = -1; close(reqFD); reqFD = -1; // Run through cleaning up the channels. While there should not be any other // operations happening on this poller, we take the conservative approach. 
// while((cP = attBase)) {REMOVE(attBase, attList, cP); adMutex.UnLock(); cP->chMutex.Lock(); doCB = cP->chCB != 0 && (cP->chEvents & Channel::stopEvent); if (cP->inTOQ) TmoDel(cP); cP->Reset(&pollErr1, cP->chFD, EIDRM); cP->chPollXQ = &pollErr1; if (doCB) {cP->chStat = Channel::isClear; theCB = cP->chCB; cbArg = cP->chCBA; cP->chMutex.UnLock(); theCB->Stop(cP, cbArg); } else cP->chMutex.UnLock(); adMutex.Lock(); } // Now invoke the poller specific shutdown // Shutdown(); adMutex.UnLock(); } /******************************************************************************/ /* T m o A d d */ /******************************************************************************/ bool XrdSys::IOEvents::Poller::TmoAdd(XrdSys::IOEvents::Channel *cP, int tmoSet) { XrdSysMutexHelper mHelper(toMutex); time_t tNow; Channel *ncP; bool setRTO, setWTO; // Remove element from timeout queue if it is there // if (cP->inTOQ) {REMOVE(tmoBase, tmoList, cP); cP->inTOQ = 0; } // Determine which timeouts need to be reset // tmoSet|= cP->dlType >> 4; setRTO = (tmoSet&tmoMask) & (CallBack::ReadyToRead |CallBack:: ReadTimeOut); setWTO = (tmoSet&tmoMask) & (CallBack::ReadyToWrite|CallBack::WriteTimeOut); // Reset the required deadlines // tNow = time(0); if (setRTO && REVENTS(cP->chEvents) && cP->chRTO) cP->rdDL = cP->chRTO + tNow; if (setWTO && WEVENTS(cP->chEvents) && cP->chWTO) cP->wrDL = cP->chWTO + tNow; // Calculate the closest enabled deadline // if (cP->rdDL < cP->wrDL) {cP->deadLine = cP->rdDL; cP->dlType = CallBack:: ReadTimeOut; } else { cP->deadLine = cP->wrDL; cP->dlType = CallBack::WriteTimeOut; if (cP->rdDL == cP->wrDL) cP->dlType |= CallBack:: ReadTimeOut; } IF_TRACE(TmoAdd, cP->chFD, "t=" <