//------------------------------------------------------------------------------ // Copyright (c) 2011-2017 by European Organization for Nuclear Research (CERN) // Author: Krzysztof Jamrog , // Michal Simon //------------------------------------------------------------------------------ // This file is part of the XRootD software suite. // // XRootD is free software: you can redistribute it and/or modify // it under the terms of the GNU Lesser General Public License as published by // the Free Software Foundation, either version 3 of the License, or // (at your option) any later version. // // XRootD is distributed in the hope that it will be useful, // but WITHOUT ANY WARRANTY; without even the implied warranty of // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the // GNU General Public License for more details. // // You should have received a copy of the GNU Lesser General Public License // along with XRootD. If not, see . // // In applying this licence, CERN does not waive the privileges and immunities // granted to it by virtue of its status as an Intergovernmental Organization // or submit itself to any jurisdiction. //------------------------------------------------------------------------------ #ifndef __XRD_CL_OPERATIONS_HH__ #define __XRD_CL_OPERATIONS_HH__ #include #include #include #include #include #include "XrdCl/XrdClXRootDResponses.hh" #include "XrdCl/XrdClOperationHandlers.hh" #include "XrdClArg.hh" #include "XrdSys/XrdSysPthread.hh" namespace XrdCl { template class Operation; class Pipeline; //---------------------------------------------------------------------------- //! Wrapper for ResponseHandler, used only internally to run next operation //! after previous one is finished //---------------------------------------------------------------------------- class PipelineHandler: public ResponseHandler { template friend class Operation; public: //------------------------------------------------------------------------ //! Constructor. //! //! @param handler : the handler of our operation //! @param own : if true we have the ownership of handler (it's //! memory), and it is our responsibility to deallocate it //------------------------------------------------------------------------ PipelineHandler( ResponseHandler *handler, bool own ); //------------------------------------------------------------------------ //! Default Constructor. //------------------------------------------------------------------------ PipelineHandler(); //------------------------------------------------------------------------ //! Callback function. //------------------------------------------------------------------------ void HandleResponseWithHosts( XRootDStatus *status, AnyObject *response, HostList *hostList ); //------------------------------------------------------------------------ //! Callback function. //------------------------------------------------------------------------ void HandleResponse( XRootDStatus *status, AnyObject *response ); //------------------------------------------------------------------------ //! Destructor. //------------------------------------------------------------------------ ~PipelineHandler(); //------------------------------------------------------------------------ //! Add new operation to the pipeline //! //! @param operation : operation to add //------------------------------------------------------------------------ void AddOperation( Operation *operation ); //------------------------------------------------------------------------ //! Set workflow to this and all next handlers. In the last handler //! it is used to finish workflow execution //! //! @param prms : a promis that the pipeline will have a result //! @param final : a callable that should be called at the end of //! pipeline //------------------------------------------------------------------------ void Assign( std::promise prms, std::function final ); private: //------------------------------------------------------------------------ //! Callback function implementation; //------------------------------------------------------------------------ void HandleResponseImpl( XRootDStatus *status, AnyObject *response, HostList *hostList = nullptr ); inline void dealloc( XRootDStatus *status, AnyObject *response, HostList *hostList ) { delete status; delete response; delete hostList; } //------------------------------------------------------------------------ //! The handler of our operation //------------------------------------------------------------------------ ResponseHandler *responseHandler; //------------------------------------------------------------------------ //! true, if we own the handler //------------------------------------------------------------------------ bool ownHandler; //------------------------------------------------------------------------ //! Next operation in the pipeline //------------------------------------------------------------------------ std::unique_ptr> nextOperation; //------------------------------------------------------------------------ //! The promise that there will be a result (traveling along the pipeline) //------------------------------------------------------------------------ std::promise prms; //------------------------------------------------------------------------ //! The lambda/function/functor that should be called at the end of the //! pipeline (traveling along the pipeline) //------------------------------------------------------------------------ std::function final; }; //---------------------------------------------------------------------------- //! Operation template. An Operation is a once-use-only object - once executed //! by a Workflow engine it is invalidated. Also if used as an argument for //! >> or | the original object gets invalidated. //! //! @arg HasHndl : true if operation has a handler, false otherwise //---------------------------------------------------------------------------- template class Operation { // Declare friendship between templates template friend class Operation; friend std::future Async( Pipeline ); friend class Pipeline; friend class PipelineHandler; public: //------------------------------------------------------------------------ //! Constructor //------------------------------------------------------------------------ Operation() : valid( true ) { } //------------------------------------------------------------------------ //! Move constructor between template instances. //------------------------------------------------------------------------ template Operation( Operation && op ) : handler( std::move( op.handler ) ), valid( true ) { if( !op.valid ) throw std::invalid_argument( "Cannot construct " "Operation from an invalid Operation!" ); op.valid = false; } //------------------------------------------------------------------------ //! Destructor //------------------------------------------------------------------------ virtual ~Operation() { } //------------------------------------------------------------------------ //! Name of the operation. //------------------------------------------------------------------------ virtual std::string ToString() = 0; //------------------------------------------------------------------------ //! Move current object into newly allocated instance //! //! @return : the new instance //------------------------------------------------------------------------ virtual Operation* Move() = 0; //------------------------------------------------------------------------ //! Move current object into newly allocated instance, and convert //! it into 'handled' operation. //! //! @return : the new instance //------------------------------------------------------------------------ virtual Operation* ToHandled() = 0; protected: //------------------------------------------------------------------------ //! Run operation //! //! @param prom : the promise that we will have a result //! @param final : the object to call at the end of pipeline //! @param args : forwarded arguments //! @param bucket : number of the bucket with arguments //! //! @return : stOK if operation was scheduled for execution //! successfully, stError otherwise //------------------------------------------------------------------------ void Run( std::promise prms, std::function final ) { static_assert(HasHndl, "Only an operation that has a handler can be assigned to workflow"); handler->Assign( std::move( prms ), std::move( final ) ); XRootDStatus st = RunImpl(); if( st.IsOK() ) handler.release(); else ForceHandler( st ); } //------------------------------------------------------------------------ //! Run the actual operation //! //! @param params : container with parameters forwarded from //! previous operation //! @return : status of the operation //! @param bucket : number of the bucket with arguments //------------------------------------------------------------------------ virtual XRootDStatus RunImpl() = 0; //------------------------------------------------------------------------ //! Handle error caused by missing parameter //! //! @param err : error object //! //! @return : default operation status (actual status containg //! error information is passed to the handler) //------------------------------------------------------------------------ void ForceHandler( const XRootDStatus &status ) { handler->HandleResponse( new XRootDStatus( status ), nullptr ); // HandleResponse already freed the memory so we have to // release the unique pointer handler.release(); } //------------------------------------------------------------------------ //! Add next operation in the pipeline //! //! @param op : operation to add //------------------------------------------------------------------------ void AddOperation( Operation *op ) { if( handler ) { handler->AddOperation( op ); } } //------------------------------------------------------------------------ //! Operation handler //------------------------------------------------------------------------ std::unique_ptr handler; //------------------------------------------------------------------------ //! Flag indicating if it is a valid object //------------------------------------------------------------------------ bool valid; }; //---------------------------------------------------------------------------- //! A wrapper around operation pipeline. A Pipeline is a once-use-only //! object - once executed by a Workflow engine it is invalidated. //! //! Takes ownership of given operation pipeline (which is in most would //! be a temporary object) //---------------------------------------------------------------------------- class Pipeline { template friend class ParallelOperation; friend std::future Async( Pipeline ); public: //------------------------------------------------------------------------ //! Constructor //------------------------------------------------------------------------ Pipeline( Operation *op ) : operation( op->Move() ) { } //------------------------------------------------------------------------ //! Constructor //------------------------------------------------------------------------ Pipeline( Operation &op ) : operation( op.Move() ) { } //------------------------------------------------------------------------ //! Constructor //------------------------------------------------------------------------ Pipeline( Operation &&op ) : operation( op.Move() ) { } Pipeline( Operation *op ) : operation( op->ToHandled() ) { } //------------------------------------------------------------------------ //! Constructor //------------------------------------------------------------------------ Pipeline( Operation &op ) : operation( op.ToHandled() ) { } //------------------------------------------------------------------------ //! Constructor //------------------------------------------------------------------------ Pipeline( Operation &&op ) : operation( op.ToHandled() ) { } Pipeline( Pipeline &&pipe ) : operation( std::move( pipe.operation ) ) { } //------------------------------------------------------------------------ //! Constructor //------------------------------------------------------------------------ Pipeline& operator=( Pipeline &&pipe ) { operation = std::move( pipe.operation ); return *this; } //------------------------------------------------------------------------ //! Conversion to Operation //! //! @throws : std::logic_error if pipeline is invalid //------------------------------------------------------------------------ operator Operation&() { if( !bool( operation ) ) throw std::logic_error( "Invalid pipeline." ); return *operation.get(); } //------------------------------------------------------------------------ //! Conversion to boolean //! //! @return : true if it's a valid pipeline, false otherwise //------------------------------------------------------------------------ operator bool() { return bool( operation ); } private: //------------------------------------------------------------------------ //! Member access declaration, provides access to the underlying //! operation. //! //! @return : pointer to the underlying //------------------------------------------------------------------------ Operation* operator->() { return operation.get(); } //------------------------------------------------------------------------ //! Schedules the underlying pipeline for execution. //! //! @param args : forwarded arguments //! @param bucket : number of bucket with forwarded params //! @param final : to be called at the end of the pipeline //------------------------------------------------------------------------ void Run( std::function final = nullptr ) { if( ftr.valid() ) throw std::logic_error( "Pipeline is already running" ); // a promise that the pipe will have a result std::promise prms; ftr = prms.get_future(); operation->Run( std::move( prms ), std::move( final ) ); } //------------------------------------------------------------------------ //! First operation in the pipeline //------------------------------------------------------------------------ std::unique_ptr> operation; //------------------------------------------------------------------------ //! The future result of the pipeline //------------------------------------------------------------------------ std::future ftr; }; //---------------------------------------------------------------------------- //! Helper function, schedules execution of given pipeline //! //! @param pipeline : the pipeline to be executed //! //! @return : future status of the operation //---------------------------------------------------------------------------- inline std::future Async( Pipeline pipeline ) { pipeline.Run(); return std::move( pipeline.ftr ); } //---------------------------------------------------------------------------- //! Helper function, schedules execution of given pipeline and waits for //! the status //! //! @param pipeline : the pipeline to be executed //! //! @return : status of the operation //---------------------------------------------------------------------------- inline XRootDStatus WaitFor( Pipeline pipeline ) { return Async( std::move( pipeline ) ).get(); } //---------------------------------------------------------------------------- //! Concrete Operation template //! Defines | and >> operator as well as operation arguments. //! //! @arg Derived : the class that derives from this template (CRTP) //! @arg HasHndl : true if operation has a handler, false otherwise //! @arg Args : operation arguments //---------------------------------------------------------------------------- template class Derived, bool HasHndl, typename HdlrFactory, typename ... Args> class ConcreteOperation: public Operation { template class, bool, typename, typename ...> friend class ConcreteOperation; public: //------------------------------------------------------------------------ //! Constructor //! //! @param args : operation arguments //------------------------------------------------------------------------ ConcreteOperation( Args&&... args ) : args( std::tuple( std::move( args )... ) ) { static_assert( !HasHndl, "It is only possible to construct operation without handler" ); } //------------------------------------------------------------------------ //! Move constructor from other states //! //! @arg from : state from which the object is being converted //! //! @param op : the object that is being converted //------------------------------------------------------------------------ template ConcreteOperation( ConcreteOperation && op ) : Operation( std::move( op ) ), args( std::move( op.args ) ) { } //------------------------------------------------------------------------ //! Adds ResponseHandler/function/functor/lambda/future handler for //! the operation. //! //! Note: due to reference collapsing this covers both l-value and //! r-value references. //! //! @param func : function/functor/lambda //------------------------------------------------------------------------ template Derived operator>>( Hdlr &&hdlr ) { // check if the resulting handler should be owned by us or by the user, // if the user passed us directly a ResponseHandler it's owned by the // user, otherwise we need to wrap the argument in a handler and in this // case the resulting handler will be owned by us constexpr bool own = !IsResponseHandler::value; return this->StreamImpl( HdlrFactory::Create( hdlr ), own ); } //------------------------------------------------------------------------ //! Creates a pipeline of 2 or more operations //! //! @param op : operation to add //! //! @return : handled operation //------------------------------------------------------------------------ Derived operator|( Operation &op ) { return PipeImpl( *this, op ); } //------------------------------------------------------------------------ //! Creates a pipeline of 2 or more operations //! //! @param op : operation to add //! //! @return : handled operation //------------------------------------------------------------------------ Derived operator|( Operation &&op ) { return PipeImpl( *this, op ); } //------------------------------------------------------------------------ //! Creates a pipeline of 2 or more operations //! //! @param op operation to add //! //! @return handled operation //------------------------------------------------------------------------ Derived operator|( Operation &op ) { return PipeImpl( *this, op ); } //------------------------------------------------------------------------ //! Creates a pipeline of 2 or more operations //! //! @param op : operation to add //! //! @return : handled operation //------------------------------------------------------------------------ Derived operator|( Operation &&op ) { return PipeImpl( *this, op ); } protected: //------------------------------------------------------------------------ //! Move current object into newly allocated instance //! //! @return : the new instance //------------------------------------------------------------------------ Operation* Move() { Derived *me = static_cast*>( this ); return new Derived( std::move( *me ) ); } //------------------------------------------------------------------------ //! Transform operation to handled //! //! @return Operation& //------------------------------------------------------------------------ Operation* ToHandled() { this->handler.reset( new PipelineHandler() ); Derived *me = static_cast*>( this ); return new Derived( std::move( *me ) ); } //------------------------------------------------------------------------ //! Transform into a new instance with desired state //! //! @return : new instance in the desired state //------------------------------------------------------------------------ template Derived Transform() { Derived *me = static_cast*>( this ); return Derived( std::move( *me ) ); } //------------------------------------------------------------------------ //! Implements operator>> functionality //! //! @param h : handler to be added //! @ //! @return : return an instance of Derived //------------------------------------------------------------------------ inline Derived StreamImpl( ResponseHandler *handler, bool own ) { static_assert( !HasHndl, "Operator >> is available only for operation without handler" ); this->handler.reset( new PipelineHandler( handler, own ) ); return Transform(); } //------------------------------------------------------------------------ //! Implements operator| functionality //! //! @param me : reference to myself (*this) //! @param op : reference to the other operation //! //! @return : move-copy of myself //------------------------------------------------------------------------ inline static Derived PipeImpl( ConcreteOperation &me, Operation &op ) { me.AddOperation( op.Move() ); return me.template Transform(); } //------------------------------------------------------------------------ //! Implements operator| functionality //! //! @param me : reference to myself (*this) //! @param op : reference to the other operation //! //! @return : move-copy of myself //------------------------------------------------------------------------ inline static Derived PipeImpl( ConcreteOperation &me, Operation &op ) { me.AddOperation( op.ToHandled() ); return me.template Transform(); } //------------------------------------------------------------------------ //! Implements operator| functionality //! //! @param me : reference to myself (*this) //! @param op : reference to the other operation //! //! @return : move-copy of myself //------------------------------------------------------------------------ inline static Derived PipeImpl( ConcreteOperation &me, Operation &op ) { me.handler.reset( new PipelineHandler() ); me.AddOperation( op.Move() ); return me.template Transform(); } //------------------------------------------------------------------------ //! Implements operator| functionality //! //! @param me : reference to myself (*this) //! @param op : reference to the other operation //! //! @return : move-copy of myself //------------------------------------------------------------------------ inline static Derived PipeImpl( ConcreteOperation &me, Operation &op ) { me.handler.reset( new PipelineHandler() ); me.AddOperation( op.ToHandled() ); return me.template Transform(); } //------------------------------------------------------------------------ //! Operation arguments //------------------------------------------------------------------------ std::tuple args; }; } #endif // __XRD_CL_OPERATIONS_HH__