LCOV - code coverage report
Current view: top level - capy - when_any.hpp (source / functions) Coverage Total Hit Missed
Test: coverage_remapped.info Lines: 93.9 % 165 155 10
Test Date: 2026-03-31 23:08:34 Functions: 91.2 % 160 146 14

           TLA  Line data    Source code
       1                 : //
       2                 : // Copyright (c) 2026 Michael Vandeberg
       3                 : // Copyright (c) 2026 Steve Gerbino
       4                 : //
       5                 : // Distributed under the Boost Software License, Version 1.0. (See accompanying
       6                 : // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
       7                 : //
       8                 : // Official repository: https://github.com/cppalliance/capy
       9                 : //
      10                 : 
      11                 : #ifndef BOOST_CAPY_WHEN_ANY_HPP
      12                 : #define BOOST_CAPY_WHEN_ANY_HPP
      13                 : 
      14                 : #include <boost/capy/detail/config.hpp>
      15                 : #include <boost/capy/detail/io_result_combinators.hpp>
      16                 : #include <boost/capy/continuation.hpp>
      17                 : #include <boost/capy/concept/executor.hpp>
      18                 : #include <boost/capy/concept/io_awaitable.hpp>
      19                 : #include <coroutine>
      20                 : #include <boost/capy/ex/executor_ref.hpp>
      21                 : #include <boost/capy/ex/frame_alloc_mixin.hpp>
      22                 : #include <boost/capy/ex/frame_allocator.hpp>
      23                 : #include <boost/capy/ex/io_env.hpp>
      24                 : #include <boost/capy/task.hpp>
      25                 : 
      26                 : #include <array>
      27                 : #include <atomic>
      28                 : #include <exception>
      29                 : #include <memory>
      30                 : #include <mutex>
      31                 : #include <optional>
      32                 : #include <ranges>
      33                 : #include <stdexcept>
      34                 : #include <stop_token>
      35                 : #include <tuple>
      36                 : #include <type_traits>
      37                 : #include <utility>
      38                 : #include <variant>
      39                 : #include <vector>
      40                 : 
      41                 : /*
      42                 :    when_any - Race multiple io_result tasks, select first success
      43                 :    =============================================================
      44                 : 
      45                 :    OVERVIEW:
      46                 :    ---------
      47                 :    when_any launches N io_result-returning tasks concurrently. A task
      48                 :    wins by returning !ec; errors and exceptions do not win. Once a
      49                 :    winner is found, stop is requested for siblings and the winner's
      50                 :    payload is returned. If no winner exists (all fail), the most recently
      51                 :    recorded failure is reported: its error_code is returned or its exception is rethrown.
      52                 : 
      53                 :    ARCHITECTURE:
      54                 :    -------------
      55                 :    The design mirrors when_all but with inverted completion semantics:
      56                 : 
      57                 :      when_all:  complete when remaining_count reaches 0 (all done)
      58                 :      when_any:  complete when has_winner becomes true (first success)
      59                 :                 BUT still wait for remaining_count to reach 0 for cleanup
      60                 : 
      61                 :    Key components:
      62                 :      - when_any_core:    Shared state tracking winner and completion
      63                 :      - when_any_io_runner: Wrapper coroutine for each child task
      64                 :      - when_any_io_launcher/when_any_io_homogeneous_launcher:
      65                 :                           Awaitables that start all runners concurrently
      66                 : 
      67                 :    CRITICAL INVARIANTS:
      68                 :    --------------------
      69                 :    1. Only a task returning !ec can become the winner (via atomic CAS)
      70                 :    2. All tasks must complete before parent resumes (cleanup safety)
      71                 :    3. Stop is requested immediately when winner is determined
      72                 :    4. Exceptions and errors do not claim winner status
      73                 : 
      74                 :    POSITIONAL VARIANT:
      75                 :    -------------------
      76                 :    The variadic overload returns std::variant<error_code, R1, R2, ..., Rn>.
      77                 :    Index 0 is error_code (failure/no-winner). Index 1..N identifies the
      78                 :    winning child and carries its payload.
      79                 : 
      80                 :    RANGE OVERLOAD:
      81                 :    ---------------
      82                 :    The range overload returns variant<error_code, pair<size_t, T>> for
      83                 :    non-void children or variant<error_code, size_t> for void children.
      84                 : 
      85                 :    MEMORY MODEL:
      86                 :    -------------
      87                 :    Synchronization chain from winner's write to parent's read:
      88                 : 
      89                 :    1. Winner thread writes result_ (non-atomic)
      90                 :    2. Winner's runner reaches final_suspend -> fetch_sub(acq_rel) on remaining_count_
      91                 :    3. Last runner to finish (may be winner or non-winner) reaches final_suspend
      92                 :       -> fetch_sub(acq_rel) on remaining_count_, observing count becomes 0
      93                 :    4. Last runner returns caller_env_->executor.dispatch(continuation_) via symmetric transfer
      94                 :    5. Parent coroutine resumes and reads result_
      95                 : 
      96                 :    Synchronization analysis:
      97                 :    - All fetch_sub operations on remaining_count_ form a release sequence
      98                 :    - Winner's fetch_sub releases; subsequent fetch_sub operations participate
      99                 :      in the modification order of remaining_count_
     100                 :    - Last task's fetch_sub(acq_rel) synchronizes-with prior releases in the
     101                 :      modification order, establishing happens-before from winner's writes
     102                 :    - Executor dispatch() is expected to provide queue-based synchronization
     103                 :      (release-on-post, acquire-on-execute) completing the chain to parent
     104                 :    - Even inline executors work (same thread = sequenced-before)
     105                 : 
     106                 :    EXCEPTION SEMANTICS:
     107                 :    --------------------
     108                 :    Exceptions do NOT claim winner status. If a child throws, the exception
     109                 :    is recorded but the combinator keeps waiting for a success. Only when
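     110                 :    all children complete without a winner is a failure reported. Recording is
     111                 :    last-writer-wins (record_error() clears the stored exception and record_exception() clears the stored error_code), so an exception is rethrown only if it was the last failure recorded.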
     112                 : */
     113                 : 
     114                 : namespace boost {
     115                 : namespace capy {
     116                 : 
     117                 : namespace detail {
     118                 : 
     119                 : /** Core shared state for when_any operations.
     120                 : 
     121                 :     Contains all members and methods common to both heterogeneous (variadic)
     122                 :     and homogeneous (range) when_any implementations. State classes embed
     123                 :     this via composition to avoid CRTP destructor ordering issues.
     124                 : 
     125                 :     @par Thread Safety
     126                 :     has_winner_ (CAS in try_win) and remaining_count_ are atomic; the remaining members are plain fields with no locking of their own.
     127                 : */
     128                 : struct when_any_core
     129                 : {
     130                 :     std::atomic<std::size_t> remaining_count_; // runners still outstanding; initialised to the child count by the constructor
     131                 :     std::size_t winner_index_{0}; // index passed to the one try_win() call that succeeded
     132                 :     std::exception_ptr winner_exception_; // written only through set_winner_exception()
     133                 :     std::stop_source stop_source_; // stop is requested on it by try_win() and by stop_callback_fn
     134                 : 
     135                 :     // Callback body used to bridge a parent stop token: forwards the stop request to the stop_source it points at
     136                 :     struct stop_callback_fn
     137                 :     {
     138                 :         std::stop_source* source_;
     139 HIT           2 :         void operator()() const noexcept { source_->request_stop(); }
     140                 :     };
     141                 :     using stop_callback_t = std::stop_callback<stop_callback_fn>;
     142                 :     std::optional<stop_callback_t> parent_stop_callback_; // empty until a launcher registers the bridge
     143                 : 
     144                 :     continuation continuation_; // parent continuation; its handle is stored by the launcher's await_suspend
     145                 :     io_env const* caller_env_ = nullptr; // parent environment; stored by the launcher's await_suspend
     146                 : 
     147                 :     // Placed last to avoid padding (1-byte atomic followed by 8-byte aligned members)
     148                 :     std::atomic<bool> has_winner_{false};
     149                 : 
     150              31 :     explicit when_any_core(std::size_t count) noexcept
     151              31 :         : remaining_count_(count)
     152                 :     {
     153              31 :     }
     154                 : 
     155                 :     /** Atomically claim winner status via CAS on has_winner_. Only the first caller gets true; that caller also stores index in winner_index_ and requests stop on stop_source_. Every later caller gets false and changes nothing. */
     156              52 :     bool try_win(std::size_t index) noexcept
     157                 :     {
     158              52 :         bool expected = false;
     159              52 :         if(has_winner_.compare_exchange_strong(
     160                 :             expected, true, std::memory_order_acq_rel))
     161                 :         {
     162              22 :             winner_index_ = index; // plain write; only the CAS winner reaches this line
     163              22 :             stop_source_.request_stop();
     164              22 :             return true;
     165                 :         }
     166              30 :         return false;
     167                 :     }
     168                 : 
     169                 :     /** Store ep in winner_exception_. @pre try_win() returned true. */
     170 MIS           0 :     void set_winner_exception(std::exception_ptr ep) noexcept
     171                 :     {
     172               0 :         winner_exception_ = ep;
     173               0 :     }
     174                 : 
     175                 :     // No completion-signalling member here: runners decrement remaining_count_ themselves in final_suspend.
     176                 : };
     177                 : 
     178                 : } // namespace detail
     179                 : 
     180                 : namespace detail {
     181                 : 
     182                 : // Shared state for the variadic io_result-aware when_any: only a child returning !ec can win.
     183                 : template<typename... Ts>
     184                 : struct when_any_io_state
     185                 : {
     186                 :     static constexpr std::size_t task_count = sizeof...(Ts);
     187                 :     using variant_type = std::variant<std::error_code, Ts...>; // alternative 0 is error_code; alternatives 1..N are the payload types Ts
     188                 : 
     189                 :     when_any_core core_;
     190                 :     std::optional<variant_type> result_; // empty until a winning runner emplaces its payload
     191                 :     std::array<continuation, task_count> runner_handles_{}; // one continuation slot per child
     192                 : 
     193                 :     // Last failure (error or exception) for the all-fail case.
     194                 :     // Last writer wins — no priority between errors and exceptions:
     195                 :     std::mutex failure_mu_; // guards last_error_ and last_exception_
     196                 :     std::error_code last_error_;
     197                 :     std::exception_ptr last_exception_;
     198                 : 
     199 HIT          16 :     when_any_io_state()
     200              16 :         : core_(task_count)
     201                 :     {
     202              16 :     }
     203                 : 
     204              12 :     void record_error(std::error_code ec) // store ec as the latest failure and drop any stored exception
     205                 :     {
     206              12 :         std::lock_guard lk(failure_mu_);
     207              12 :         last_error_ = ec;
     208              12 :         last_exception_ = nullptr;
     209              12 :     }
     210                 : 
     211               7 :     void record_exception(std::exception_ptr ep) // store ep as the latest failure and drop any stored error_code
     212                 :     {
     213               7 :         std::lock_guard lk(failure_mu_);
     214               7 :         last_exception_ = ep;
     215               7 :         last_error_ = {};
     216               7 :     }
     217                 : };
     218                 : 
     219                 : // Wrapper coroutine type for io_result-aware when_any children (one frame per child).
     220                 : // unhandled_exception records the exception in the shared state but does NOT claim winner status.
     221                 : template<typename StateType>
     222                 : struct BOOST_CAPY_CORO_DESTROY_WHEN_COMPLETE when_any_io_runner
     223                 : {
     224                 :     struct promise_type
     225                 :         : frame_alloc_mixin
     226                 :     {
     227                 :         StateType* state_ = nullptr; // shared state; null until assigned from outside the coroutine
     228                 :         std::size_t index_ = 0; // child position; not read anywhere inside this struct
     229                 :         io_env env_; // environment whose address is handed to every awaited child (see transform_awaiter)
     230                 : 
     231              82 :         when_any_io_runner get_return_object() noexcept
     232                 :         {
     233                 :             return when_any_io_runner(
     234              82 :                 std::coroutine_handle<promise_type>::from_promise(*this));
     235                 :         }
     236                 : 
     237              82 :         std::suspend_always initial_suspend() noexcept { return {}; } // created suspended; something else must resume the handle
     238                 : 
     239              82 :         auto final_suspend() noexcept // destroys this frame, then counts the runner as finished
     240                 :         {
     241                 :             struct awaiter
     242                 :             {
     243                 :                 promise_type* p_;
     244              82 :                 bool await_ready() const noexcept { return false; }
     245              82 :                 auto await_suspend(std::coroutine_handle<> h) noexcept
     246                 :                 {
     247              82 :                     auto& core = p_->state_->core_; // everything below is read out of the shared state BEFORE the frame goes away
     248              82 :                     auto* counter = &core.remaining_count_;
     249              82 :                     auto* caller_env = core.caller_env_;
     250              82 :                     auto& cont = core.continuation_;
     251                 : 
     252              82 :                     h.destroy(); // the promise behind p_ is gone after this; only the locals captured above are used from here on
     253                 : 
     254              82 :                     auto remaining = counter->fetch_sub(1, std::memory_order_acq_rel); // fetch_sub returns the value before the decrement
     255              82 :                     if(remaining == 1) // this was the last outstanding runner: hand the parent continuation to its executor
     256              31 :                         return detail::symmetric_transfer(caller_env->executor.dispatch(cont));
     257              51 :                     return detail::symmetric_transfer(std::noop_coroutine()); // other runners are still outstanding: resume nothing
     258                 :                 }
     259 MIS           0 :                 void await_resume() const noexcept {} // 0 hits in this report; await_suspend destroys the frame, so nothing resumes from here
     260                 :             };
     261 HIT          82 :             return awaiter{this};
     262                 :         }
     263                 : 
     264              71 :         void return_void() noexcept {}
     265                 : 
     266                 :         // Exceptions do NOT win in io_result when_any; the exception is only recorded as a failure
     267              11 :         void unhandled_exception() noexcept
     268                 :         {
     269              11 :             state_->record_exception(std::current_exception());
     270              11 :         }
     271                 : 
     272                 :         template<class Awaitable>
     273                 :         struct transform_awaiter // wraps an awaitable so its two-argument await_suspend receives &p_->env_
     274                 :         {
     275                 :             std::decay_t<Awaitable> a_;
     276                 :             promise_type* p_;
     277                 : 
     278              82 :             bool await_ready() { return a_.await_ready(); }
     279              82 :             decltype(auto) await_resume() { return a_.await_resume(); }
     280                 : 
     281                 :             template<class Promise>
     282              81 :             auto await_suspend(std::coroutine_handle<Promise> h)
     283                 :             {
     284                 :                 using R = decltype(a_.await_suspend(h, &p_->env_));
     285                 :                 if constexpr (std::is_same_v<R, std::coroutine_handle<>>) // handle-returning form goes through detail::symmetric_transfer
     286              81 :                     return detail::symmetric_transfer(a_.await_suspend(h, &p_->env_));
     287                 :                 else
     288                 :                     return a_.await_suspend(h, &p_->env_); // any other return type is passed through unchanged
     289                 :             }
     290                 :         };
     291                 : 
     292                 :         template<class Awaitable>
     293              82 :         auto await_transform(Awaitable&& a) // only IoAwaitable operands may be co_awaited inside a runner
     294                 :         {
     295                 :             using A = std::decay_t<Awaitable>;
     296                 :             if constexpr (IoAwaitable<A>)
     297                 :             {
     298                 :                 return transform_awaiter<Awaitable>{
     299             163 :                     std::forward<Awaitable>(a), this};
     300                 :             }
     301                 :             else
     302                 :             {
     303                 :                 static_assert(sizeof(A) == 0, "requires IoAwaitable"); // dependent always-false condition: fires only if this branch is instantiated
     304                 :             }
     305              81 :         }
     306                 :     };
     307                 : 
     308                 :     std::coroutine_handle<promise_type> h_;
     309                 : 
     310              82 :     explicit when_any_io_runner(std::coroutine_handle<promise_type> h) noexcept
     311              82 :         : h_(h)
     312                 :     {
     313              82 :     }
     314                 : 
     315                 :     when_any_io_runner(when_any_io_runner&& other) noexcept
     316                 :         : h_(std::exchange(other.h_, nullptr))
     317                 :     {
     318                 :     }
     319                 : 
     320                 :     when_any_io_runner(when_any_io_runner const&) = delete;
     321                 :     when_any_io_runner& operator=(when_any_io_runner const&) = delete;
     322                 :     when_any_io_runner& operator=(when_any_io_runner&&) = delete;
     323                 : 
     324              82 :     auto release() noexcept // hands the handle to the caller and leaves h_ null; this struct declares no destructor
     325                 :     {
     326              82 :         return std::exchange(h_, nullptr);
     327                 :     }
     328                 : };
     329                 : 
     330                 : // Runner coroutine for child I of the variadic overload: awaits inner, and only tries to win when the child returns !ec.
     331                 : template<std::size_t I, IoAwaitable Awaitable, typename StateType>
     332                 : when_any_io_runner<StateType>
     333              30 : make_when_any_io_runner(Awaitable inner, StateType* state)
     334                 : {
     335                 :     auto result = co_await std::move(inner);
     336                 : 
     337                 :     if(!result.ec)
     338                 :     {
     339                 :         // Success: try to claim winner; try_win() returns true for at most one runner
     340                 :         if(state->core_.try_win(I))
     341                 :         {
     342                 :             try
     343                 :             {
     344                 :                 state->result_.emplace(
     345                 :                     std::in_place_index<I + 1>, // result alternative I + 1 (the emplace shifts the child index by one)
     346                 :                     detail::extract_io_payload(std::move(result)));
     347                 :             }
     348                 :             catch(...)
     349                 :             {
     350                 :                 state->core_.set_winner_exception(std::current_exception()); // storing the payload threw: keep the exception on the core
     351                 :             }
     352                 :         }
     353                 :     }
     354                 :     else
     355                 :     {
     356                 :         // Error: record it as the latest failure but don't win
     357                 :         state->record_error(result.ec);
     358                 :     }
     359              60 : }
     360                 : 
     361                 : // Launcher awaitable for the variadic io_result-aware when_any: on suspend it creates one runner per child and posts each to the caller's executor.
     362                 : template<IoAwaitable... Awaitables>
     363                 : class when_any_io_launcher
     364                 : {
     365                 :     using state_type = when_any_io_state<
     366                 :         io_result_payload_t<awaitable_result_t<Awaitables>>...>;
     367                 : 
     368                 :     std::tuple<Awaitables...>* tasks_; // children; each element is moved from by launch_one
     369                 :     state_type* state_; // shared state, held by pointer (not owned by this class)
     370                 : 
     371                 : public:
     372              16 :     when_any_io_launcher(
     373                 :         std::tuple<Awaitables...>* tasks,
     374                 :         state_type* state)
     375              16 :         : tasks_(tasks)
     376              16 :         , state_(state)
     377                 :     {
     378              16 :     }
     379                 : 
     380              16 :     bool await_ready() const noexcept // ready (no suspension) only for an empty pack
     381                 :     {
     382              16 :         return sizeof...(Awaitables) == 0;
     383                 :     }
     384                 : 
     385              16 :     std::coroutine_handle<> await_suspend(
     386                 :         std::coroutine_handle<> continuation, io_env const* caller_env)
     387                 :     {
     388              16 :         state_->core_.continuation_.h = continuation; // the last runner to finish dispatches this continuation
     389              16 :         state_->core_.caller_env_ = caller_env;
     390                 : 
     391              16 :         if(caller_env->stop_token.stop_possible()) // bridge the parent's stop token onto our own stop_source
     392                 :         {
     393               2 :             state_->core_.parent_stop_callback_.emplace(
     394               1 :                 caller_env->stop_token,
     395               1 :                 when_any_core::stop_callback_fn{&state_->core_.stop_source_});
     396                 : 
     397               1 :             if(caller_env->stop_token.stop_requested()) // NOTE(review): std::stop_callback's constructor already runs the callback when stop was requested earlier, so this re-check looks redundant — confirm
     398 MIS           0 :                 state_->core_.stop_source_.request_stop();
     399                 :         }
     400                 : 
     401 HIT          16 :         auto token = state_->core_.stop_source_.get_token(); // every runner's env gets this token, not the parent's
     402              28 :         [&]<std::size_t... Is>(std::index_sequence<Is...>) {
     403              16 :             (..., launch_one<Is>(caller_env->executor, token)); // comma fold: children are launched in index order
     404              16 :         }(std::index_sequence_for<Awaitables...>{});
     405                 : 
     406              32 :         return std::noop_coroutine(); // resume nothing here; the runners were handed to the executor via post()
     407              16 :     }
     408                 : 
     409              16 :     void await_resume() const noexcept {} // produces no value
     410                 : 
     411                 : private:
     412                 :     template<std::size_t I>
     413              30 :     void launch_one(executor_ref caller_ex, std::stop_token token) // create, configure and post the runner for child I
     414                 :     {
     415              30 :         auto runner = make_when_any_io_runner<I>(
     416              30 :             std::move(std::get<I>(*tasks_)), state_);
     417                 : 
     418              30 :         auto h = runner.release(); // take the raw handle; the runner frame destroys itself in final_suspend
     419              30 :         h.promise().state_ = state_;
     420              30 :         h.promise().index_ = I;
     421              30 :         h.promise().env_ = io_env{caller_ex, token,
     422              30 :             state_->core_.caller_env_->frame_allocator};
     423                 : 
     424              30 :         state_->runner_handles_[I].h = std::coroutine_handle<>{h};
     425              30 :         caller_ex.post(state_->runner_handles_[I]); // the continuation passed to post() lives in the shared state, not on this stack
     426              60 :     }
     427                 : };
     428                 : 
     429                 : /** Shared state for homogeneous io_result-aware when_any (range overload).
     430                 : 
     431                 :     @tparam T The payload type extracted from io_result; result_ stores at most one T, emplaced by the winning runner.
     432                 : */
     433                 : template<typename T>
     434                 : struct when_any_io_homogeneous_state
     435                 : {
     436                 :     when_any_core core_;
     437                 :     std::optional<T> result_; // the winner's payload; the winner's index is kept in core_.winner_index_
     438                 :     std::unique_ptr<continuation[]> runner_handles_; // heap array with one slot per child (count is a runtime value)
     439                 : 
     440                 :     std::mutex failure_mu_; // guards last_error_ and last_exception_
     441                 :     std::error_code last_error_;
     442                 :     std::exception_ptr last_exception_;
     443                 : 
     444              13 :     explicit when_any_io_homogeneous_state(std::size_t count)
     445              13 :         : core_(count)
     446              13 :         , runner_handles_(std::make_unique<continuation[]>(count))
     447                 :     {
     448              13 :     }
     449                 : 
     450               6 :     void record_error(std::error_code ec) // store ec as the latest failure and drop any stored exception
     451                 :     {
     452               6 :         std::lock_guard lk(failure_mu_);
     453               6 :         last_error_ = ec;
     454               6 :         last_exception_ = nullptr;
     455               6 :     }
     456                 : 
     457               4 :     void record_exception(std::exception_ptr ep) // store ep as the latest failure and drop any stored error_code
     458                 :     {
     459               4 :         std::lock_guard lk(failure_mu_);
     460               4 :         last_exception_ = ep;
     461               4 :         last_error_ = {};
     462               4 :     }
     463                 : };
     464                 : 
     465                 : /** Specialization for void io_result children, selected when the payload type is std::tuple<>: same members as the primary template minus result_ (no payload storage). */
     466                 : template<>
     467                 : struct when_any_io_homogeneous_state<std::tuple<>>
     468                 : {
     469                 :     when_any_core core_;
     470                 :     std::unique_ptr<continuation[]> runner_handles_; // heap array with one slot per child
     471                 : 
     472                 :     std::mutex failure_mu_; // guards last_error_ and last_exception_
     473                 :     std::error_code last_error_;
     474                 :     std::exception_ptr last_exception_;
     475                 : 
     476               2 :     explicit when_any_io_homogeneous_state(std::size_t count)
     477               2 :         : core_(count)
     478               2 :         , runner_handles_(std::make_unique<continuation[]>(count))
     479                 :     {
     480               2 :     }
     481                 : 
     482               1 :     void record_error(std::error_code ec) // store ec as the latest failure and drop any stored exception
     483                 :     {
     484               1 :         std::lock_guard lk(failure_mu_);
     485               1 :         last_error_ = ec;
     486               1 :         last_exception_ = nullptr;
     487               1 :     }
     488                 : 
     489 MIS           0 :     void record_exception(std::exception_ptr ep) // store ep as the latest failure and drop any stored error_code (0 hits in this report)
     490                 :     {
     491               0 :         std::lock_guard lk(failure_mu_);
     492               0 :         last_exception_ = ep;
     493               0 :         last_error_ = {};
     494               0 :     }
     495                 : };
     496                 : 
     497                 : /** Create an io_result-aware runner for homogeneous when_any (range path).
     498                 : 
     499                 :     Awaits inner and only tries to win, via state->core_.try_win(index), when the child returns !ec; a non-zero ec goes to state->record_error().
     500                 : */
     501                 : template<IoAwaitable Awaitable, typename StateType>
     502                 : when_any_io_runner<StateType>
     503 HIT          52 : make_when_any_io_homogeneous_runner(
     504                 :     Awaitable inner, StateType* state, std::size_t index)
     505                 : {
     506                 :     auto result = co_await std::move(inner);
     507                 : 
     508                 :     if(!result.ec)
     509                 :     {
     510                 :         if(state->core_.try_win(index)) // true for at most one runner
     511                 :         {
     512                 :             using PayloadT = io_result_payload_t<
     513                 :                 awaitable_result_t<Awaitable>>;
     514                 :             if constexpr (!std::is_same_v<PayloadT, std::tuple<>>) // empty-tuple payload: nothing is stored, try_win() alone marks the winner
     515                 :             {
     516                 :                 try
     517                 :                 {
     518                 :                     state->result_.emplace(
     519                 :                         extract_io_payload(std::move(result)));
     520                 :                 }
     521                 :                 catch(...)
     522                 :                 {
     523                 :                     state->core_.set_winner_exception( // storing the payload threw: keep the exception on the core
     524                 :                         std::current_exception());
     525                 :                 }
     526                 :             }
     527                 :         }
     528                 :     }
     529                 :     else
     530                 :     {
     531                 :         state->record_error(result.ec); // failure: recorded as the latest failure, never wins
     532                 :     }
     533             104 : }
     534                 : 
     535                 : /** Launches all io_result-aware homogeneous runners concurrently.
                         : 
                         :     Awaiter used by the range overloads of when_any. It keeps
                         :     non-owning pointers to the awaitable range and to the shared
                         :     when_any_io_homogeneous_state, and hands the state pointer to
                         :     every runner it creates.
                         : 
                         :     - await_ready: true when the range is empty, in which case the
                         :       awaiting coroutine is not suspended.
                         :     - await_suspend: stores the continuation and the caller's io_env
                         :       in state->core_. If the caller's stop token can be stopped, a
                         :       stop callback targeting the state's stop source is installed,
                         :       and the stop source is asked to stop right away when the
                         :       caller's token is already stopped. Then:
                         :         1. one runner is created per element (the element is moved
                         :            from); its promise receives the state pointer, its index
                         :            and an io_env made of the caller's executor and frame
                         :            allocator plus the token of the state's stop source; the
                         :            handle is recorded in state->runner_handles_[index];
                         :         2. the recorded handles are posted to the caller's executor.
                         :       Always returns std::noop_coroutine().
                         :     - await_resume: no-op; results are read from the state.
                         : 
                         :     NOTE(review): phase 2 copies the handle array pointer and
                         :     remaining_count_ into locals before the first post. Presumably
                         :     remaining_count_ still equals the number of runners at that
                         :     point and the state should not be consulted once runners may
                         :     finish - confirm against when_any_io_homogeneous_state.
                         : */
     536                 : template<IoAwaitableRange Range>
     537                 : class when_any_io_homogeneous_launcher
     538                 : {
     539                 :     using Awaitable = std::ranges::range_value_t<Range>;
     540                 :     using PayloadT = io_result_payload_t<awaitable_result_t<Awaitable>>;
     541                 : 
     542                 :     Range* range_;
     543                 :     when_any_io_homogeneous_state<PayloadT>* state_;
     544                 : 
     545                 : public:
     546              15 :     when_any_io_homogeneous_launcher(
     547                 :         Range* range,
     548                 :         when_any_io_homogeneous_state<PayloadT>* state)
     549              15 :         : range_(range)
     550              15 :         , state_(state)
     551                 :     {
     552              15 :     }
     553                 : 
     554              15 :     bool await_ready() const noexcept
     555                 :     {
     556              15 :         return std::ranges::empty(*range_);
     557                 :     }
     558                 : 
     559              15 :     std::coroutine_handle<> await_suspend(
     560                 :         std::coroutine_handle<> continuation, io_env const* caller_env)
     561                 :     {
     562              15 :         state_->core_.continuation_.h = continuation;
     563              15 :         state_->core_.caller_env_ = caller_env;
     564                 : 
     565              15 :         if(caller_env->stop_token.stop_possible())
     566                 :         {
     567               4 :             state_->core_.parent_stop_callback_.emplace(
     568               2 :                 caller_env->stop_token,
     569               2 :                 when_any_core::stop_callback_fn{&state_->core_.stop_source_});
     570                 : 
     571               2 :             if(caller_env->stop_token.stop_requested())
     572               1 :                 state_->core_.stop_source_.request_stop();
     573                 :         }
     574                 : 
     575              15 :         auto token = state_->core_.stop_source_.get_token();
     576                 : 
     577                 :         // Phase 1: Create all runners without dispatching.
     578              15 :         std::size_t index = 0;
     579              67 :         for(auto&& a : *range_)
     580                 :         {
     581              52 :             auto runner = make_when_any_io_homogeneous_runner(
     582              52 :                 std::move(a), state_, index);
     583                 : 
     584              52 :             auto h = runner.release();
     585              52 :             h.promise().state_ = state_;
     586              52 :             h.promise().index_ = index;
     587              52 :             h.promise().env_ = io_env{caller_env->executor, token,
     588              52 :                 caller_env->frame_allocator};
     589                 : 
     590              52 :             state_->runner_handles_[index].h = std::coroutine_handle<>{h};
     591              52 :             ++index;
     592                 :         }
     593                 : 
     594                 :         // Phase 2: Post all runners. Any may complete synchronously.
     595              15 :         auto* handles = state_->runner_handles_.get();
     596              15 :         std::size_t count = state_->core_.remaining_count_.load(std::memory_order_relaxed);
     597              67 :         for(std::size_t i = 0; i < count; ++i)
     598              52 :             caller_env->executor.post(handles[i]);
     599                 : 
     600              30 :         return std::noop_coroutine();
     601              67 :     }
     602                 : 
     603              15 :     void await_resume() const noexcept {}
     604                 : };
     605                 : 
     606                 : } // namespace detail
     607                 : 
     608                 : /** Race a range of io_result-returning awaitables (non-void payloads).
     609                 : 
     610                 :     Only a child returning !ec can win. Errors and exceptions do not
     611                 :     claim winner status. If all children fail, the last failure
     612                 :     is reported — either the last error_code at variant index 0,
     613                 :     or the last exception rethrown.
     614                 : 
                         :     The range is taken over (moved from an rvalue, copied from an
                         :     lvalue) into a coroutine-local object before the children are
                         :     launched.
                         : 
                         :     NOTE(review): `awaitables` is a forwarding reference that is
                         :     first read inside the coroutine body. If task starts lazily, a
                         :     temporary range has to stay alive until the task is first
                         :     resumed - confirm against task's initial-suspend behaviour
                         :     (the variadic overload sidesteps this by taking its arguments
                         :     by value).
                         : 
     615                 :     @param awaitables Range of io_result-returning awaitables (must
     616                 :         not be empty).
     617                 : 
     618                 :     @return A task yielding variant<error_code, pair<size_t, PayloadT>>
     619                 :         where index 0 is failure and index 1 carries the winner's
     620                 :         index and payload.
     621                 : 
     622                 :     @throws std::invalid_argument if range is empty.
                         :     @throws Rethrows the winner's exception when a winner was
                         :         claimed but state.core_.winner_exception_ is set.
     623                 :     @throws Rethrows last exception when no winner and the last
     624                 :         failure was an exception.
     625                 : 
     626                 :     @par Example
     627                 :     @code
     628                 :     task<void> example()
     629                 :     {
     630                 :         std::vector<io_task<size_t>> reads;
     631                 :         for (auto& buf : buffers)
     632                 :             reads.push_back(stream.read_some(buf));
     633                 : 
     634                 :         auto result = co_await when_any(std::move(reads));
     635                 :         if (result.index() == 1)
     636                 :         {
     637                 :             auto [idx, n] = std::get<1>(result);
     638                 :         }
     639                 :     }
     640                 :     @endcode
     641                 : 
     642                 :     @see IoAwaitableRange, when_any
     643                 : */
     644                 : template<IoAwaitableRange R>
     645                 :     requires detail::is_io_result_v<
     646                 :         awaitable_result_t<std::ranges::range_value_t<R>>>
     647                 :     && (!std::is_same_v<
     648                 :             detail::io_result_payload_t<
     649                 :                 awaitable_result_t<std::ranges::range_value_t<R>>>,
     650                 :             std::tuple<>>)
     651              14 : [[nodiscard]] auto when_any(R&& awaitables)
     652                 :     -> task<std::variant<std::error_code,
     653                 :         std::pair<std::size_t,
     654                 :             detail::io_result_payload_t<
     655                 :                 awaitable_result_t<std::ranges::range_value_t<R>>>>>>
     656                 : {
     657                 :     using Awaitable = std::ranges::range_value_t<R>;
     658                 :     using PayloadT = detail::io_result_payload_t<
     659                 :         awaitable_result_t<Awaitable>>;
     660                 :     using result_type = std::variant<std::error_code,
     661                 :         std::pair<std::size_t, PayloadT>>;
     662                 :     using OwnedRange = std::remove_cvref_t<R>;
     663                 : 
     664                 :     auto count = std::ranges::size(awaitables);
     665                 :     if(count == 0)
     666                 :         throw std::invalid_argument("when_any requires at least one awaitable");
     667                 : 
     668                 :     OwnedRange owned_awaitables = std::forward<R>(awaitables);
     669                 : 
     670                 :     detail::when_any_io_homogeneous_state<PayloadT> state(count);
     671                 : 
     672                 :     co_await detail::when_any_io_homogeneous_launcher<OwnedRange>(
     673                 :         &owned_awaitables, &state);
     674                 : 
     675                 :     // Winner found
     676                 :     if(state.core_.has_winner_.load(std::memory_order_acquire))
     677                 :     {
     678                 :         if(state.core_.winner_exception_)
     679                 :             std::rethrow_exception(state.core_.winner_exception_);
     680                 :         co_return result_type{std::in_place_index<1>,
     681                 :             std::pair{state.core_.winner_index_, std::move(*state.result_)}};
     682                 :     }
     683                 : 
     684                 :     // No winner — report last failure
     685                 :     if(state.last_exception_)
     686                 :         std::rethrow_exception(state.last_exception_);
     687                 :     co_return result_type{std::in_place_index<0>, state.last_error_};
     688              28 : }
     689                 : 
     690                 : /** Race a range of void io_result-returning awaitables.
     691                 : 
     692                 :     Only a child returning !ec can win. Returns the winner's index
     693                 :     at variant index 1, or error_code at index 0 on all-fail.
     694                 : 
                         :     The range is taken over (moved from an rvalue, copied from an
                         :     lvalue) into a coroutine-local object before the children are
                         :     launched.
                         : 
                         :     NOTE(review): `awaitables` is a forwarding reference that is
                         :     first read inside the coroutine body. If task starts lazily, a
                         :     temporary range has to stay alive until the task is first
                         :     resumed - confirm against task's initial-suspend behaviour.
                         : 
     695                 :     @param awaitables Range of io_result<>-returning awaitables (must
     696                 :         not be empty).
     697                 : 
     698                 :     @return A task yielding variant<error_code, size_t> where index 0
     699                 :         is failure and index 1 carries the winner's index.
     700                 : 
     701                 :     @throws std::invalid_argument if range is empty.
                         :     @throws Rethrows the winner's exception when a winner was
                         :         claimed but state.core_.winner_exception_ is set.
     702                 :     @throws Rethrows the last recorded exception when there is no
     703                 :         winner and one is stored (state.last_exception_).
     704                 : 
     705                 :     @par Example
     706                 :     @code
     707                 :     task<void> example()
     708                 :     {
     709                 :         std::vector<io_task<>> jobs;
     710                 :         jobs.push_back(background_work_a());
     711                 :         jobs.push_back(background_work_b());
     712                 : 
     713                 :         auto result = co_await when_any(std::move(jobs));
     714                 :         if (result.index() == 1)
     715                 :         {
     716                 :             auto winner = std::get<1>(result);
     717                 :         }
     718                 :     }
     719                 :     @endcode
     720                 : 
     721                 :     @see IoAwaitableRange, when_any
     722                 : */
     723                 : template<IoAwaitableRange R>
     724                 :     requires detail::is_io_result_v<
     725                 :         awaitable_result_t<std::ranges::range_value_t<R>>>
     726                 :     && std::is_same_v<
     727                 :             detail::io_result_payload_t<
     728                 :                 awaitable_result_t<std::ranges::range_value_t<R>>>,
     729                 :             std::tuple<>>
     730               2 : [[nodiscard]] auto when_any(R&& awaitables)
     731                 :     -> task<std::variant<std::error_code, std::size_t>>
     732                 : {
     733                 :     using OwnedRange = std::remove_cvref_t<R>;
     734                 :     using result_type = std::variant<std::error_code, std::size_t>;
     735                 : 
     736                 :     auto count = std::ranges::size(awaitables);
     737                 :     if(count == 0)
     738                 :         throw std::invalid_argument("when_any requires at least one awaitable");
     739                 : 
     740                 :     OwnedRange owned_awaitables = std::forward<R>(awaitables);
     741                 : 
     742                 :     detail::when_any_io_homogeneous_state<std::tuple<>> state(count);
     743                 : 
     744                 :     co_await detail::when_any_io_homogeneous_launcher<OwnedRange>(
     745                 :         &owned_awaitables, &state);
     746                 : 
     747                 :     // Winner found
     748                 :     if(state.core_.has_winner_.load(std::memory_order_acquire))
     749                 :     {
     750                 :         if(state.core_.winner_exception_)
     751                 :             std::rethrow_exception(state.core_.winner_exception_);
     752                 :         co_return result_type{std::in_place_index<1>,
     753                 :             state.core_.winner_index_};
     754                 :     }
     755                 : 
     756                 :     // No winner — report last failure
     757                 :     if(state.last_exception_)
     758                 :         std::rethrow_exception(state.last_exception_);
     759                 :     co_return result_type{std::in_place_index<0>, state.last_error_};
     760               4 : }
     761                 : 
     762                 : /** Race io_result-returning awaitables, selecting the first success.
     763                 : 
     764                 :     Overload selected when all children return io_result<Ts...>.
     765                 :     Only a child returning !ec can win. Errors and exceptions do
     766                 :     not claim winner status.
     767                 : 
                         :     The awaitables are taken by value and moved into a tuple owned
                         :     by the coroutine before they are launched.
                         : 
                         :     After the launch completes the outcome is picked in this order:
                         :     a stored result is returned; otherwise a recorded winner
                         :     exception is rethrown; otherwise a recorded last exception is
                         :     rethrown; otherwise the last error_code is returned at index 0.
                         : 
                         :     @param as One or more io_result-returning awaitables.
                         : 
     768                 :     @return A task yielding variant<error_code, R1, ..., Rn> where
     769                 :         index 0 is the failure/no-winner case and index i+1
     770                 :         identifies the winning child.
                         :         Ri is the io_result payload type of the i-th child.
                         : 
                         :     @throws Rethrows the winner's exception when no result was
                         :         stored but state.core_.winner_exception_ is set.
                         :     @throws Rethrows the last recorded exception
                         :         (state.last_exception_) when there is neither a result
                         :         nor a winner exception.
     771                 : */
     772                 : template<IoAwaitable... As>
     773                 :     requires (sizeof...(As) > 0)
     774                 :           && detail::all_io_result_awaitables<As...>
     775              16 : [[nodiscard]] auto when_any(As... as)
     776                 :     -> task<std::variant<
     777                 :         std::error_code,
     778                 :         detail::io_result_payload_t<awaitable_result_t<As>>...>>
     779                 : {
     780                 :     using result_type = std::variant<
     781                 :         std::error_code,
     782                 :         detail::io_result_payload_t<awaitable_result_t<As>>...>;
     783                 : 
     784                 :     detail::when_any_io_state<
     785                 :         detail::io_result_payload_t<awaitable_result_t<As>>...> state;
     786                 :     std::tuple<As...> awaitable_tuple(std::move(as)...);
     787                 : 
     788                 :     co_await detail::when_any_io_launcher<As...>(
     789                 :         &awaitable_tuple, &state);
     790                 : 
     791                 :     // Winner found: return their result
     792                 :     if(state.result_.has_value())
     793                 :         co_return std::move(*state.result_);
     794                 : 
     795                 :     // Winner claimed but payload construction failed
     796                 :     if(state.core_.winner_exception_)
     797                 :         std::rethrow_exception(state.core_.winner_exception_);
     798                 : 
     799                 :     // No winner — report last failure
     800                 :     if(state.last_exception_)
     801                 :         std::rethrow_exception(state.last_exception_);
     802                 :     co_return result_type{std::in_place_index<0>, state.last_error_};
     803              32 : }
     804                 : 
     805                 : } // namespace capy
     806                 : } // namespace boost
     807                 : 
     808                 : #endif
        

Generated by: LCOV version 2.3