// Copyright (C) 2004-2006 The Trustees of Indiana University. // Use, modification and distribution is subject to the Boost Software // License, Version 1.0. (See accompanying file LICENSE_1_0.txt or copy at // http://www.boost.org/LICENSE_1_0.txt) // Authors: Douglas Gregor // Andrew Lumsdaine #ifndef BOOST_GRAPH_DISTRIBUTED_QUEUE_HPP #define BOOST_GRAPH_DISTRIBUTED_QUEUE_HPP #ifndef BOOST_GRAPH_USE_MPI #error "Parallel BGL files should not be included unless <boost/graph/use_mpi.hpp> has been included" #endif #include <boost/graph/parallel/process_group.hpp> #include <boost/optional.hpp> #include <boost/shared_ptr.hpp> #include <vector> namespace boost { namespace graph { namespace distributed { /// A unary predicate that always returns "true". struct always_push { template<typename T> bool operator()(const T&) const { return true; } }; /** A distributed queue adaptor. * * Class template @c distributed_queue implements a distributed queue * across a process group. The distributed queue is an adaptor over an * existing (local) queue, which must model the @ref Buffer * concept. Each process stores a distinct copy of the local queue, * from which it draws or removes elements via the @ref pop and @ref * top members. * * The value type of the local queue must be a model of the @ref * GlobalDescriptor concept. The @ref push operation of the * distributed queue passes (via a message) the value to its owning * processor. Thus, the elements within a particular local queue are * guaranteed to have the process owning that local queue as an owner. * * Synchronization of distributed queues occurs in the @ref empty and * @ref size functions, which will only return "empty" values (true or * 0, respectively) when the entire distributed queue is empty. If the * local queue is empty but the distributed queue is not, the * operation will block until either condition changes. When the @ref * size function of a nonempty queue returns, it returns the size of * the local queue. These semantics were selected so that sequential * code that processes elements in the queue via the following idiom * can be parallelized via introduction of a distributed queue: * * distributed_queue<...> Q; * Q.push(x); * while (!Q.empty()) { * // do something, that may push a value onto Q * } * * In the parallel version, the initial @ref push operation will place * the value @c x onto its owner's queue. All processes will * synchronize at the call to empty, and only the process owning @c x * will be allowed to execute the loop (@ref Q.empty() returns * false). This iteration may in turn push values onto other remote * queues, so when that process finishes execution of the loop body * and all processes synchronize again in @ref empty, more processes * may have nonempty local queues to execute. Once all local queues * are empty, @ref Q.empty() returns @c false for all processes. * * The distributed queue can receive messages at two different times: * during synchronization and when polling @ref empty. Messages are * always received during synchronization, to ensure that accurate * local queue sizes can be determines. However, whether @ref empty * should poll for messages is specified as an option to the * constructor. Polling may be desired when the order in which * elements in the queue are processed is not important, because it * permits fewer synchronization steps and less communication * overhead. However, when more strict ordering guarantees are * required, polling may be semantically incorrect. By disabling * polling, one ensures that parallel execution using the idiom above * will not process an element at a later "level" before an earlier * "level". * * The distributed queue nearly models the @ref Buffer * concept. However, the @ref push routine does not necessarily * increase the result of @c size() by one (although the size of the * global queue does increase by one). */ template<typename ProcessGroup, typename OwnerMap, typename Buffer, typename UnaryPredicate = always_push> class distributed_queue { typedef distributed_queue self_type; enum { /** Message indicating a remote push. The message contains a * single value x of type value_type that is to be pushed on the * receiver's queue. */ msg_push, /** Push many elements at once. */ msg_multipush }; public: typedef ProcessGroup process_group_type; typedef Buffer buffer_type; typedef typename buffer_type::value_type value_type; typedef typename buffer_type::size_type size_type; /** Construct a new distributed queue. * * Build a new distributed queue that communicates over the given @p * process_group, whose local queue is initialized via @p buffer and * which may or may not poll for messages. */ explicit distributed_queue(const ProcessGroup& process_group, const OwnerMap& owner, const Buffer& buffer, bool polling = false); /** Construct a new distributed queue. * * Build a new distributed queue that communicates over the given @p * process_group, whose local queue is initialized via @p buffer and * which may or may not poll for messages. */ explicit distributed_queue(const ProcessGroup& process_group = ProcessGroup(), const OwnerMap& owner = OwnerMap(), const Buffer& buffer = Buffer(), const UnaryPredicate& pred = UnaryPredicate(), bool polling = false); /** Construct a new distributed queue. * * Build a new distributed queue that communicates over the given @p * process_group, whose local queue is default-initalized and which * may or may not poll for messages. */ distributed_queue(const ProcessGroup& process_group, const OwnerMap& owner, const UnaryPredicate& pred, bool polling = false); /** Virtual destructor required with virtual functions. * */ virtual ~distributed_queue() {} /** Push an element onto the distributed queue. * * The element will be sent to its owner process to be added to that * process's local queue. If polling is enabled for this queue and * the owner process is the current process, the value will be * immediately pushed onto the local queue. * * Complexity: O(1) messages of size O(sizeof(value_type)) will be * transmitted. */ void push(const value_type& x); /** Pop an element off the local queue. * * @p @c !empty() */ void pop() { buffer.pop(); } /** * Return the element at the top of the local queue. * * @p @c !empty() */ value_type& top() { return buffer.top(); } /** * \overload */ const value_type& top() const { return buffer.top(); } /** Determine if the queue is empty. * * When the local queue is nonempty, returns @c true. If the local * queue is empty, synchronizes with all other processes in the * process group until either (1) the local queue is nonempty * (returns @c true) (2) the entire distributed queue is empty * (returns @c false). */ bool empty() const; /** Determine the size of the local queue. * * The behavior of this routine is equivalent to the behavior of * @ref empty, except that when @ref empty returns true this * function returns the size of the local queue and when @ref empty * returns false this function returns zero. */ size_type size() const; // private: /** Synchronize the distributed queue and determine if all queues * are empty. * * \returns \c true when all local queues are empty, or false if at least * one of the local queues is nonempty. * Defined as virtual for derived classes like depth_limited_distributed_queue. */ virtual bool do_synchronize() const; private: // Setup triggers void setup_triggers(); // Message handlers void handle_push(int source, int tag, const value_type& value, trigger_receive_context); void handle_multipush(int source, int tag, const std::vector<value_type>& values, trigger_receive_context); mutable ProcessGroup process_group; OwnerMap owner; mutable Buffer buffer; UnaryPredicate pred; bool polling; typedef std::vector<value_type> outgoing_buffer_t; typedef std::vector<outgoing_buffer_t> outgoing_buffers_t; shared_ptr<outgoing_buffers_t> outgoing_buffers; }; /// Helper macro containing the normal names for the template /// parameters to distributed_queue. #define BOOST_DISTRIBUTED_QUEUE_PARMS \ typename ProcessGroup, typename OwnerMap, typename Buffer, \ typename UnaryPredicate /// Helper macro containing the normal template-id for /// distributed_queue. #define BOOST_DISTRIBUTED_QUEUE_TYPE \ distributed_queue<ProcessGroup, OwnerMap, Buffer, UnaryPredicate> /** Synchronize all processes involved with the given distributed queue. * * This function will synchronize all of the local queues for a given * distributed queue, by ensuring that no additional messages are in * transit. It is rarely required by the user, because most * synchronization of distributed queues occurs via the @c empty or @c * size methods. */ template<BOOST_DISTRIBUTED_QUEUE_PARMS> inline void synchronize(const BOOST_DISTRIBUTED_QUEUE_TYPE& Q) { Q.do_synchronize(); } /// Construct a new distributed queue. template<typename ProcessGroup, typename OwnerMap, typename Buffer> inline distributed_queue<ProcessGroup, OwnerMap, Buffer> make_distributed_queue(const ProcessGroup& process_group, const OwnerMap& owner, const Buffer& buffer, bool polling = false) { typedef distributed_queue<ProcessGroup, OwnerMap, Buffer> result_type; return result_type(process_group, owner, buffer, polling); } } } } // end namespace boost::graph::distributed #include <boost/graph/distributed/detail/queue.ipp> #undef BOOST_DISTRIBUTED_QUEUE_TYPE #undef BOOST_DISTRIBUTED_QUEUE_PARMS #endif // BOOST_GRAPH_DISTRIBUTED_QUEUE_HPP