#ifndef __XRDSYSIOEVENTS_HH__ #define __XRDSYSIOEVENTS_HH__ /******************************************************************************/ /* */ /* X r d S y s I O E v e n t s . h h */ /* */ /* (c) 2012 by the Board of Trustees of the Leland Stanford, Jr., University */ /* All Rights Reserved */ /* Produced by Andrew Hanushevsky for Stanford University under contract */ /* DE-AC02-76-SFO0515 with the Department of Energy */ /* */ /* This file is part of the XRootD software suite. */ /* */ /* XRootD is free software: you can redistribute it and/or modify it under */ /* the terms of the GNU Lesser General Public License as published by the */ /* Free Software Foundation, either version 3 of the License, or (at your */ /* option) any later version. */ /* */ /* XRootD is distributed in the hope that it will be useful, but WITHOUT */ /* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or */ /* FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public */ /* License for more details. */ /* */ /* You should have received a copy of the GNU Lesser General Public License */ /* along with XRootD in a file called COPYING.LESSER (LGPL license) and file */ /* COPYING (GPL license). If not, see . */ /* */ /* The copyright holder's institutional names and contributor's names may not */ /* be used to endorse or promote products derived from this software without */ /* specific prior written permission of the institution or contributor. */ /******************************************************************************/ #include #include #include #include "XrdSys/XrdSysPthread.hh" #include "XrdSys/XrdSysAtomics.hh" //----------------------------------------------------------------------------- //! IOEvents //! //! The classes here define a simple I/O event polling architecture suitable //! for use with non-blocking devices. As it implements an event model, it is //! not considered a high performance interface. For increased performance, you //! need to use multiple polling event loops which effectively implements a //! limited thread model for handling events. The implementation here is similar //! to libEvent with better handling of timeouts and I/O polling resumption. //! //! While, channels are multi-thread safe, they cannot interlock with the state //! of their file descriptor. You must first disable (via SetFD()) or delete //! the channel before closing its associated file descriptor. This is the //! only safe way to keep channels and their file descriptors synchronized. //----------------------------------------------------------------------------- namespace XrdSys { namespace IOEvents { /******************************************************************************/ /* C l a s s C a l l B a c k */ /******************************************************************************/ //----------------------------------------------------------------------------- //! Interface //! //! The object used to effect a callback when an event occurs on a channel. //! During the callback, all channels associated with the poller object doing //! the callback are suspended until the callback object returns. All queued //! callbacks are handled before the poller resumes any channels. This provides //! simple serialization for all channels associated with a single poller. //! You may call any channel method from a callback to effect appropriate //! changes. You may also delete the channel at any time. //----------------------------------------------------------------------------- class Channel; class CallBack { public: //----------------------------------------------------------------------------- //! Events that may cause a callback object to be activated. //----------------------------------------------------------------------------- enum EventType { ReadyToRead = 0x01, //!< New data has arrived ReadTimeOut = 0x02, //!< Read timeout ReadyToWrite = 0x04, //!< Writing won't block WriteTimeOut = 0x08, //!< Write timeout ValidEvents = 0x0f //!< Mask to test for valid events }; //----------------------------------------------------------------------------- //! Handle event notification. A method must be supplied. The enable/disable //! status of the channel is not modified. To change the status, use the //! channel's Enable() and Disable() method prior to returning. After return, //! the current channel's status is used to determine how it will behave. If //! the event is a timeout, the timeout becomes infinite for that event unless //! Enable() is called for the event. This is to prevent timeout loops on //! channels that remain enabled even after a timeout. Event loop callbacks //! define a hazardous programming model. If you do not have a well defined //! threading model, you should restrict yourself to dealing only with the //! passed channel object in the callback so as to avoid deadlocks. //! //! @param chP the associated channel causing the callback. //! @param cbArg the callback argument specified for the channel. //! @param evFlags events that caused this callback to be invoked. More than //! one event may be indicated (see EventType above). //! //! @return true Resume handling the channel with current status. //! false Disable the channel and remove it from associated poller. //----------------------------------------------------------------------------- virtual bool Event(Channel *chP, void *cbArg, int evFlags) = 0; //----------------------------------------------------------------------------- //! Handle fatal error notification. This method is called only when error //! events are specifically enabled (see Enable() for admonitions). It is //! passed the reason for the error. Upon return, the channel is disabled but //! stays attached to the poller so that it can be revitalized with SetFD(). //! You should replace this method if you specifically enable error events. //! //! @param chP the associated channel causing the callback. //! @param cbArg the callback argument specified for the channel. //! @param eNum the errno associated with the error. //! @param eTxt descriptive name of the operation encountering the error. //----------------------------------------------------------------------------- virtual void Fatal(Channel *chP, void *cbArg, int eNum, const char *eTxt) { (void)chP; (void)cbArg; (void)eNum; (void)eTxt; }; //----------------------------------------------------------------------------- //! Handle poller stop notification. This method is called only when the poller //! is stopped and the channel enabled the stop event. You should should replace //! this method if you specifically enable the stop event. You must not invoke //! channel methods in this callback, otherwise the results are unpredictable. //! //! @param chP the associated channel causing the callback. //! @param cbArg the callback argument specified for the channel. //----------------------------------------------------------------------------- virtual void Stop(Channel *chP, void *cbArg) { (void)chP; (void)cbArg;} //----------------------------------------------------------------------------- //! Constructor //----------------------------------------------------------------------------- CallBack() {} //----------------------------------------------------------------------------- //! Destructor //----------------------------------------------------------------------------- virtual ~CallBack() {} }; /******************************************************************************/ /* C l a s s C h a n n e l */ /******************************************************************************/ //----------------------------------------------------------------------------- //! Describe a channel that is capable of fielding events. A valid channel is //! associated with a Poller object, a CallBack object, and an open file //! descriptor. These are normally set at construction time. //----------------------------------------------------------------------------- class ChannelWait; class Poller; class Channel { friend class Poller; public: //----------------------------------------------------------------------------- //! Delete a channel. You must use this method instead of delete. The Delete() //! may block if an channel is being deleted outside of the poller thread. //! When this object is deleted, all events are disabled, pending callbacks //! are either completed or canceled, and the channel is removed from the //! assigned poller. Only then is the storage freed. //----------------------------------------------------------------------------- void Delete(); //----------------------------------------------------------------------------- //! Event bits used to feed Enable() and Disable(); can be or'd. //----------------------------------------------------------------------------- enum EventCode {readEvents = 0x01, //!< Read and Read Timeouts writeEvents = 0x04, //!< Write and Write Timeouts rwEvents = 0x05, //!< Both of the above errorEvents = 0x10, //!< Error event non-r/w specific stopEvent = 0x20, //!< Poller stop event allEvents = 0x35 //!< All of the above }; //----------------------------------------------------------------------------- //! Disable one or more events. Ignored for already disabled events. //! //! @param events one or more events or'd together (see EventCode above). //! @param eText optional pointer to where an operation description is to be //! placed when an error occurs (i.e. returns false). //! //! @return true Events successfully disabled. //! false Events not disabled; errno holds the error number and if //! eText is supplied, points to the operation desscription. //----------------------------------------------------------------------------- bool Disable(int events, const char **eText=0); //----------------------------------------------------------------------------- //! Enable one or more events. Events that are already enabled remain enabled //! but may have their timeout value change. //! //! Enable can fail for many reasons. Most importantly, if the channel was //! disabled for all events when a fatal error occurred; enabling it immediately //! returns the fatal error without invoking the callback. This happens on //! platforms that disallow physically masking out error events. //! //! Additionally, when an error occurs and the channel is not enabled for error //! events but is enabled for read or write, the callback is called indicating //! ReadyToRead or ReadyToWrite. A subsequent write will then end with an error //! (typically, EPIPE) and a subsequent read will end with an erorr or indicate //! zero bytes read; either of which should be treated as an error (typically, //! POLLHUP). Generally, you should always allow separable error events. //! //! @param events one or more events or'd together (see EventCode above). //! @param timeout >0 maximum seconds that may elapsed before a timeout event //! corresponding to the specified event(s) occurs. //! =0 Keep whatever timeout is currently in effect from the //! previous Enable() invocation for the event(s). //! <0 No timeout applies. //! There can be separate timeouts for read and write if //! Enable() is separately called for each event code. //! Otherwise, the timeout applies to all specified events. //! The timeout is ignored for error events. //! @param eText optional pointer to where an operation description is to be //! placed when an error occurs (i.e. returns false). //! //! @return true Events successfully enabled. //! false Events not enabled; errno holds the error number and if //! eText is supplied, points to the operation desscription. //----------------------------------------------------------------------------- bool Enable(int events, int timeout=0, const char **eText=0); //----------------------------------------------------------------------------- //! Get the callback object and argument associated with this channel. //! //! @param cbP Place where the pointer is to be returned. //! @param cbArg Place where the callback argument is to be returned. //----------------------------------------------------------------------------- void GetCallBack(CallBack **cbP, void **cbArg); //----------------------------------------------------------------------------- //! Get the events that are currently enabled for this channel. //! //! @return >0 Event bits that are enabled (see EventCode above). //! =0 No events are enabled. //! <0 Channel not assigned to a Poller object. //----------------------------------------------------------------------------- inline int GetEvents() {return (chPoller ? static_cast(chEvents) : -1);} //----------------------------------------------------------------------------- //! Get the file descriptor number associated with this channel. //! //! @return >=0 The file descriptor number. //! < 0 No file desciptor associated with the channel. //----------------------------------------------------------------------------- inline int GetFD() {return chFD;} //----------------------------------------------------------------------------- //! Set the callback object and argument associated with this channel. //! //! @param cbP Pointer to the callback object (see above). The callback //! object must not be deleted while associated to a channel. A //! null callback object pointer effectively disables the channel. //! @param cbArg The argument to be passed to the callback object. //----------------------------------------------------------------------------- void SetCallBack(CallBack *cbP, void *cbArg=0); //----------------------------------------------------------------------------- //! Set a new file descriptor to be associated with this channel. The channel //! is removed from polling consideration but remains attached to the poller. //! The new file descriptor is recorded but the channel remains disabled. You //! must use Enable() to add the file descriptor back to the polling set. This //! allows you to retract a file descriptor about to be closed without having a //! new file descriptor handy (e.g., use -1). This facilitates channel re-use. //! //! @param fd The associated file descriptor number. //----------------------------------------------------------------------------- void SetFD(int fd); //----------------------------------------------------------------------------- //! Constructor. //! //! @param pollP Pointer to the poller object to which this channel will be //! assigned. Events are initially disabled after assignment and no //! timeout applies. Poller object assignment is permanent for the //! life of the channel object. //! @param fd The associated file descriptor number. It should not be //! assigned to any other channel and must be valid when the //! channel is enabled. Use SetFD() to set a new value. //! @param cbP Pointer to the callback object (see above). The callback //! object must not be deleted while associated to a channel. //! A callback object must exist in order for the channel to be //! enabled. Use SetCallBack() if you defered setting it here. //! @param cbArg The argument to be passed to the callback object. //----------------------------------------------------------------------------- Channel(Poller *pollP, int fd, CallBack *cbP=0, void *cbArg=0); private: //----------------------------------------------------------------------------- //! Destuctor is private, use Delete() to delete this object. //----------------------------------------------------------------------------- ~Channel() {} struct dlQ {Channel *next; Channel *prev;}; XrdSysRecMutex chMutex; dlQ attList; // List of attached channels dlQ tmoList; // List of channels in the timeout queue Poller *chPoller; // The effective poller Poller *chPollXQ; // The real poller CallBack *chCB; // CallBack function void *chCBA; // CallBack argument int chFD; // Associated file descriptor int pollEnt; // Used only for poll() type pollers int chRTO; // Read timeout value (0 means none) int chWTO; // Write timeout value (0 means none) time_t rdDL; // Read deadline time_t wrDL; // Write deadline time_t deadLine; // The deadline in effect (read or write) char dlType; // The deadline type in deadLine as CallBack type char chEvents; // Enabled events as Channel type char chStat; // Channel status below (!0 -> in callback mode) enum Status {isClear = 0, isCBMode, isDead}; char inTOQ; // True if the channel is in the timeout queue char inPSet; // FD is in the actual poll set char reMod; // Modify issued while defered, re-issue needed short chFault; // Defered error, 0 if all is well void Reset(Poller *thePoller, int fd, int eNum=0); }; /******************************************************************************/ /* C l a s s P o l l e r */ /******************************************************************************/ //----------------------------------------------------------------------------- //! Define a poller object interface. A poller fields and dispatches event //! callbacks. An actual instance of a poller object is obtained by using the //! Create() method. You cannot simply create an instance of this object using //! new or in-place declaration since it is abstract. Any number of these //! objects may created. Each creation spawns a polling thread. //----------------------------------------------------------------------------- class Poller { friend class BootStrap; friend class Channel; public: //----------------------------------------------------------------------------- //! Create a specialized instance of a poller object, initialize it, and start //! the polling process. You must call Create() to obtain a specialized poller. //! //! @param eNum Place where errno is placed upon failure. //! @param eTxt Place where a pointer to the description of the failing //! operation is to be set. If null, no description is returned. //! @param crOpts Poller options (see static const optxxx): //! optTOM - Timeout resumption after a timeout event must be //! manually reenabled. By default, event timeouts are //! automatically renabled after successful callbacks. //! //! @return !0 Poller successfully created and started. //! eNum contains zero. //! eTxt if not null contains a null string. //! The returned value is a pointer to the Poller object. //! 0 Poller could not be created. //! eNum contains the associated errno value. //! eTxt if not null contains the failing operation. //----------------------------------------------------------------------------- enum CreateOpts {optTOM}; static Poller *Create(int &eNum, const char **eTxt=0, int crOpts=0); //----------------------------------------------------------------------------- //! Stop a poller object. Active callbacks are completed. Pending callbacks are //! discarded. After which the poller event thread exits. Subsequently, each //! associated channel is disabled and removed from the poller object. If the //! channel is enabled for a StopEvent, the stop callback is invoked. However, //! any attempt to use the channel methods that require an active poller will //! return an error. //! //! Since a stopped poller cannot be restarted; the only thing left is to delete //! it. This also applies to all the associated channels since they no longer //! have an active poller. //----------------------------------------------------------------------------- void Stop(); //----------------------------------------------------------------------------- //! Constructor //! //! @param cFD The file descriptor to send commands to the poll thread. //! @param rFD The file descriptor to recv commands in the poll thread. //----------------------------------------------------------------------------- Poller(int cFD, int rFD); //----------------------------------------------------------------------------- //! Destructor. Stop() is effecively called when this object is deleted. //----------------------------------------------------------------------------- virtual ~Poller() {} protected: struct PipeData; void CbkTMO(); bool CbkXeq(Channel *cP, int events, int eNum, const char *eTxt); inline int GetFault(Channel *cP) {return cP->chFault;} inline int GetPollEnt(Channel *cP) {return cP->pollEnt;} int GetRequest(); bool Init(Channel *cP, int &eNum, const char **eTxt, bool &isLockd); inline void LockChannel(Channel *cP) {cP->chMutex.Lock();} int Poll2Enum(short events); int SendCmd(PipeData &cmd); void SetPollEnt(Channel *cP, int ptEnt); bool TmoAdd(Channel *cP, int tmoSet); void TmoDel(Channel *cP); int TmoGet(); inline void UnLockChannel(Channel *cP) {cP->chMutex.UnLock();} //! Start the polling event loop. An implementation must be supplied. Begin() //! is called via the internal BootStrap class from a new thread. //! virtual void Begin(XrdSysSemaphore *syncp, int &rc, const char **eTxt) = 0; //! Remove a channel to the poll set. An implementation must be supplied. //! The channel is locked when this method is called but must be unlocked by the //! method if a command is sent to the poller thread and isLocked set to false. //! virtual void Exclude(Channel *cP, bool &isLocked, bool dover=1) = 0; //! Add a channel to the poll set. An implementation must be supplied. //! The channel is locked when this method is called but must be unlocked by the //! method if a command is sent to the poller thread and isLocked set to false. //! virtual bool Include(Channel *cP, int &eNum, const char **eTxt, bool &isLocked) = 0; //! Modify the event status of a channel. An implementation must be supplied. //! The channel is locked when this method is called but must be unlocked by the //! method if a command is sent to the poller thread and isLocked set to false. //! virtual bool Modify (Channel *cP, int &eNum, const char **eTxt, bool &isLocked) = 0; //! Shutdown the poller. An implementation must be supplied. The shutdown method //! must release any allocated storage and close private file descriptors. The //! polling thread will have already been terminated and x-thread pipe closed. //! Warning: the derived destructor *must* call Stop() and do nothing else! // virtual void Shutdown() = 0; // The following is common to all implementations // Channel *attBase; // -> First channel in attach queue or 0 Channel *tmoBase; // -> First channel in timeout queue or 0 pthread_t pollTid; // Poller's thread ID struct pollfd pipePoll; // Stucture to wait for pipe events int cmdFD; // FD to send PipeData commands int reqFD; // FD to recv PipeData requests struct PipeData {char req; char evt; short ent; int fd; XrdSysSemaphore *theSem; enum cmd {NoOp = 0, MdFD = 1, Post = 2, MiFD = 3, RmFD = 4, Stop = 5}; PipeData(char reQ=0, char evT=0, short enT=0, int fD =0, XrdSysSemaphore *sP=0) : req(reQ), evt(evT), ent(enT), fd(fD), theSem(sP) {} ~PipeData() {} }; PipeData reqBuff; // Buffer used by poller thread to recv data char *pipeBuff; // Read resumption point in buffer int pipeBlen; // Number of outstanding bytes unsigned char tmoMask; // Timeout mask CPP_ATOMIC_TYPE(bool) wakePend; // Wakeup is effectively pending (don't send) bool chDead; // True if channel deleted by callback static time_t maxTime; // Maximum time allowed private: void Attach(Channel *cP); void Detach(Channel *cP, bool &isLocked, bool keep=true); void WakeUp(); // newPoller() called to get a specialized new poll object at in response to // Create(). A specialized implementation must be supplied. // static Poller *newPoller(int pFD[2], int &eNum, const char **eTxt); XrdSysMutex adMutex; // Mutex for adding & detaching channels XrdSysMutex toMutex; // Mutex for handling the timeout list }; }; }; #endif