LCOV - code coverage report
Current view: top level - capy - when_all.hpp (source / functions) Coverage Total Hit Missed
Test: coverage_remapped.info Lines: 98.1 % 161 158 3
Test Date: 2026-03-31 23:08:34 Functions: 92.9 % 436 405 31

           TLA  Line data    Source code
       1                 : //
       2                 : // Copyright (c) 2026 Steve Gerbino
       3                 : //
       4                 : // Distributed under the Boost Software License, Version 1.0. (See accompanying
       5                 : // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
       6                 : //
       7                 : // Official repository: https://github.com/cppalliance/capy
       8                 : //
       9                 : 
      10                 : #ifndef BOOST_CAPY_WHEN_ALL_HPP
      11                 : #define BOOST_CAPY_WHEN_ALL_HPP
      12                 : 
      13                 : #include <boost/capy/detail/config.hpp>
      14                 : #include <boost/capy/detail/io_result_combinators.hpp>
      15                 : #include <boost/capy/continuation.hpp>
      16                 : #include <boost/capy/concept/executor.hpp>
      17                 : #include <boost/capy/concept/io_awaitable.hpp>
      18                 : #include <coroutine>
      19                 : #include <boost/capy/ex/frame_alloc_mixin.hpp>
      20                 : #include <boost/capy/ex/io_env.hpp>
      21                 : #include <boost/capy/ex/frame_allocator.hpp>
      22                 : #include <boost/capy/task.hpp>
      23                 : 
      24                 : #include <array>
      25                 : #include <atomic>
      26                 : #include <exception>
      27                 : #include <memory>
      28                 : #include <optional>
      29                 : #include <ranges>
      30                 : #include <stdexcept>
      31                 : #include <stop_token>
      32                 : #include <tuple>
      33                 : #include <type_traits>
      34                 : #include <utility>
      35                 : #include <vector>
      36                 : 
      37                 : namespace boost {
      38                 : namespace capy {
      39                 : 
      40                 : namespace detail {
      41                 : 
      42                 : /** Holds the result of a single task within when_all.
      43                 : */
      44                 : template<typename T>
      45                 : struct result_holder
      46                 : {
      47                 :     std::optional<T> value_;
      48                 : 
      49 HIT          81 :     void set(T v)
      50                 :     {
      51              81 :         value_ = std::move(v);
      52              81 :     }
      53                 : 
      54              69 :     T get() &&
      55                 :     {
      56              69 :         return std::move(*value_);
      57                 :     }
      58                 : };
      59                 : 
      60                 : /** Core shared state for when_all operations.
      61                 : 
      62                 :     Contains all members and methods common to both heterogeneous (variadic)
      63                 :     and homogeneous (range) when_all implementations. State classes embed
      64                 :     this via composition to avoid CRTP destructor ordering issues.
      65                 : 
      66                 :     @par Thread Safety
      67                 :     Atomic operations protect exception capture and completion count.
      68                 : */
      69                 : struct when_all_core
      70                 : {
      71                 :     std::atomic<std::size_t> remaining_count_;
      72                 : 
      73                 :     // Exception storage - first error wins, others discarded
      74                 :     std::atomic<bool> has_exception_{false};
      75                 :     std::exception_ptr first_exception_;
      76                 : 
      77                 :     std::stop_source stop_source_;
      78                 : 
      79                 :     // Bridges parent's stop token to our stop_source
      80                 :     struct stop_callback_fn
      81                 :     {
      82                 :         std::stop_source* source_;
      83               1 :         void operator()() const { source_->request_stop(); }
      84                 :     };
      85                 :     using stop_callback_t = std::stop_callback<stop_callback_fn>;
      86                 :     std::optional<stop_callback_t> parent_stop_callback_;
      87                 : 
      88                 :     continuation continuation_;
      89                 :     io_env const* caller_env_ = nullptr;
      90                 : 
      91              72 :     explicit when_all_core(std::size_t count) noexcept
      92              72 :         : remaining_count_(count)
      93                 :     {
      94              72 :     }
      95                 : 
      96                 :     /** Capture an exception (first one wins). */
      97              19 :     void capture_exception(std::exception_ptr ep)
      98                 :     {
      99              19 :         bool expected = false;
     100              19 :         if(has_exception_.compare_exchange_strong(
     101                 :             expected, true, std::memory_order_relaxed))
     102              17 :             first_exception_ = ep;
     103              19 :     }
     104                 : };
     105                 : 
     106                 : /** Shared state for heterogeneous when_all (variadic overload).
     107                 : 
     108                 :     @tparam Ts The result types of the tasks.
     109                 : */
     110                 : template<typename... Ts>
     111                 : struct when_all_state
     112                 : {
     113                 :     static constexpr std::size_t task_count = sizeof...(Ts);
     114                 : 
     115                 :     when_all_core core_;
     116                 :     std::tuple<result_holder<Ts>...> results_;
     117                 :     std::array<continuation, task_count> runner_handles_{};
     118                 : 
     119                 :     std::atomic<bool> has_error_{false};
     120                 :     std::error_code first_error_;
     121                 : 
     122              50 :     when_all_state()
     123              50 :         : core_(task_count)
     124                 :     {
     125              50 :     }
     126                 : 
     127                 :     /** Record the first error (subsequent errors are discarded). */
     128              43 :     void record_error(std::error_code ec)
     129                 :     {
     130              43 :         bool expected = false;
     131              43 :         if(has_error_.compare_exchange_strong(
     132                 :             expected, true, std::memory_order_relaxed))
     133              29 :             first_error_ = ec;
     134              43 :     }
     135                 : };
     136                 : 
     137                 : /** Shared state for homogeneous when_all (range overload).
     138                 : 
     139                 :     Stores extracted io_result payloads in a vector indexed by task
     140                 :     position. Tracks the first error_code for error propagation.
     141                 : 
     142                 :     @tparam T The payload type extracted from io_result.
     143                 : */
     144                 : template<typename T>
     145                 : struct when_all_homogeneous_state
     146                 : {
     147                 :     when_all_core core_;
     148                 :     std::vector<std::optional<T>> results_;
     149                 :     std::unique_ptr<continuation[]> runner_handles_;
     150                 : 
     151                 :     std::atomic<bool> has_error_{false};
     152                 :     std::error_code first_error_;
     153                 : 
     154              11 :     explicit when_all_homogeneous_state(std::size_t count)
     155              11 :         : core_(count)
     156              22 :         , results_(count)
     157              11 :         , runner_handles_(std::make_unique<continuation[]>(count))
     158                 :     {
     159              11 :     }
     160                 : 
     161              16 :     void set_result(std::size_t index, T value)
     162                 :     {
     163              16 :         results_[index].emplace(std::move(value));
     164              16 :     }
     165                 : 
     166                 :     /** Record the first error (subsequent errors are discarded). */
     167               7 :     void record_error(std::error_code ec)
     168                 :     {
     169               7 :         bool expected = false;
     170               7 :         if(has_error_.compare_exchange_strong(
     171                 :             expected, true, std::memory_order_relaxed))
     172               5 :             first_error_ = ec;
     173               7 :     }
     174                 : };
     175                 : 
     176                 : /** Specialization for void io_result children (no payload storage). */
     177                 : template<>
     178                 : struct when_all_homogeneous_state<std::tuple<>>
     179                 : {
     180                 :     when_all_core core_;
     181                 :     std::unique_ptr<continuation[]> runner_handles_;
     182                 : 
     183                 :     std::atomic<bool> has_error_{false};
     184                 :     std::error_code first_error_;
     185                 : 
     186               3 :     explicit when_all_homogeneous_state(std::size_t count)
     187               3 :         : core_(count)
     188               3 :         , runner_handles_(std::make_unique<continuation[]>(count))
     189                 :     {
     190               3 :     }
     191                 : 
     192                 :     /** Record the first error (subsequent errors are discarded). */
     193               1 :     void record_error(std::error_code ec)
     194                 :     {
     195               1 :         bool expected = false;
     196               1 :         if(has_error_.compare_exchange_strong(
     197                 :             expected, true, std::memory_order_relaxed))
     198               1 :             first_error_ = ec;
     199               1 :     }
     200                 : };
     201                 : 
     202                 : /** Wrapper coroutine that intercepts task completion for when_all.
     203                 : 
     204                 :     Parameterized on StateType to work with both heterogeneous (variadic)
     205                 :     and homogeneous (range) state types. All state types expose their
     206                 :     shared members through a `core_` member of type when_all_core.
     207                 : 
     208                 :     @tparam StateType The state type (when_all_state or when_all_homogeneous_state).
     209                 : */
     210                 : template<typename StateType>
     211                 : struct BOOST_CAPY_CORO_DESTROY_WHEN_COMPLETE when_all_runner
     212                 : {
     213                 :     struct promise_type
     214                 :         : frame_alloc_mixin
     215                 :     {
     216                 :         StateType* state_ = nullptr;
     217                 :         std::size_t index_ = 0;
     218                 :         io_env env_;
     219                 : 
     220             145 :         when_all_runner get_return_object() noexcept
     221                 :         {
     222                 :             return when_all_runner(
     223             145 :                 std::coroutine_handle<promise_type>::from_promise(*this));
     224                 :         }
     225                 : 
     226             145 :         std::suspend_always initial_suspend() noexcept
     227                 :         {
     228             145 :             return {};
     229                 :         }
     230                 : 
     231             145 :         auto final_suspend() noexcept
     232                 :         {
     233                 :             struct awaiter
     234                 :             {
     235                 :                 promise_type* p_;
     236             145 :                 bool await_ready() const noexcept { return false; }
     237             145 :                 auto await_suspend(std::coroutine_handle<> h) noexcept
     238                 :                 {
     239             145 :                     auto& core = p_->state_->core_;
     240             145 :                     auto* counter = &core.remaining_count_;
     241             145 :                     auto* caller_env = core.caller_env_;
     242             145 :                     auto& cont = core.continuation_;
     243                 : 
     244             145 :                     h.destroy();
     245                 : 
     246             145 :                     auto remaining = counter->fetch_sub(1, std::memory_order_acq_rel);
     247             145 :                     if(remaining == 1)
     248              72 :                         return detail::symmetric_transfer(caller_env->executor.dispatch(cont));
     249              73 :                     return detail::symmetric_transfer(std::noop_coroutine());
     250                 :                 }
     251 MIS           0 :                 void await_resume() const noexcept {}
     252                 :             };
     253 HIT         145 :             return awaiter{this};
     254                 :         }
     255                 : 
     256             126 :         void return_void() noexcept {}
     257                 : 
     258              19 :         void unhandled_exception() noexcept
     259                 :         {
     260              19 :             state_->core_.capture_exception(std::current_exception());
     261              19 :             state_->core_.stop_source_.request_stop();
     262              19 :         }
     263                 : 
     264                 :         template<class Awaitable>
     265                 :         struct transform_awaiter
     266                 :         {
     267                 :             std::decay_t<Awaitable> a_;
     268                 :             promise_type* p_;
     269                 : 
     270             145 :             bool await_ready() { return a_.await_ready(); }
     271             145 :             decltype(auto) await_resume() { return a_.await_resume(); }
     272                 : 
     273                 :             template<class Promise>
     274             144 :             auto await_suspend(std::coroutine_handle<Promise> h)
     275                 :             {
     276                 :                 using R = decltype(a_.await_suspend(h, &p_->env_));
     277                 :                 if constexpr (std::is_same_v<R, std::coroutine_handle<>>)
     278             144 :                     return detail::symmetric_transfer(a_.await_suspend(h, &p_->env_));
     279                 :                 else
     280                 :                     return a_.await_suspend(h, &p_->env_);
     281                 :             }
     282                 :         };
     283                 : 
     284                 :         template<class Awaitable>
     285             145 :         auto await_transform(Awaitable&& a)
     286                 :         {
     287                 :             using A = std::decay_t<Awaitable>;
     288                 :             if constexpr (IoAwaitable<A>)
     289                 :             {
     290                 :                 return transform_awaiter<Awaitable>{
     291             290 :                     std::forward<Awaitable>(a), this};
     292                 :             }
     293                 :             else
     294                 :             {
     295                 :                 static_assert(sizeof(A) == 0, "requires IoAwaitable");
     296                 :             }
     297             145 :         }
     298                 :     };
     299                 : 
     300                 :     std::coroutine_handle<promise_type> h_;
     301                 : 
     302             145 :     explicit when_all_runner(std::coroutine_handle<promise_type> h) noexcept
     303             145 :         : h_(h)
     304                 :     {
     305             145 :     }
     306                 : 
     307                 :     // Enable move for all clang versions - some versions need it
     308                 :     when_all_runner(when_all_runner&& other) noexcept
     309                 :         : h_(std::exchange(other.h_, nullptr))
     310                 :     {
     311                 :     }
     312                 : 
     313                 :     when_all_runner(when_all_runner const&) = delete;
     314                 :     when_all_runner& operator=(when_all_runner const&) = delete;
     315                 :     when_all_runner& operator=(when_all_runner&&) = delete;
     316                 : 
     317             145 :     auto release() noexcept
     318                 :     {
     319             145 :         return std::exchange(h_, nullptr);
     320                 :     }
     321                 : };
     322                 : 
     323                 : /** Create an io_result-aware runner for a single awaitable (range path).
     324                 : 
     325                 :     Checks the error code, records errors and requests stop on failure,
     326                 :     or extracts the payload on success.
     327                 : */
     328                 : template<IoAwaitable Awaitable, typename StateType>
     329                 : when_all_runner<StateType>
     330              32 : make_when_all_homogeneous_runner(Awaitable inner, StateType* state, std::size_t index)
     331                 : {
     332                 :     auto result = co_await std::move(inner);
     333                 : 
     334                 :     if(result.ec)
     335                 :     {
     336                 :         state->record_error(result.ec);
     337                 :         state->core_.stop_source_.request_stop();
     338                 :     }
     339                 :     else
     340                 :     {
     341                 :         using PayloadT = io_result_payload_t<
     342                 :             awaitable_result_t<Awaitable>>;
     343                 :         if constexpr (!std::is_same_v<PayloadT, std::tuple<>>)
     344                 :         {
     345                 :             state->set_result(index,
     346                 :                 extract_io_payload(std::move(result)));
     347                 :         }
     348                 :     }
     349              64 : }
     350                 : 
     351                 : /** Create a runner for io_result children that requests stop on ec. */
     352                 : template<std::size_t Index, IoAwaitable Awaitable, typename... Ts>
     353                 : when_all_runner<when_all_state<Ts...>>
     354              97 : make_when_all_io_runner(Awaitable inner, when_all_state<Ts...>* state)
     355                 : {
     356                 :     auto result = co_await std::move(inner);
     357                 :     auto ec = result.ec;
     358                 :     std::get<Index>(state->results_).set(std::move(result));
     359                 : 
     360                 :     if(ec)
     361                 :     {
     362                 :         state->record_error(ec);
     363                 :         state->core_.stop_source_.request_stop();
     364                 :     }
     365             194 : }
     366                 : 
     367                 : /** Launcher that uses io_result-aware runners. */
     368                 : template<IoAwaitable... Awaitables>
     369                 : class when_all_io_launcher
     370                 : {
     371                 :     using state_type = when_all_state<awaitable_result_t<Awaitables>...>;
     372                 : 
     373                 :     std::tuple<Awaitables...>* awaitables_;
     374                 :     state_type* state_;
     375                 : 
     376                 : public:
     377              50 :     when_all_io_launcher(
     378                 :         std::tuple<Awaitables...>* awaitables,
     379                 :         state_type* state)
     380              50 :         : awaitables_(awaitables)
     381              50 :         , state_(state)
     382                 :     {
     383              50 :     }
     384                 : 
     385              50 :     bool await_ready() const noexcept
     386                 :     {
     387              50 :         return sizeof...(Awaitables) == 0;
     388                 :     }
     389                 : 
     390              50 :     std::coroutine_handle<> await_suspend(
     391                 :         std::coroutine_handle<> continuation, io_env const* caller_env)
     392                 :     {
     393              50 :         state_->core_.continuation_.h = continuation;
     394              50 :         state_->core_.caller_env_ = caller_env;
     395                 : 
     396              50 :         if(caller_env->stop_token.stop_possible())
     397                 :         {
     398               2 :             state_->core_.parent_stop_callback_.emplace(
     399               1 :                 caller_env->stop_token,
     400               1 :                 when_all_core::stop_callback_fn{&state_->core_.stop_source_});
     401                 : 
     402               1 :             if(caller_env->stop_token.stop_requested())
     403 MIS           0 :                 state_->core_.stop_source_.request_stop();
     404                 :         }
     405                 : 
     406 HIT          50 :         auto token = state_->core_.stop_source_.get_token();
     407              46 :         [&]<std::size_t... Is>(std::index_sequence<Is...>) {
     408              50 :             (..., launch_one<Is>(caller_env->executor, token));
     409              50 :         }(std::index_sequence_for<Awaitables...>{});
     410                 : 
     411             100 :         return std::noop_coroutine();
     412              50 :     }
     413                 : 
     414              50 :     void await_resume() const noexcept {}
     415                 : 
     416                 : private:
     417                 :     template<std::size_t I>
     418              97 :     void launch_one(executor_ref caller_ex, std::stop_token token)
     419                 :     {
     420              97 :         auto runner = make_when_all_io_runner<I>(
     421              97 :             std::move(std::get<I>(*awaitables_)), state_);
     422                 : 
     423              97 :         auto h = runner.release();
     424              97 :         h.promise().state_ = state_;
     425              97 :         h.promise().env_ = io_env{caller_ex, token,
     426              97 :             state_->core_.caller_env_->frame_allocator};
     427                 : 
     428              97 :         state_->runner_handles_[I].h = std::coroutine_handle<>{h};
     429              97 :         state_->core_.caller_env_->executor.post(state_->runner_handles_[I]);
     430             194 :     }
     431                 : };
     432                 : 
     433                 : /** Helper to extract a single result from state.
     434                 :     This is a separate function to work around a GCC-11 ICE that occurs
     435                 :     when using nested immediately-invoked lambdas with pack expansion.
     436                 : */
     437                 : template<std::size_t I, typename... Ts>
     438              69 : auto extract_single_result(when_all_state<Ts...>& state)
     439                 : {
     440              69 :     return std::move(std::get<I>(state.results_)).get();
     441                 : }
     442                 : 
     443                 : /** Extract all results from state as a tuple.
     444                 : */
     445                 : template<typename... Ts>
     446              36 : auto extract_results(when_all_state<Ts...>& state)
     447                 : {
     448              55 :     return [&]<std::size_t... Is>(std::index_sequence<Is...>) {
     449              36 :         return std::tuple(extract_single_result<Is>(state)...);
     450              72 :     }(std::index_sequence_for<Ts...>{});
     451                 : }
     452                 : 
     453                 : /** Launches all homogeneous runners concurrently.
     454                 : 
     455                 :     Two-phase approach: create all runners first, then post all.
     456                 :     This avoids lifetime issues if a task completes synchronously.
     457                 : */
     458                 : template<typename Range>
     459                 : class when_all_homogeneous_launcher
     460                 : {
     461                 :     using Awaitable = std::ranges::range_value_t<Range>;
     462                 :     using PayloadT = io_result_payload_t<awaitable_result_t<Awaitable>>;
     463                 : 
     464                 :     Range* range_;
     465                 :     when_all_homogeneous_state<PayloadT>* state_;
     466                 : 
     467                 : public:
     468              14 :     when_all_homogeneous_launcher(
     469                 :         Range* range,
     470                 :         when_all_homogeneous_state<PayloadT>* state)
     471              14 :         : range_(range)
     472              14 :         , state_(state)
     473                 :     {
     474              14 :     }
     475                 : 
     476              14 :     bool await_ready() const noexcept
     477                 :     {
     478              14 :         return std::ranges::empty(*range_);
     479                 :     }
     480                 : 
     481              14 :     std::coroutine_handle<> await_suspend(std::coroutine_handle<> continuation, io_env const* caller_env)
     482                 :     {
     483              14 :         state_->core_.continuation_.h = continuation;
     484              14 :         state_->core_.caller_env_ = caller_env;
     485                 : 
     486              14 :         if(caller_env->stop_token.stop_possible())
     487                 :         {
     488               2 :             state_->core_.parent_stop_callback_.emplace(
     489               1 :                 caller_env->stop_token,
     490               1 :                 when_all_core::stop_callback_fn{&state_->core_.stop_source_});
     491                 : 
     492               1 :             if(caller_env->stop_token.stop_requested())
     493 MIS           0 :                 state_->core_.stop_source_.request_stop();
     494                 :         }
     495                 : 
     496 HIT          14 :         auto token = state_->core_.stop_source_.get_token();
     497                 : 
     498                 :         // Phase 1: Create all runners without dispatching.
     499              14 :         std::size_t index = 0;
     500              46 :         for(auto&& a : *range_)
     501                 :         {
     502              32 :             auto runner = make_when_all_homogeneous_runner(
     503              32 :                 std::move(a), state_, index);
     504                 : 
     505              32 :             auto h = runner.release();
     506              32 :             h.promise().state_ = state_;
     507              32 :             h.promise().index_ = index;
     508              32 :             h.promise().env_ = io_env{caller_env->executor, token, caller_env->frame_allocator};
     509                 : 
     510              32 :             state_->runner_handles_[index].h = std::coroutine_handle<>{h};
     511              32 :             ++index;
     512                 :         }
     513                 : 
     514                 :         // Phase 2: Post all runners. Any may complete synchronously.
     515                 :         // After last post, state_ and this may be destroyed.
     516              14 :         auto* handles = state_->runner_handles_.get();
     517              14 :         std::size_t count = state_->core_.remaining_count_.load(std::memory_order_relaxed);
     518              46 :         for(std::size_t i = 0; i < count; ++i)
     519              32 :             caller_env->executor.post(handles[i]);
     520                 : 
     521              28 :         return std::noop_coroutine();
     522              46 :     }
     523                 : 
     524              14 :     void await_resume() const noexcept
     525                 :     {
     526              14 :     }
     527                 : };
     528                 : 
     529                 : } // namespace detail
     530                 : 
     531                 : /** Execute a range of io_result-returning awaitables concurrently.
     532                 : 
     533                 :     Launches all awaitables simultaneously and waits for all to complete.
     534                 :     On success, extracted payloads are collected in a vector preserving
     535                 :     input order. The first error_code cancels siblings and is propagated
     536                 :     in the outer io_result. Exceptions always beat error codes.
     537                 : 
     538                 :     @li All child awaitables run concurrently on the caller's executor
     539                 :     @li Payloads are returned as a vector in input order
     540                 :     @li First error_code wins and cancels siblings
     541                 :     @li Exception always beats error_code
     542                 :     @li Completes only after all children have finished
     543                 : 
     544                 :     @par Thread Safety
     545                 :     The returned task must be awaited from a single execution context.
     546                 :     Child awaitables execute concurrently but complete through the caller's
     547                 :     executor.
     548                 : 
     549                 :     @param awaitables Range of io_result-returning awaitables to execute
     550                 :         concurrently (must not be empty).
     551                 : 
     552                 :     @return A task yielding io_result<vector<PayloadT>> where PayloadT
     553                 :         is the payload extracted from each child's io_result.
     554                 : 
     555                 :     @throws std::invalid_argument if range is empty (thrown before
     556                 :         coroutine suspends).
     557                 :     @throws Rethrows the first child exception after all children
     558                 :         complete (exception beats error_code).
     559                 : 
     560                 :     @par Example
     561                 :     @code
     562                 :     task<void> example()
     563                 :     {
     564                 :         std::vector<io_task<size_t>> reads;
     565                 :         for (auto& buf : buffers)
     566                 :             reads.push_back(stream.read_some(buf));
     567                 : 
     568                 :         auto [ec, counts] = co_await when_all(std::move(reads));
     569                 :         if (ec) { // handle error
     570                 :         }
     571                 :     }
     572                 :     @endcode
     573                 : 
     574                 :     @see IoAwaitableRange, when_all
     575                 : */
     576                 : template<IoAwaitableRange R>
     577                 :     requires detail::is_io_result_v<
     578                 :         awaitable_result_t<std::ranges::range_value_t<R>>>
     579                 :     && (!std::is_same_v<
     580                 :             detail::io_result_payload_t<
     581                 :                 awaitable_result_t<std::ranges::range_value_t<R>>>,
     582                 :             std::tuple<>>)
     583              12 : [[nodiscard]] auto when_all(R&& awaitables)
     584                 :     -> task<io_result<std::vector<
     585                 :         detail::io_result_payload_t<
     586                 :             awaitable_result_t<std::ranges::range_value_t<R>>>>>>
     587                 : {
     588                 :     using Awaitable = std::ranges::range_value_t<R>;
     589                 :     using PayloadT = detail::io_result_payload_t<
     590                 :         awaitable_result_t<Awaitable>>;
     591                 :     using OwnedRange = std::remove_cvref_t<R>;
     592                 : 
     593                 :     auto count = std::ranges::size(awaitables);
     594                 :     if(count == 0)
     595                 :         throw std::invalid_argument("when_all requires at least one awaitable");
     596                 : 
     597                 :     OwnedRange owned_awaitables = std::forward<R>(awaitables);
     598                 : 
     599                 :     detail::when_all_homogeneous_state<PayloadT> state(count);
     600                 : 
     601                 :     co_await detail::when_all_homogeneous_launcher<OwnedRange>(
     602                 :         &owned_awaitables, &state);
     603                 : 
     604                 :     if(state.core_.first_exception_)
     605                 :         std::rethrow_exception(state.core_.first_exception_);
     606                 : 
     607                 :     if(state.has_error_.load(std::memory_order_relaxed))
     608                 :         co_return io_result<std::vector<PayloadT>>{state.first_error_, {}};
     609                 : 
     610                 :     std::vector<PayloadT> results;
     611                 :     results.reserve(count);
     612                 :     for(auto& opt : state.results_)
     613                 :         results.push_back(std::move(*opt));
     614                 : 
     615                 :     co_return io_result<std::vector<PayloadT>>{{}, std::move(results)};
     616              24 : }
     617                 : 
     618                 : /** Execute a range of void io_result-returning awaitables concurrently.
     619                 : 
     620                 :     Launches all awaitables simultaneously and waits for all to complete.
     621                 :     Since all awaitables return io_result<>, no payload values are
     622                 :     collected. The first error_code cancels siblings and is propagated.
     623                 :     Exceptions always beat error codes.
     624                 : 
     625                 :     @param awaitables Range of io_result<>-returning awaitables to
     626                 :         execute concurrently (must not be empty).
     627                 : 
     628                 :     @return A task yielding io_result<> whose ec is the first child
     629                 :         error, or default-constructed on success.
     630                 : 
     631                 :     @throws std::invalid_argument if range is empty.
     632                 :     @throws Rethrows the first child exception after all children
     633                 :         complete (exception beats error_code).
     634                 : 
     635                 :     @par Example
     636                 :     @code
     637                 :     task<void> example()
     638                 :     {
     639                 :         std::vector<io_task<>> jobs;
     640                 :         for (int i = 0; i < n; ++i)
     641                 :             jobs.push_back(process(i));
     642                 : 
     643                 :         auto [ec] = co_await when_all(std::move(jobs));
     644                 :     }
     645                 :     @endcode
     646                 : 
     647                 :     @see IoAwaitableRange, when_all
     648                 : */
     649                 : template<IoAwaitableRange R>
     650                 :     requires detail::is_io_result_v<
     651                 :         awaitable_result_t<std::ranges::range_value_t<R>>>
     652                 :     && std::is_same_v<
     653                 :             detail::io_result_payload_t<
     654                 :                 awaitable_result_t<std::ranges::range_value_t<R>>>,
     655                 :             std::tuple<>>
     656               4 : [[nodiscard]] auto when_all(R&& awaitables) -> task<io_result<>>
     657                 : {
     658                 :     using OwnedRange = std::remove_cvref_t<R>;
     659                 : 
     660                 :     auto count = std::ranges::size(awaitables);
     661                 :     if(count == 0)
     662                 :         throw std::invalid_argument("when_all requires at least one awaitable");
     663                 : 
     664                 :     OwnedRange owned_awaitables = std::forward<R>(awaitables);
     665                 : 
     666                 :     detail::when_all_homogeneous_state<std::tuple<>> state(count);
     667                 : 
     668                 :     co_await detail::when_all_homogeneous_launcher<OwnedRange>(
     669                 :         &owned_awaitables, &state);
     670                 : 
     671                 :     if(state.core_.first_exception_)
     672                 :         std::rethrow_exception(state.core_.first_exception_);
     673                 : 
     674                 :     if(state.has_error_.load(std::memory_order_relaxed))
     675                 :         co_return io_result<>{state.first_error_};
     676                 : 
     677                 :     co_return io_result<>{};
     678               8 : }
     679                 : 
     680                 : /** Execute io_result-returning awaitables concurrently, inspecting error codes.
     681                 : 
     682                 :     Overload selected when all children return io_result<Ts...>.
     683                 :     The error_code is lifted out of each child into a single outer
     684                 :     io_result. On success all values are returned; on failure the
     685                 :     first error_code wins.
     686                 : 
     687                 :     @par Exception Safety
     688                 :     Exception always beats error_code. If any child throws, the
     689                 :     exception is rethrown regardless of error_code results.
     690                 : 
     691                 :     @param awaitables One or more awaitables each returning
     692                 :         io_result<Ts...>.
     693                 : 
     694                 :     @return A task yielding io_result<R1, R2, ..., Rn> where each Ri
     695                 :         follows the payload flattening rules.
     696                 : */
     697                 : template<IoAwaitable... As>
     698                 :     requires (sizeof...(As) > 0)
     699                 :           && detail::all_io_result_awaitables<As...>
     700              50 : [[nodiscard]] auto when_all(As... awaitables)
     701                 :     -> task<io_result<
     702                 :         detail::io_result_payload_t<awaitable_result_t<As>>...>>
     703                 : {
     704                 :     using result_type = io_result<
     705                 :         detail::io_result_payload_t<awaitable_result_t<As>>...>;
     706                 : 
     707                 :     detail::when_all_state<awaitable_result_t<As>...> state;
     708                 :     std::tuple<As...> awaitable_tuple(std::move(awaitables)...);
     709                 : 
     710                 :     co_await detail::when_all_io_launcher<As...>(&awaitable_tuple, &state);
     711                 : 
     712                 :     // Exception always wins over error_code
     713                 :     if(state.core_.first_exception_)
     714                 :         std::rethrow_exception(state.core_.first_exception_);
     715                 : 
     716                 :     auto r = detail::build_when_all_io_result<result_type>(
     717                 :         detail::extract_results(state));
     718                 :     if(state.has_error_.load(std::memory_order_relaxed))
     719                 :         r.ec = state.first_error_;
     720                 :     co_return r;
     721             100 : }
     722                 : 
     723                 : } // namespace capy
     724                 : } // namespace boost
     725                 : 
     726                 : #endif
        

Generated by: LCOV version 2.3