//------------------------------------------------------------------------------ // Copyright (c) 2011-2012 by European Organization for Nuclear Research (CERN) // Author: Lukasz Janyst //------------------------------------------------------------------------------ // XRootD is free software: you can redistribute it and/or modify // it under the terms of the GNU Lesser General Public License as published by // the Free Software Foundation, either version 3 of the License, or // (at your option) any later version. // // XRootD is distributed in the hope that it will be useful, // but WITHOUT ANY WARRANTY; without even the implied warranty of // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the // GNU General Public License for more details. // // You should have received a copy of the GNU Lesser General Public License // along with XRootD. If not, see . //------------------------------------------------------------------------------ #ifndef __XRD_CL_ASYNC_SOCKET_HANDLER_HH__ #define __XRD_CL_ASYNC_SOCKET_HANDLER_HH__ #include "XrdCl/XrdClSocket.hh" #include "XrdCl/XrdClConstants.hh" #include "XrdCl/XrdClDefaultEnv.hh" #include "XrdCl/XrdClPoller.hh" #include "XrdCl/XrdClPostMasterInterfaces.hh" #include "XrdCl/XrdClTaskManager.hh" #include #include namespace XrdCl { class Stream; //---------------------------------------------------------------------------- //! Utility class handling asynchronous socket interactions and forwarding //! events to the parent stream. //---------------------------------------------------------------------------- class AsyncSocketHandler: public SocketHandler { //------------------------------------------------------------------------ // We need an extra task for rescheduling of HS request that received // a wait response. //------------------------------------------------------------------------ class WaitTask: public XrdCl::Task { public: WaitTask( XrdCl::AsyncSocketHandler *handler, XrdCl::Message *msg ): pHandler( handler ), pMsg( msg ) { std::ostringstream o; o << "WaitTask for: 0x" << msg; SetName( o.str() ); } virtual time_t Run( time_t now ) { pHandler->RetryHSMsg( pMsg ); return 0; } private: XrdCl::AsyncSocketHandler *pHandler; XrdCl::Message *pMsg; }; public: //------------------------------------------------------------------------ //! Constructor //------------------------------------------------------------------------ AsyncSocketHandler( Poller *poller, TransportHandler *transport, AnyObject *channelData, uint16_t subStreamNum ); //------------------------------------------------------------------------ //! Destructor //------------------------------------------------------------------------ ~AsyncSocketHandler(); //------------------------------------------------------------------------ //! Set address //------------------------------------------------------------------------ void SetAddress( const XrdNetAddr &address ) { pSockAddr = address; } //------------------------------------------------------------------------ //! Get the address that the socket is connected to //------------------------------------------------------------------------ const XrdNetAddr &GetAddress() const { return pSockAddr; } //------------------------------------------------------------------------ //! Connect to the currently set address //------------------------------------------------------------------------ Status Connect( time_t timeout ); //------------------------------------------------------------------------ //! Close the connection //------------------------------------------------------------------------ Status Close(); //------------------------------------------------------------------------ //! Set a stream object to be notified about the status of the operations //------------------------------------------------------------------------ void SetStream( Stream *stream ); //------------------------------------------------------------------------ //! Handle a socket event //------------------------------------------------------------------------ virtual void Event( uint8_t type, XrdCl::Socket */*socket*/ ); //------------------------------------------------------------------------ //! Enable uplink //------------------------------------------------------------------------ Status EnableUplink() { if( !pPoller->EnableWriteNotification( pSocket, true, pTimeoutResolution ) ) return Status( stFatal, errPollerError ); return Status(); } //------------------------------------------------------------------------ //! Disable uplink //------------------------------------------------------------------------ Status DisableUplink() { if( !pPoller->EnableWriteNotification( pSocket, false ) ) return Status( stFatal, errPollerError ); return Status(); } //------------------------------------------------------------------------ //! Get stream name //------------------------------------------------------------------------ const std::string &GetStreamName() { return pStreamName; } //------------------------------------------------------------------------ //! Get timestamp of last registered socket activity //------------------------------------------------------------------------ time_t GetLastActivity() { return pLastActivity; } private: //------------------------------------------------------------------------ // Connect returned //------------------------------------------------------------------------ void OnConnectionReturn(); //------------------------------------------------------------------------ // Got a write readiness event //------------------------------------------------------------------------ void OnWrite(); //------------------------------------------------------------------------ // Got a write readiness event while handshaking //------------------------------------------------------------------------ void OnWriteWhileHandshaking(); Status WriteMessageAndRaw( Message *toWrite, Message *&sign ); Status WriteSeparately( Message *toWrite, Message *&sign ); //------------------------------------------------------------------------ // Write the current message //------------------------------------------------------------------------ Status WriteCurrentMessage( Message *toWrite ); //------------------------------------------------------------------------ // Write the message, its signature and its body //------------------------------------------------------------------------ Status WriteVMessage( Message *toWrite, Message *&sign, ChunkList *chunks, uint32_t *asyncOffset ); //------------------------------------------------------------------------ // Got a read readiness event //------------------------------------------------------------------------ void OnRead(); //------------------------------------------------------------------------ // Got a read readiness event while handshaking //------------------------------------------------------------------------ void OnReadWhileHandshaking(); //------------------------------------------------------------------------ // Read a message //------------------------------------------------------------------------ Status ReadMessage( Message *&toRead ); //------------------------------------------------------------------------ // Handle fault //------------------------------------------------------------------------ void OnFault( Status st ); //------------------------------------------------------------------------ // Handle fault while handshaking //------------------------------------------------------------------------ void OnFaultWhileHandshaking( Status st ); //------------------------------------------------------------------------ // Handle write timeout event //------------------------------------------------------------------------ void OnWriteTimeout(); //------------------------------------------------------------------------ // Handle read timeout event //------------------------------------------------------------------------ void OnReadTimeout(); //------------------------------------------------------------------------ // Handle timeout event while handshaking //------------------------------------------------------------------------ void OnTimeoutWhileHandshaking(); //------------------------------------------------------------------------ // Get signature for given message //------------------------------------------------------------------------ Status GetSignature( Message *toSign, Message *&sign ); //------------------------------------------------------------------------ // Initialize the iovec with given message //------------------------------------------------------------------------ inline void ToIov( Message &msg, iovec &iov ); //------------------------------------------------------------------------ // Update iovec after write //------------------------------------------------------------------------ inline void UpdateAfterWrite( Message &msg, iovec &iov, int &bytesRead ); //------------------------------------------------------------------------ // Add chunks to the given iovec //------------------------------------------------------------------------ inline uint32_t ToIov( ChunkList *chunks, const uint32_t *offset, iovec *iov ); //------------------------------------------------------------------------ // Update raw data after write //------------------------------------------------------------------------ inline void UpdateAfterWrite( ChunkList *chunks, uint32_t *offset, iovec *iov, int &bytesWritten ); //------------------------------------------------------------------------ // Retry hand shake message //------------------------------------------------------------------------ void RetryHSMsg( Message *msg ); //------------------------------------------------------------------------ // Extract the value of a wait response // // @param rsp : the server response // @return : if rsp is a wait response then its value // otherwise -1 //------------------------------------------------------------------------ inline kXR_int32 HandleWaitRsp( Message *rsp ); //------------------------------------------------------------------------ //! Classify errno while reading/writing //! //! Once we are at R5, change Transport interface and use: //! Transport::ClassifyErrno //------------------------------------------------------------------------ Status ClassifyErrno( int error ); //------------------------------------------------------------------------ // Data members //------------------------------------------------------------------------ Poller *pPoller; TransportHandler *pTransport; AnyObject *pChannelData; uint16_t pSubStreamNum; Stream *pStream; std::string pStreamName; Socket *pSocket; Message *pIncoming; Message *pHSIncoming; Message *pOutgoing; Message *pSignature; Message *pHSOutgoing; XrdNetAddr pSockAddr; HandShakeData *pHandShakeData; bool pHandShakeDone; uint16_t pTimeoutResolution; time_t pConnectionStarted; time_t pConnectionTimeout; bool pHeaderDone; std::pair pIncHandler; bool pOutMsgDone; OutgoingMsgHandler *pOutHandler; uint32_t pIncMsgSize; uint32_t pOutMsgSize; time_t pLastActivity; }; } #endif // __XRD_CL_ASYNC_SOCKET_HANDLER_HH__