// // experimental/promise.hpp // ~~~~~~~~~~~~~~~~~~~~~~~~ // // Copyright (c) 2021 Klemens D. Morgenstern // (klemens dot morgenstern at gmx dot net) // // Distributed under the Boost Software License, Version 1.0. (See accompanying // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) // #ifndef BOOST_ASIO_EXPERIMENTAL_PROMISE_HPP #define BOOST_ASIO_EXPERIMENTAL_PROMISE_HPP #if defined(_MSC_VER) && (_MSC_VER >= 1200) # pragma once #endif // defined(_MSC_VER) && (_MSC_VER >= 1200) #include #include #include #include #include #include #include #include #include #include #include namespace boost { namespace asio { namespace experimental { template struct use_promise_t {}; constexpr use_promise_t<> use_promise; template struct is_promise : std::false_type {}; template struct is_promise> : std::true_type {}; template constexpr bool is_promise_v = is_promise::value; template concept is_promise_c = is_promise_v>; template struct promise_value_type { using type = std::tuple; }; template struct promise_value_type { using type = T; }; template <> struct promise_value_type<> { using type = std::monostate; }; #if defined(GENERATING_DOCUMENTATION) /// The primary template is not defined. template struct promise { }; #endif // defined(GENERATING_DOCUMENTATION) template struct promise { using value_type = typename promise_value_type::type; using tuple_type = std::tuple; using executor_type = Executor; executor_type get_executor() const { if (impl_) return impl_->executor; else return {}; } void cancel(cancellation_type level = cancellation_type::all) { if (impl_ && !impl_->done) { boost::asio::dispatch(impl_->executor, [level, impl = impl_]{impl->cancel.emit(level);}); } } bool complete() const noexcept { return impl_ && impl_->done; } template auto async_wait(CompletionToken&& token) { assert(impl_); return async_initiate( initiate_async_wait{impl_}, token); } promise() = delete; promise(const promise& ) = delete; promise(promise&& ) noexcept = default; ~promise() { cancel(); } template static auto race(Executor1 exec, Ps ... ps) -> promise), Executor1> { using var_t = std::variant; using pi = detail::promise_impl; struct impl_t : pi { impl_t(Executor1 exec, Ps&& ... ps) : pi(std::move(exec)), tup(std::move(ps)...) { this->slot.template emplace(this); } struct cancel_handler { impl_t* self; cancel_handler(impl_t* self) : self(self) { } void operator()(cancellation_type ct) { [ct, s=self](std::index_sequence) { (std::get(s->tup).cancel(ct), ... ); }(std::make_index_sequence{}); } }; std::tuple...> tup; cancellation_slot slot{this->cancel.slot()}; }; auto impl = std::allocate_shared( get_associated_allocator(exec), exec, std::move(ps)...); impl->executor = exec; [impl, exec](std::index_sequence) { auto step = [&](std::integral_constant) { return [impl] (Args&& ... args) { if (impl->done) return; impl->result = var_t(std::in_place_index, std::forward(args)...); impl->done = true; if (auto f = std::exchange(impl->completion, nullptr); !!f) std::apply(std::move(f), std::move(*impl->result)); auto cancel = [&](std::integral_constant) { if constexpr (I != Id) get(impl->tup).cancel(); }; (cancel(std::integral_constant{}), ...); }; }; ( std::get(impl->tup).async_wait( bind_executor(exec, step(std::integral_constant{}))), ... ); }(std::make_index_sequence{}); return {impl}; } template static auto all(Executor1 exec, Ps ... ps) -> promise { using pi = detail::promise_impl< void(typename Ps::value_type...), Executor1>; struct impl_t : pi { impl_t(Executor1 exec, Ps&& ... ps) : pi(std::move(exec)), tup(std::move(ps)...) { this->slot.template emplace(this); } struct cancel_handler { impl_t* self; cancel_handler(impl_t* self) : self(self) { } void operator()(cancellation_type level) { [level, s=self](std::index_sequence) { (std::get(s->tup).cancel(level), ... ); }(std::make_index_sequence{}); } }; std::tuple...> tup; std::tuple...> partial_result; cancellation_slot slot{this->cancel.slot()}; }; auto impl = std::allocate_shared( get_associated_allocator(exec), exec, std::move(ps)...); impl->executor = exec; [impl, exec](std::index_sequence) { auto step = [&](std::integral_constant) { return [impl](Args&& ... args) { std::get(impl->partial_result).emplace( std::forward(args)...); if ((std::get(impl->partial_result) && ...)) // we're done. { impl->result = {*std::get(impl->partial_result)...}; impl->done = true; if (auto f = std::exchange(impl->completion, nullptr); !!f) std::apply(std::move(f), std::move(*impl->result)); } }; }; ( std::get(impl->tup).async_wait( bind_executor(exec, step(std::integral_constant{}))), ... ); }(std::make_index_sequence{}); return {impl}; } template static auto race(Ps ... ps) { auto exec = get<0>(std::tie(ps...)).get_executor(); return race(std::move(exec), std::move(ps)...); } template static auto all(Ps ... ps) { auto exec = get<0>(std::tie(ps...)).get_executor(); return all(std::move(exec), std::move(ps)...); } template #if !defined(GENERATING_DOCUMENTATION) requires requires (Range r) { {*std::begin(r)} -> is_promise_c; {*std:: end(r)} -> is_promise_c; } #endif // !defined(GENERATING_DOCUMENTATION) static auto race(Executor1 exec, Range range) { using var_t = typename std::decay_t< decltype(*std::begin(range))>::value_type; using signature_type = std::conditional_t< std::is_same_v, void(std::size_t), void(std::size_t, var_t)>; using pi = detail::promise_impl; using promise_t = promise; struct impl_t : pi { impl_t(Executor1 exec, Range&& range) : pi(std::move(exec)), range(std::move(range)) { this->slot.template emplace(this); } struct cancel_handler { impl_t* self; cancel_handler(impl_t* self) : self(self) { } void operator()(boost::asio::cancellation_type ct) { for (auto& r : self->range) r.cancel(ct); } }; Range range; cancellation_slot slot{this->cancel.slot()}; }; const auto size = std::distance(std::begin(range), std::end(range)); auto impl = std::allocate_shared( get_associated_allocator(exec), exec, std::move(range)); impl->executor = exec; if (size == 0u) { if constexpr (std::is_same_v) impl->result = {-1}; else impl->result = {-1, var_t{}}; impl->done = true; if (auto f = std::exchange(impl->completion, nullptr); !!f) { boost::asio::post(exec, [impl, f = std::move(f)]() mutable { std::apply(std::move(f), std::move(*impl->result)); }); } return promise_t{impl}; } auto idx = 0u; for (auto& val : impl->range) { val.async_wait( bind_executor(exec, [idx, impl](Args&&... args) { if (impl->done) return; if constexpr (std::is_same_v) impl->result = idx; else impl->result = std::make_tuple(idx, var_t(std::forward(args)...)); impl->done = true; if (auto f = std::exchange(impl->completion, nullptr); !!f) std::apply(std::move(f), std::move(*impl->result)); auto jdx = 0u; for (auto &tc : impl->range) if (jdx++ != idx) tc.cancel(); })); idx++; } return promise_t{impl}; } template #if !defined(GENERATING_DOCUMENTATION) requires requires (Range r) { {*std::begin(r)} -> is_promise_c; {*std:: end(r)} -> is_promise_c; } #endif // !defined(GENERATING_DOCUMENTATION) static auto all(Executor1 exec, Range range) -> promise< void( std::vector< typename std::decay_t< decltype(*std::begin(range)) >::value_type > ), Executor1> { using var_t = typename std::decay_t< decltype(*std::begin(range))>::value_type; using pi = detail::promise_impl), Executor1>; struct impl_t : pi { impl_t(Executor1 exec, Range&& range) : pi(std::move(exec)), range(std::move(range)) { this->slot.template emplace(this); } struct cancel_handler { impl_t* self; cancel_handler(impl_t* self) : self(self) { } void operator()(cancellation_type ct) { for (auto& r : self->range) r.cancel(ct); } }; Range range; std::vector> partial_result; cancellation_slot slot{this->cancel.slot()}; }; const auto size = std::distance(std::begin(range), std::end(range)); auto impl = std::allocate_shared( get_associated_allocator(exec), exec, std::move(range)); impl->executor = exec; impl->partial_result.resize(size); if (size == 0u) { impl->result.emplace(); impl->done = true; if (auto f = std::exchange(impl->completion, nullptr); !!f) boost::asio::post(exec, [impl, f = std::move(f)]() mutable { std::apply(std::move(f), std::move(*impl->result)); }); return {impl}; } auto idx = 0u; for (auto& val : impl->range) { val.async_wait(bind_executor( exec, [idx, impl](Args&&... args) { impl->partial_result[idx].emplace(std::forward(args)...); if (std::all_of(impl->partial_result.begin(), impl->partial_result.end(), [](auto &opt) {return opt.has_value();})) { impl->result.emplace(); get<0>(*impl->result).reserve(impl->partial_result.size()); for (auto& p : impl->partial_result) get<0>(*impl->result).push_back(std::move(*p)); impl->done = true; if (auto f = std::exchange(impl->completion, nullptr); !!f) std::apply(std::move(f), std::move(*impl->result)); } })); idx++; } return {impl}; } template #if !defined(GENERATING_DOCUMENTATION) requires requires (Range r) { {*std::begin(r)} -> is_promise_c; {*std:: end(r)} -> is_promise_c; } #endif // !defined(GENERATING_DOCUMENTATION) static auto race(Range range) { if (std::begin(range) == std::end(range)) throw std::logic_error( "Can't use race on an empty range with deduced executor"); else return race(std::begin(range)->get_executor(), std::move(range)); } template #if !defined(GENERATING_DOCUMENTATION) requires requires (Range&& r) { {*std::begin(r)} -> is_promise_c; {*std:: end(r)} -> is_promise_c; } #endif // !defined(GENERATING_DOCUMENTATION) static auto all(Range range) { if (std::begin(range) == std::end(range)) throw std::logic_error( "Can't use all on an empty range with deduced executor"); else return all(std::begin(range)->get_executor(), std::move(range)); } private: #if !defined(GENERATING_DOCUMENTATION) template friend struct promise; friend struct detail::promise_handler; #endif // !defined(GENERATING_DOCUMENTATION) std::shared_ptr> impl_; promise(std::shared_ptr> impl) : impl_(impl) { } struct initiate_async_wait { std::shared_ptr> self_; template void operator()(WaitHandler&& handler) const { const auto exec = get_associated_executor(handler, self_->executor); auto cancel = get_associated_cancellation_slot(handler); if (self_->done) { boost::asio::post(exec, [self = self_, h = std::forward(handler)]() mutable { std::apply(std::forward(h), std::move(*self->result)); }); } else { if (cancel.is_connected()) { struct cancel_handler { std::weak_ptr> self; cancel_handler( std::weak_ptr> self) : self(std::move(self)) { } void operator()(cancellation_type level) const { if (auto p = self.lock(); p != nullptr) p->cancel.emit(level); } }; cancel.template emplace(self_); } self_->completion = {exec, std::forward(handler)}; } } }; }; } // namespace experimental #if !defined(GENERATING_DOCUMENTATION) template struct async_result, R(Args...)> { using handler_type = experimental::detail::promise_handler< void(typename decay::type...), Executor>; template static auto initiate(Initiation initiation, experimental::use_promise_t, InitArgs... args) -> typename handler_type::promise_type { handler_type ht{get_associated_executor(initiation)}; std::move(initiation)(ht, std::move(args)...); return ht.make_promise(); } }; #endif // !defined(GENERATING_DOCUMENTATION) } // namespace asio } // namespace boost #include #endif // BOOST_ASIO_EXPERIMENTAL_PROMISE_HPP