// // experimental/parallel_group.hpp // ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ // // Copyright (c) 2003-2024 Christopher M. Kohlhoff (chris at kohlhoff dot com) // // Distributed under the Boost Software License, Version 1.0. (See accompanying // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) // #ifndef ASIO_EXPERIMENTAL_PARALLEL_GROUP_HPP #define ASIO_EXPERIMENTAL_PARALLEL_GROUP_HPP #if defined(_MSC_VER) && (_MSC_VER >= 1200) # pragma once #endif // defined(_MSC_VER) && (_MSC_VER >= 1200) #include "asio/detail/config.hpp" #include #include "asio/async_result.hpp" #include "asio/detail/array.hpp" #include "asio/detail/memory.hpp" #include "asio/detail/type_traits.hpp" #include "asio/detail/utility.hpp" #include "asio/experimental/cancellation_condition.hpp" #include "asio/detail/push_options.hpp" namespace asio { namespace experimental { namespace detail { // Helper trait for getting a tuple from a completion signature. template struct parallel_op_signature_as_tuple; template struct parallel_op_signature_as_tuple { typedef std::tuple...> type; }; // Helper trait for concatenating completion signatures. template struct parallel_group_signature; template struct parallel_group_signature { typedef asio::detail::array order_type; typedef R0 raw_type(Args0...); typedef R0 type(order_type, Args0...); }; template struct parallel_group_signature { typedef asio::detail::array order_type; typedef R0 raw_type(Args0..., Args1...); typedef R0 type(order_type, Args0..., Args1...); }; template struct parallel_group_signature { typedef asio::detail::array order_type; typedef typename parallel_group_signature::raw_type, SigN...>::raw_type raw_type; typedef typename parallel_group_signature::raw_type, SigN...>::type type; }; template void parallel_group_launch(Condition cancellation_condition, Handler handler, std::tuple& ops, asio::detail::index_sequence); // Helper trait for determining ranged parallel group completion signatures. template struct ranged_parallel_group_signature; template struct ranged_parallel_group_signature { typedef std::vector order_type; typedef R raw_type( std::vector...); typedef R type(order_type, std::vector...); }; template void ranged_parallel_group_launch(Condition cancellation_condition, Handler handler, Range&& range, const Allocator& allocator); char (¶llel_group_has_iterator_helper(...))[2]; template char parallel_group_has_iterator_helper(T*, typename T::iterator* = 0); template struct parallel_group_has_iterator_typedef { enum { value = (sizeof((parallel_group_has_iterator_helper)((T*)(0))) == 1) }; }; } // namespace detail /// Type trait used to determine whether a type is a range of asynchronous /// operations that can be used with with @c make_parallel_group. template struct is_async_operation_range { #if defined(GENERATING_DOCUMENTATION) /// The value member is true if the type may be used as a range of /// asynchronous operations. static const bool value; #else enum { value = detail::parallel_group_has_iterator_typedef::value }; #endif }; /// A group of asynchronous operations that may be launched in parallel. /** * See the documentation for asio::experimental::make_parallel_group for * a usage example. */ template class parallel_group { private: struct initiate_async_wait { template void operator()(Handler&& h, Condition&& c, std::tuple&& ops) const { detail::parallel_group_launch( std::forward(c), std::forward(h), ops, asio::detail::index_sequence_for()); } }; std::tuple ops_; public: /// Constructor. explicit parallel_group(Ops... ops) : ops_(std::move(ops)...) { } /// The completion signature for the group of operations. typedef typename detail::parallel_group_signature...>::type signature; /// Initiate an asynchronous wait for the group of operations. /** * Launches the group and asynchronously waits for completion. * * @param cancellation_condition A function object, called on completion of * an operation within the group, that is used to determine whether to cancel * the remaining operations. The function object is passed the arguments of * the completed operation's handler. To trigger cancellation of the remaining * operations, it must return a asio::cancellation_type value other * than asio::cancellation_type::none. * * @param token A @ref completion_token whose signature is comprised of * a @c std::array indicating the completion order of the * operations, followed by all operations' completion handler arguments. * * The library provides the following @c cancellation_condition types: * * @li asio::experimental::wait_for_all * @li asio::experimental::wait_for_one * @li asio::experimental::wait_for_one_error * @li asio::experimental::wait_for_one_success */ template auto async_wait(CancellationCondition cancellation_condition, CompletionToken&& token) -> decltype( asio::async_initiate( declval(), token, std::move(cancellation_condition), std::move(ops_))) { return asio::async_initiate( initiate_async_wait(), token, std::move(cancellation_condition), std::move(ops_)); } }; /// Create a group of operations that may be launched in parallel. /** * For example: * @code asio::experimental::make_parallel_group( * [&](auto token) * { * return in.async_read_some(asio::buffer(data), token); * }, * [&](auto token) * { * return timer.async_wait(token); * } * ).async_wait( * asio::experimental::wait_for_all(), * []( * std::array completion_order, * std::error_code ec1, std::size_t n1, * std::error_code ec2 * ) * { * switch (completion_order[0]) * { * case 0: * { * std::cout << "descriptor finished: " << ec1 << ", " << n1 << "\n"; * } * break; * case 1: * { * std::cout << "timer finished: " << ec2 << "\n"; * } * break; * } * } * ); * @endcode */ template ASIO_NODISCARD inline parallel_group make_parallel_group(Ops... ops) { return parallel_group(std::move(ops)...); } /// A range-based group of asynchronous operations that may be launched in /// parallel. /** * See the documentation for asio::experimental::make_parallel_group for * a usage example. */ template > class ranged_parallel_group { private: struct initiate_async_wait { template void operator()(Handler&& h, Condition&& c, Range&& range, const Allocator& allocator) const { detail::ranged_parallel_group_launch(std::move(c), std::move(h), std::forward(range), allocator); } }; Range range_; Allocator allocator_; public: /// Constructor. explicit ranged_parallel_group(Range range, const Allocator& allocator = Allocator()) : range_(std::move(range)), allocator_(allocator) { } /// The completion signature for the group of operations. typedef typename detail::ranged_parallel_group_signature< completion_signature_of_t< decay_t())>>, Allocator>::type signature; /// Initiate an asynchronous wait for the group of operations. /** * Launches the group and asynchronously waits for completion. * * @param cancellation_condition A function object, called on completion of * an operation within the group, that is used to determine whether to cancel * the remaining operations. The function object is passed the arguments of * the completed operation's handler. To trigger cancellation of the remaining * operations, it must return a asio::cancellation_type value other * than asio::cancellation_type::none. * * @param token A @ref completion_token whose signature is comprised of * a @c std::vector indicating the completion order of * the operations, followed by a vector for each of the completion signature's * arguments. * * The library provides the following @c cancellation_condition types: * * @li asio::experimental::wait_for_all * @li asio::experimental::wait_for_one * @li asio::experimental::wait_for_one_error * @li asio::experimental::wait_for_one_success */ template auto async_wait(CancellationCondition cancellation_condition, CompletionToken&& token) -> decltype( asio::async_initiate( declval(), token, std::move(cancellation_condition), std::move(range_), allocator_)) { return asio::async_initiate( initiate_async_wait(), token, std::move(cancellation_condition), std::move(range_), allocator_); } }; /// Create a group of operations that may be launched in parallel. /** * @param range A range containing the operations to be launched. * * For example: * @code * using op_type = decltype( * socket1.async_read_some( * asio::buffer(data1), * asio::deferred * ) * ); * * std::vector ops; * * ops.push_back( * socket1.async_read_some( * asio::buffer(data1), * asio::deferred * ) * ); * * ops.push_back( * socket2.async_read_some( * asio::buffer(data2), * asio::deferred * ) * ); * * asio::experimental::make_parallel_group(ops).async_wait( * asio::experimental::wait_for_all(), * []( * std::vector completion_order, * std::vector e, * std::vector n * ) * { * for (std::size_t i = 0; i < completion_order.size(); ++i) * { * std::size_t idx = completion_order[i]; * std::cout << "socket " << idx << " finished: "; * std::cout << e[idx] << ", " << n[idx] << "\n"; * } * } * ); * @endcode */ template ASIO_NODISCARD inline ranged_parallel_group> make_parallel_group(Range&& range, constraint_t< is_async_operation_range>::value > = 0) { return ranged_parallel_group>(std::forward(range)); } /// Create a group of operations that may be launched in parallel. /** * @param allocator Specifies the allocator to be used with the result vectors. * * @param range A range containing the operations to be launched. * * For example: * @code * using op_type = decltype( * socket1.async_read_some( * asio::buffer(data1), * asio::deferred * ) * ); * * std::vector ops; * * ops.push_back( * socket1.async_read_some( * asio::buffer(data1), * asio::deferred * ) * ); * * ops.push_back( * socket2.async_read_some( * asio::buffer(data2), * asio::deferred * ) * ); * * asio::experimental::make_parallel_group( * std::allocator_arg_t, * my_allocator, * ops * ).async_wait( * asio::experimental::wait_for_all(), * []( * std::vector completion_order, * std::vector e, * std::vector n * ) * { * for (std::size_t i = 0; i < completion_order.size(); ++i) * { * std::size_t idx = completion_order[i]; * std::cout << "socket " << idx << " finished: "; * std::cout << e[idx] << ", " << n[idx] << "\n"; * } * } * ); * @endcode */ template ASIO_NODISCARD inline ranged_parallel_group, Allocator> make_parallel_group(allocator_arg_t, const Allocator& allocator, Range&& range, constraint_t< is_async_operation_range>::value > = 0) { return ranged_parallel_group, Allocator>( std::forward(range), allocator); } } // namespace experimental } // namespace asio #include "asio/detail/pop_options.hpp" #include "asio/experimental/impl/parallel_group.hpp" #endif // ASIO_EXPERIMENTAL_PARALLEL_GROUP_HPP