1  
//
1  
//
2  
// Copyright (c) 2026 Steve Gerbino
2  
// Copyright (c) 2026 Steve Gerbino
3  
//
3  
//
4  
// Distributed under the Boost Software License, Version 1.0. (See accompanying
4  
// Distributed under the Boost Software License, Version 1.0. (See accompanying
5  
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
5  
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6  
//
6  
//
7  
// Official repository: https://github.com/cppalliance/capy
7  
// Official repository: https://github.com/cppalliance/capy
8  
//
8  
//
9  

9  

10  
#ifndef BOOST_CAPY_WHEN_ALL_HPP
10  
#ifndef BOOST_CAPY_WHEN_ALL_HPP
11  
#define BOOST_CAPY_WHEN_ALL_HPP
11  
#define BOOST_CAPY_WHEN_ALL_HPP
12  

12  

13  
#include <boost/capy/detail/config.hpp>
13  
#include <boost/capy/detail/config.hpp>
14  
#include <boost/capy/detail/io_result_combinators.hpp>
14  
#include <boost/capy/detail/io_result_combinators.hpp>
15  
#include <boost/capy/continuation.hpp>
15  
#include <boost/capy/continuation.hpp>
16  
#include <boost/capy/concept/executor.hpp>
16  
#include <boost/capy/concept/executor.hpp>
17  
#include <boost/capy/concept/io_awaitable.hpp>
17  
#include <boost/capy/concept/io_awaitable.hpp>
18  
#include <coroutine>
18  
#include <coroutine>
19  
#include <boost/capy/ex/frame_alloc_mixin.hpp>
19  
#include <boost/capy/ex/frame_alloc_mixin.hpp>
20  
#include <boost/capy/ex/io_env.hpp>
20  
#include <boost/capy/ex/io_env.hpp>
21  
#include <boost/capy/ex/frame_allocator.hpp>
21  
#include <boost/capy/ex/frame_allocator.hpp>
22  
#include <boost/capy/task.hpp>
22  
#include <boost/capy/task.hpp>
23  

23  

24  
#include <array>
24  
#include <array>
25  
#include <atomic>
25  
#include <atomic>
26  
#include <exception>
26  
#include <exception>
27  
#include <memory>
27  
#include <memory>
28  
#include <optional>
28  
#include <optional>
29  
#include <ranges>
29  
#include <ranges>
30  
#include <stdexcept>
30  
#include <stdexcept>
31  
#include <stop_token>
31  
#include <stop_token>
32  
#include <tuple>
32  
#include <tuple>
33  
#include <type_traits>
33  
#include <type_traits>
34  
#include <utility>
34  
#include <utility>
35  
#include <vector>
35  
#include <vector>
36  

36  

37  
namespace boost {
37  
namespace boost {
38  
namespace capy {
38  
namespace capy {
39  

39  

40  
namespace detail {
40  
namespace detail {
41  

41  

42  
/** Holds the result of a single task within when_all.
42  
/** Holds the result of a single task within when_all.
43  
*/
43  
*/
44  
template<typename T>
44  
template<typename T>
45  
struct result_holder
45  
struct result_holder
46  
{
46  
{
47  
    std::optional<T> value_;
47  
    std::optional<T> value_;
48  

48  

49  
    void set(T v)
49  
    void set(T v)
50  
    {
50  
    {
51  
        value_ = std::move(v);
51  
        value_ = std::move(v);
52  
    }
52  
    }
53  

53  

54  
    T get() &&
54  
    T get() &&
55  
    {
55  
    {
56  
        return std::move(*value_);
56  
        return std::move(*value_);
57  
    }
57  
    }
58  
};
58  
};
59  

59  

60  
/** Core shared state for when_all operations.
60  
/** Core shared state for when_all operations.
61  

61  

62  
    Contains all members and methods common to both heterogeneous (variadic)
62  
    Contains all members and methods common to both heterogeneous (variadic)
63  
    and homogeneous (range) when_all implementations. State classes embed
63  
    and homogeneous (range) when_all implementations. State classes embed
64  
    this via composition to avoid CRTP destructor ordering issues.
64  
    this via composition to avoid CRTP destructor ordering issues.
65  

65  

66  
    @par Thread Safety
66  
    @par Thread Safety
67  
    Atomic operations protect exception capture and completion count.
67  
    Atomic operations protect exception capture and completion count.
68  
*/
68  
*/
69  
struct when_all_core
69  
struct when_all_core
70  
{
70  
{
71  
    std::atomic<std::size_t> remaining_count_;
71  
    std::atomic<std::size_t> remaining_count_;
72  

72  

73  
    // Exception storage - first error wins, others discarded
73  
    // Exception storage - first error wins, others discarded
74  
    std::atomic<bool> has_exception_{false};
74  
    std::atomic<bool> has_exception_{false};
75  
    std::exception_ptr first_exception_;
75  
    std::exception_ptr first_exception_;
76  

76  

77  
    std::stop_source stop_source_;
77  
    std::stop_source stop_source_;
78  

78  

79  
    // Bridges parent's stop token to our stop_source
79  
    // Bridges parent's stop token to our stop_source
80  
    struct stop_callback_fn
80  
    struct stop_callback_fn
81  
    {
81  
    {
82  
        std::stop_source* source_;
82  
        std::stop_source* source_;
83  
        void operator()() const { source_->request_stop(); }
83  
        void operator()() const { source_->request_stop(); }
84  
    };
84  
    };
85  
    using stop_callback_t = std::stop_callback<stop_callback_fn>;
85  
    using stop_callback_t = std::stop_callback<stop_callback_fn>;
86  
    std::optional<stop_callback_t> parent_stop_callback_;
86  
    std::optional<stop_callback_t> parent_stop_callback_;
87  

87  

88  
    continuation continuation_;
88  
    continuation continuation_;
89  
    io_env const* caller_env_ = nullptr;
89  
    io_env const* caller_env_ = nullptr;
90  

90  

91  
    explicit when_all_core(std::size_t count) noexcept
91  
    explicit when_all_core(std::size_t count) noexcept
92  
        : remaining_count_(count)
92  
        : remaining_count_(count)
93  
    {
93  
    {
94  
    }
94  
    }
95  

95  

96  
    /** Capture an exception (first one wins). */
96  
    /** Capture an exception (first one wins). */
97  
    void capture_exception(std::exception_ptr ep)
97  
    void capture_exception(std::exception_ptr ep)
98  
    {
98  
    {
99  
        bool expected = false;
99  
        bool expected = false;
100  
        if(has_exception_.compare_exchange_strong(
100  
        if(has_exception_.compare_exchange_strong(
101  
            expected, true, std::memory_order_relaxed))
101  
            expected, true, std::memory_order_relaxed))
102  
            first_exception_ = ep;
102  
            first_exception_ = ep;
103  
    }
103  
    }
104  
};
104  
};
105  

105  

106  
/** Shared state for heterogeneous when_all (variadic overload).
106  
/** Shared state for heterogeneous when_all (variadic overload).
107  

107  

108  
    @tparam Ts The result types of the tasks.
108  
    @tparam Ts The result types of the tasks.
109  
*/
109  
*/
110  
template<typename... Ts>
110  
template<typename... Ts>
111  
struct when_all_state
111  
struct when_all_state
112  
{
112  
{
113  
    static constexpr std::size_t task_count = sizeof...(Ts);
113  
    static constexpr std::size_t task_count = sizeof...(Ts);
114  

114  

115  
    when_all_core core_;
115  
    when_all_core core_;
116  
    std::tuple<result_holder<Ts>...> results_;
116  
    std::tuple<result_holder<Ts>...> results_;
117  
    std::array<continuation, task_count> runner_handles_{};
117  
    std::array<continuation, task_count> runner_handles_{};
118  

118  

119  
    std::atomic<bool> has_error_{false};
119  
    std::atomic<bool> has_error_{false};
120  
    std::error_code first_error_;
120  
    std::error_code first_error_;
121  

121  

122  
    when_all_state()
122  
    when_all_state()
123  
        : core_(task_count)
123  
        : core_(task_count)
124  
    {
124  
    {
125  
    }
125  
    }
126  

126  

127  
    /** Record the first error (subsequent errors are discarded). */
127  
    /** Record the first error (subsequent errors are discarded). */
128  
    void record_error(std::error_code ec)
128  
    void record_error(std::error_code ec)
129  
    {
129  
    {
130  
        bool expected = false;
130  
        bool expected = false;
131  
        if(has_error_.compare_exchange_strong(
131  
        if(has_error_.compare_exchange_strong(
132  
            expected, true, std::memory_order_relaxed))
132  
            expected, true, std::memory_order_relaxed))
133  
            first_error_ = ec;
133  
            first_error_ = ec;
134  
    }
134  
    }
135  
};
135  
};
136  

136  

137  
/** Shared state for homogeneous when_all (range overload).
137  
/** Shared state for homogeneous when_all (range overload).
138  

138  

139  
    Stores extracted io_result payloads in a vector indexed by task
139  
    Stores extracted io_result payloads in a vector indexed by task
140  
    position. Tracks the first error_code for error propagation.
140  
    position. Tracks the first error_code for error propagation.
141  

141  

142  
    @tparam T The payload type extracted from io_result.
142  
    @tparam T The payload type extracted from io_result.
143  
*/
143  
*/
144  
template<typename T>
144  
template<typename T>
145  
struct when_all_homogeneous_state
145  
struct when_all_homogeneous_state
146  
{
146  
{
147  
    when_all_core core_;
147  
    when_all_core core_;
148  
    std::vector<std::optional<T>> results_;
148  
    std::vector<std::optional<T>> results_;
149  
    std::unique_ptr<continuation[]> runner_handles_;
149  
    std::unique_ptr<continuation[]> runner_handles_;
150  

150  

151  
    std::atomic<bool> has_error_{false};
151  
    std::atomic<bool> has_error_{false};
152  
    std::error_code first_error_;
152  
    std::error_code first_error_;
153  

153  

154  
    explicit when_all_homogeneous_state(std::size_t count)
154  
    explicit when_all_homogeneous_state(std::size_t count)
155  
        : core_(count)
155  
        : core_(count)
156  
        , results_(count)
156  
        , results_(count)
157  
        , runner_handles_(std::make_unique<continuation[]>(count))
157  
        , runner_handles_(std::make_unique<continuation[]>(count))
158  
    {
158  
    {
159  
    }
159  
    }
160  

160  

161  
    void set_result(std::size_t index, T value)
161  
    void set_result(std::size_t index, T value)
162  
    {
162  
    {
163  
        results_[index].emplace(std::move(value));
163  
        results_[index].emplace(std::move(value));
164  
    }
164  
    }
165  

165  

166  
    /** Record the first error (subsequent errors are discarded). */
166  
    /** Record the first error (subsequent errors are discarded). */
167  
    void record_error(std::error_code ec)
167  
    void record_error(std::error_code ec)
168  
    {
168  
    {
169  
        bool expected = false;
169  
        bool expected = false;
170  
        if(has_error_.compare_exchange_strong(
170  
        if(has_error_.compare_exchange_strong(
171  
            expected, true, std::memory_order_relaxed))
171  
            expected, true, std::memory_order_relaxed))
172  
            first_error_ = ec;
172  
            first_error_ = ec;
173  
    }
173  
    }
174  
};
174  
};
175  

175  

176  
/** Specialization for void io_result children (no payload storage). */
176  
/** Specialization for void io_result children (no payload storage). */
177  
template<>
177  
template<>
178  
struct when_all_homogeneous_state<std::tuple<>>
178  
struct when_all_homogeneous_state<std::tuple<>>
179  
{
179  
{
180  
    when_all_core core_;
180  
    when_all_core core_;
181  
    std::unique_ptr<continuation[]> runner_handles_;
181  
    std::unique_ptr<continuation[]> runner_handles_;
182  

182  

183  
    std::atomic<bool> has_error_{false};
183  
    std::atomic<bool> has_error_{false};
184  
    std::error_code first_error_;
184  
    std::error_code first_error_;
185  

185  

186  
    explicit when_all_homogeneous_state(std::size_t count)
186  
    explicit when_all_homogeneous_state(std::size_t count)
187  
        : core_(count)
187  
        : core_(count)
188  
        , runner_handles_(std::make_unique<continuation[]>(count))
188  
        , runner_handles_(std::make_unique<continuation[]>(count))
189  
    {
189  
    {
190  
    }
190  
    }
191  

191  

192  
    /** Record the first error (subsequent errors are discarded). */
192  
    /** Record the first error (subsequent errors are discarded). */
193  
    void record_error(std::error_code ec)
193  
    void record_error(std::error_code ec)
194  
    {
194  
    {
195  
        bool expected = false;
195  
        bool expected = false;
196  
        if(has_error_.compare_exchange_strong(
196  
        if(has_error_.compare_exchange_strong(
197  
            expected, true, std::memory_order_relaxed))
197  
            expected, true, std::memory_order_relaxed))
198  
            first_error_ = ec;
198  
            first_error_ = ec;
199  
    }
199  
    }
200  
};
200  
};
201  

201  

202  
/** Wrapper coroutine that intercepts task completion for when_all.
202  
/** Wrapper coroutine that intercepts task completion for when_all.
203  

203  

204  
    Parameterized on StateType to work with both heterogeneous (variadic)
204  
    Parameterized on StateType to work with both heterogeneous (variadic)
205  
    and homogeneous (range) state types. All state types expose their
205  
    and homogeneous (range) state types. All state types expose their
206  
    shared members through a `core_` member of type when_all_core.
206  
    shared members through a `core_` member of type when_all_core.
207  

207  

208  
    @tparam StateType The state type (when_all_state or when_all_homogeneous_state).
208  
    @tparam StateType The state type (when_all_state or when_all_homogeneous_state).
209  
*/
209  
*/
210  
template<typename StateType>
210  
template<typename StateType>
211  
struct BOOST_CAPY_CORO_DESTROY_WHEN_COMPLETE when_all_runner
211  
struct BOOST_CAPY_CORO_DESTROY_WHEN_COMPLETE when_all_runner
212  
{
212  
{
213  
    struct promise_type
213  
    struct promise_type
214  
        : frame_alloc_mixin
214  
        : frame_alloc_mixin
215  
    {
215  
    {
216  
        StateType* state_ = nullptr;
216  
        StateType* state_ = nullptr;
217  
        std::size_t index_ = 0;
217  
        std::size_t index_ = 0;
218  
        io_env env_;
218  
        io_env env_;
219  

219  

220  
        when_all_runner get_return_object() noexcept
220  
        when_all_runner get_return_object() noexcept
221  
        {
221  
        {
222  
            return when_all_runner(
222  
            return when_all_runner(
223  
                std::coroutine_handle<promise_type>::from_promise(*this));
223  
                std::coroutine_handle<promise_type>::from_promise(*this));
224  
        }
224  
        }
225  

225  

226  
        std::suspend_always initial_suspend() noexcept
226  
        std::suspend_always initial_suspend() noexcept
227  
        {
227  
        {
228  
            return {};
228  
            return {};
229  
        }
229  
        }
230  

230  

231  
        auto final_suspend() noexcept
231  
        auto final_suspend() noexcept
232  
        {
232  
        {
233  
            struct awaiter
233  
            struct awaiter
234  
            {
234  
            {
235  
                promise_type* p_;
235  
                promise_type* p_;
236  
                bool await_ready() const noexcept { return false; }
236  
                bool await_ready() const noexcept { return false; }
237  
                auto await_suspend(std::coroutine_handle<> h) noexcept
237  
                auto await_suspend(std::coroutine_handle<> h) noexcept
238  
                {
238  
                {
239  
                    auto& core = p_->state_->core_;
239  
                    auto& core = p_->state_->core_;
240  
                    auto* counter = &core.remaining_count_;
240  
                    auto* counter = &core.remaining_count_;
241  
                    auto* caller_env = core.caller_env_;
241  
                    auto* caller_env = core.caller_env_;
242  
                    auto& cont = core.continuation_;
242  
                    auto& cont = core.continuation_;
243  

243  

244  
                    h.destroy();
244  
                    h.destroy();
245  

245  

246  
                    auto remaining = counter->fetch_sub(1, std::memory_order_acq_rel);
246  
                    auto remaining = counter->fetch_sub(1, std::memory_order_acq_rel);
247  
                    if(remaining == 1)
247  
                    if(remaining == 1)
248  
                        return detail::symmetric_transfer(caller_env->executor.dispatch(cont));
248  
                        return detail::symmetric_transfer(caller_env->executor.dispatch(cont));
249  
                    return detail::symmetric_transfer(std::noop_coroutine());
249  
                    return detail::symmetric_transfer(std::noop_coroutine());
250  
                }
250  
                }
251  
                void await_resume() const noexcept {}
251  
                void await_resume() const noexcept {}
252  
            };
252  
            };
253  
            return awaiter{this};
253  
            return awaiter{this};
254  
        }
254  
        }
255  

255  

256  
        void return_void() noexcept {}
256  
        void return_void() noexcept {}
257  

257  

258  
        void unhandled_exception() noexcept
258  
        void unhandled_exception() noexcept
259  
        {
259  
        {
260  
            state_->core_.capture_exception(std::current_exception());
260  
            state_->core_.capture_exception(std::current_exception());
261  
            state_->core_.stop_source_.request_stop();
261  
            state_->core_.stop_source_.request_stop();
262  
        }
262  
        }
263  

263  

264  
        template<class Awaitable>
264  
        template<class Awaitable>
265  
        struct transform_awaiter
265  
        struct transform_awaiter
266  
        {
266  
        {
267  
            std::decay_t<Awaitable> a_;
267  
            std::decay_t<Awaitable> a_;
268  
            promise_type* p_;
268  
            promise_type* p_;
269  

269  

270  
            bool await_ready() { return a_.await_ready(); }
270  
            bool await_ready() { return a_.await_ready(); }
271  
            decltype(auto) await_resume() { return a_.await_resume(); }
271  
            decltype(auto) await_resume() { return a_.await_resume(); }
272  

272  

273  
            template<class Promise>
273  
            template<class Promise>
274  
            auto await_suspend(std::coroutine_handle<Promise> h)
274  
            auto await_suspend(std::coroutine_handle<Promise> h)
275  
            {
275  
            {
276  
                using R = decltype(a_.await_suspend(h, &p_->env_));
276  
                using R = decltype(a_.await_suspend(h, &p_->env_));
277  
                if constexpr (std::is_same_v<R, std::coroutine_handle<>>)
277  
                if constexpr (std::is_same_v<R, std::coroutine_handle<>>)
278  
                    return detail::symmetric_transfer(a_.await_suspend(h, &p_->env_));
278  
                    return detail::symmetric_transfer(a_.await_suspend(h, &p_->env_));
279  
                else
279  
                else
280  
                    return a_.await_suspend(h, &p_->env_);
280  
                    return a_.await_suspend(h, &p_->env_);
281  
            }
281  
            }
282  
        };
282  
        };
283  

283  

284  
        template<class Awaitable>
284  
        template<class Awaitable>
285  
        auto await_transform(Awaitable&& a)
285  
        auto await_transform(Awaitable&& a)
286  
        {
286  
        {
287  
            using A = std::decay_t<Awaitable>;
287  
            using A = std::decay_t<Awaitable>;
288  
            if constexpr (IoAwaitable<A>)
288  
            if constexpr (IoAwaitable<A>)
289  
            {
289  
            {
290  
                return transform_awaiter<Awaitable>{
290  
                return transform_awaiter<Awaitable>{
291  
                    std::forward<Awaitable>(a), this};
291  
                    std::forward<Awaitable>(a), this};
292  
            }
292  
            }
293  
            else
293  
            else
294  
            {
294  
            {
295  
                static_assert(sizeof(A) == 0, "requires IoAwaitable");
295  
                static_assert(sizeof(A) == 0, "requires IoAwaitable");
296  
            }
296  
            }
297  
        }
297  
        }
298  
    };
298  
    };
299  

299  

300  
    std::coroutine_handle<promise_type> h_;
300  
    std::coroutine_handle<promise_type> h_;
301  

301  

302  
    explicit when_all_runner(std::coroutine_handle<promise_type> h) noexcept
302  
    explicit when_all_runner(std::coroutine_handle<promise_type> h) noexcept
303  
        : h_(h)
303  
        : h_(h)
304  
    {
304  
    {
305  
    }
305  
    }
306  

306  

307  
    // Enable move for all clang versions - some versions need it
307  
    // Enable move for all clang versions - some versions need it
308  
    when_all_runner(when_all_runner&& other) noexcept
308  
    when_all_runner(when_all_runner&& other) noexcept
309  
        : h_(std::exchange(other.h_, nullptr))
309  
        : h_(std::exchange(other.h_, nullptr))
310  
    {
310  
    {
311  
    }
311  
    }
312  

312  

313  
    when_all_runner(when_all_runner const&) = delete;
313  
    when_all_runner(when_all_runner const&) = delete;
314  
    when_all_runner& operator=(when_all_runner const&) = delete;
314  
    when_all_runner& operator=(when_all_runner const&) = delete;
315  
    when_all_runner& operator=(when_all_runner&&) = delete;
315  
    when_all_runner& operator=(when_all_runner&&) = delete;
316  

316  

317  
    auto release() noexcept
317  
    auto release() noexcept
318  
    {
318  
    {
319  
        return std::exchange(h_, nullptr);
319  
        return std::exchange(h_, nullptr);
320  
    }
320  
    }
321  
};
321  
};
322  

322  

323  
/** Create an io_result-aware runner for a single awaitable (range path).
323  
/** Create an io_result-aware runner for a single awaitable (range path).
324  

324  

325  
    Checks the error code, records errors and requests stop on failure,
325  
    Checks the error code, records errors and requests stop on failure,
326  
    or extracts the payload on success.
326  
    or extracts the payload on success.
327  
*/
327  
*/
328  
template<IoAwaitable Awaitable, typename StateType>
328  
template<IoAwaitable Awaitable, typename StateType>
329  
when_all_runner<StateType>
329  
when_all_runner<StateType>
330  
make_when_all_homogeneous_runner(Awaitable inner, StateType* state, std::size_t index)
330  
make_when_all_homogeneous_runner(Awaitable inner, StateType* state, std::size_t index)
331  
{
331  
{
332  
    auto result = co_await std::move(inner);
332  
    auto result = co_await std::move(inner);
333  

333  

334  
    if(result.ec)
334  
    if(result.ec)
335  
    {
335  
    {
336  
        state->record_error(result.ec);
336  
        state->record_error(result.ec);
337  
        state->core_.stop_source_.request_stop();
337  
        state->core_.stop_source_.request_stop();
338  
    }
338  
    }
339  
    else
339  
    else
340  
    {
340  
    {
341  
        using PayloadT = io_result_payload_t<
341  
        using PayloadT = io_result_payload_t<
342  
            awaitable_result_t<Awaitable>>;
342  
            awaitable_result_t<Awaitable>>;
343  
        if constexpr (!std::is_same_v<PayloadT, std::tuple<>>)
343  
        if constexpr (!std::is_same_v<PayloadT, std::tuple<>>)
344  
        {
344  
        {
345  
            state->set_result(index,
345  
            state->set_result(index,
346  
                extract_io_payload(std::move(result)));
346  
                extract_io_payload(std::move(result)));
347  
        }
347  
        }
348  
    }
348  
    }
349  
}
349  
}
350  

350  

351  
/** Create a runner for io_result children that requests stop on ec. */
351  
/** Create a runner for io_result children that requests stop on ec. */
352  
template<std::size_t Index, IoAwaitable Awaitable, typename... Ts>
352  
template<std::size_t Index, IoAwaitable Awaitable, typename... Ts>
353  
when_all_runner<when_all_state<Ts...>>
353  
when_all_runner<when_all_state<Ts...>>
354  
make_when_all_io_runner(Awaitable inner, when_all_state<Ts...>* state)
354  
make_when_all_io_runner(Awaitable inner, when_all_state<Ts...>* state)
355  
{
355  
{
356  
    auto result = co_await std::move(inner);
356  
    auto result = co_await std::move(inner);
357  
    auto ec = result.ec;
357  
    auto ec = result.ec;
358  
    std::get<Index>(state->results_).set(std::move(result));
358  
    std::get<Index>(state->results_).set(std::move(result));
359  

359  

360  
    if(ec)
360  
    if(ec)
361  
    {
361  
    {
362  
        state->record_error(ec);
362  
        state->record_error(ec);
363  
        state->core_.stop_source_.request_stop();
363  
        state->core_.stop_source_.request_stop();
364  
    }
364  
    }
365  
}
365  
}
366  

366  

367  
/** Launcher that uses io_result-aware runners. */
367  
/** Launcher that uses io_result-aware runners. */
368  
template<IoAwaitable... Awaitables>
368  
template<IoAwaitable... Awaitables>
369  
class when_all_io_launcher
369  
class when_all_io_launcher
370  
{
370  
{
371  
    using state_type = when_all_state<awaitable_result_t<Awaitables>...>;
371  
    using state_type = when_all_state<awaitable_result_t<Awaitables>...>;
372  

372  

373  
    std::tuple<Awaitables...>* awaitables_;
373  
    std::tuple<Awaitables...>* awaitables_;
374  
    state_type* state_;
374  
    state_type* state_;
375  

375  

376  
public:
376  
public:
377  
    when_all_io_launcher(
377  
    when_all_io_launcher(
378  
        std::tuple<Awaitables...>* awaitables,
378  
        std::tuple<Awaitables...>* awaitables,
379  
        state_type* state)
379  
        state_type* state)
380  
        : awaitables_(awaitables)
380  
        : awaitables_(awaitables)
381  
        , state_(state)
381  
        , state_(state)
382  
    {
382  
    {
383  
    }
383  
    }
384  

384  

385  
    bool await_ready() const noexcept
385  
    bool await_ready() const noexcept
386  
    {
386  
    {
387  
        return sizeof...(Awaitables) == 0;
387  
        return sizeof...(Awaitables) == 0;
388  
    }
388  
    }
389  

389  

390  
    std::coroutine_handle<> await_suspend(
390  
    std::coroutine_handle<> await_suspend(
391  
        std::coroutine_handle<> continuation, io_env const* caller_env)
391  
        std::coroutine_handle<> continuation, io_env const* caller_env)
392  
    {
392  
    {
393  
        state_->core_.continuation_.h = continuation;
393  
        state_->core_.continuation_.h = continuation;
394  
        state_->core_.caller_env_ = caller_env;
394  
        state_->core_.caller_env_ = caller_env;
395  

395  

396  
        if(caller_env->stop_token.stop_possible())
396  
        if(caller_env->stop_token.stop_possible())
397  
        {
397  
        {
398  
            state_->core_.parent_stop_callback_.emplace(
398  
            state_->core_.parent_stop_callback_.emplace(
399  
                caller_env->stop_token,
399  
                caller_env->stop_token,
400  
                when_all_core::stop_callback_fn{&state_->core_.stop_source_});
400  
                when_all_core::stop_callback_fn{&state_->core_.stop_source_});
401  

401  

402  
            if(caller_env->stop_token.stop_requested())
402  
            if(caller_env->stop_token.stop_requested())
403  
                state_->core_.stop_source_.request_stop();
403  
                state_->core_.stop_source_.request_stop();
404  
        }
404  
        }
405  

405  

406  
        auto token = state_->core_.stop_source_.get_token();
406  
        auto token = state_->core_.stop_source_.get_token();
407  
        [&]<std::size_t... Is>(std::index_sequence<Is...>) {
407  
        [&]<std::size_t... Is>(std::index_sequence<Is...>) {
408  
            (..., launch_one<Is>(caller_env->executor, token));
408  
            (..., launch_one<Is>(caller_env->executor, token));
409  
        }(std::index_sequence_for<Awaitables...>{});
409  
        }(std::index_sequence_for<Awaitables...>{});
410  

410  

411  
        return std::noop_coroutine();
411  
        return std::noop_coroutine();
412  
    }
412  
    }
413  

413  

414  
    void await_resume() const noexcept {}
414  
    void await_resume() const noexcept {}
415  

415  

416  
private:
416  
private:
417  
    template<std::size_t I>
417  
    template<std::size_t I>
418  
    void launch_one(executor_ref caller_ex, std::stop_token token)
418  
    void launch_one(executor_ref caller_ex, std::stop_token token)
419  
    {
419  
    {
420  
        auto runner = make_when_all_io_runner<I>(
420  
        auto runner = make_when_all_io_runner<I>(
421  
            std::move(std::get<I>(*awaitables_)), state_);
421  
            std::move(std::get<I>(*awaitables_)), state_);
422  

422  

423  
        auto h = runner.release();
423  
        auto h = runner.release();
424  
        h.promise().state_ = state_;
424  
        h.promise().state_ = state_;
425  
        h.promise().env_ = io_env{caller_ex, token,
425  
        h.promise().env_ = io_env{caller_ex, token,
426  
            state_->core_.caller_env_->frame_allocator};
426  
            state_->core_.caller_env_->frame_allocator};
427  

427  

428  
        state_->runner_handles_[I].h = std::coroutine_handle<>{h};
428  
        state_->runner_handles_[I].h = std::coroutine_handle<>{h};
429  
        state_->core_.caller_env_->executor.post(state_->runner_handles_[I]);
429  
        state_->core_.caller_env_->executor.post(state_->runner_handles_[I]);
430  
    }
430  
    }
431  
};
431  
};
432  

432  

433  
/** Helper to extract a single result from state.
433  
/** Helper to extract a single result from state.
434  
    This is a separate function to work around a GCC-11 ICE that occurs
434  
    This is a separate function to work around a GCC-11 ICE that occurs
435  
    when using nested immediately-invoked lambdas with pack expansion.
435  
    when using nested immediately-invoked lambdas with pack expansion.
436  
*/
436  
*/
437  
template<std::size_t I, typename... Ts>
437  
template<std::size_t I, typename... Ts>
438  
auto extract_single_result(when_all_state<Ts...>& state)
438  
auto extract_single_result(when_all_state<Ts...>& state)
439  
{
439  
{
440  
    return std::move(std::get<I>(state.results_)).get();
440  
    return std::move(std::get<I>(state.results_)).get();
441  
}
441  
}
442  

442  

443  
/** Extract all results from state as a tuple.
443  
/** Extract all results from state as a tuple.
444  
*/
444  
*/
445  
template<typename... Ts>
445  
template<typename... Ts>
446  
auto extract_results(when_all_state<Ts...>& state)
446  
auto extract_results(when_all_state<Ts...>& state)
447  
{
447  
{
448  
    return [&]<std::size_t... Is>(std::index_sequence<Is...>) {
448  
    return [&]<std::size_t... Is>(std::index_sequence<Is...>) {
449  
        return std::tuple(extract_single_result<Is>(state)...);
449  
        return std::tuple(extract_single_result<Is>(state)...);
450  
    }(std::index_sequence_for<Ts...>{});
450  
    }(std::index_sequence_for<Ts...>{});
451  
}
451  
}
452  

452  

453  
/** Launches all homogeneous runners concurrently.
453  
/** Launches all homogeneous runners concurrently.
454  

454  

455  
    Two-phase approach: create all runners first, then post all.
455  
    Two-phase approach: create all runners first, then post all.
456  
    This avoids lifetime issues if a task completes synchronously.
456  
    This avoids lifetime issues if a task completes synchronously.
457  
*/
457  
*/
458  
template<typename Range>
458  
template<typename Range>
459  
class when_all_homogeneous_launcher
459  
class when_all_homogeneous_launcher
460  
{
460  
{
461  
    using Awaitable = std::ranges::range_value_t<Range>;
461  
    using Awaitable = std::ranges::range_value_t<Range>;
462  
    using PayloadT = io_result_payload_t<awaitable_result_t<Awaitable>>;
462  
    using PayloadT = io_result_payload_t<awaitable_result_t<Awaitable>>;
463  

463  

464  
    Range* range_;
464  
    Range* range_;
465  
    when_all_homogeneous_state<PayloadT>* state_;
465  
    when_all_homogeneous_state<PayloadT>* state_;
466  

466  

467  
public:
467  
public:
468  
    when_all_homogeneous_launcher(
468  
    when_all_homogeneous_launcher(
469  
        Range* range,
469  
        Range* range,
470  
        when_all_homogeneous_state<PayloadT>* state)
470  
        when_all_homogeneous_state<PayloadT>* state)
471  
        : range_(range)
471  
        : range_(range)
472  
        , state_(state)
472  
        , state_(state)
473  
    {
473  
    {
474  
    }
474  
    }
475  

475  

476  
    bool await_ready() const noexcept
476  
    bool await_ready() const noexcept
477  
    {
477  
    {
478  
        return std::ranges::empty(*range_);
478  
        return std::ranges::empty(*range_);
479  
    }
479  
    }
480  

480  

481  
    std::coroutine_handle<> await_suspend(std::coroutine_handle<> continuation, io_env const* caller_env)
481  
    std::coroutine_handle<> await_suspend(std::coroutine_handle<> continuation, io_env const* caller_env)
482  
    {
482  
    {
483  
        state_->core_.continuation_.h = continuation;
483  
        state_->core_.continuation_.h = continuation;
484  
        state_->core_.caller_env_ = caller_env;
484  
        state_->core_.caller_env_ = caller_env;
485  

485  

486  
        if(caller_env->stop_token.stop_possible())
486  
        if(caller_env->stop_token.stop_possible())
487  
        {
487  
        {
488  
            state_->core_.parent_stop_callback_.emplace(
488  
            state_->core_.parent_stop_callback_.emplace(
489  
                caller_env->stop_token,
489  
                caller_env->stop_token,
490  
                when_all_core::stop_callback_fn{&state_->core_.stop_source_});
490  
                when_all_core::stop_callback_fn{&state_->core_.stop_source_});
491  

491  

492  
            if(caller_env->stop_token.stop_requested())
492  
            if(caller_env->stop_token.stop_requested())
493  
                state_->core_.stop_source_.request_stop();
493  
                state_->core_.stop_source_.request_stop();
494  
        }
494  
        }
495  

495  

496  
        auto token = state_->core_.stop_source_.get_token();
496  
        auto token = state_->core_.stop_source_.get_token();
497  

497  

498  
        // Phase 1: Create all runners without dispatching.
498  
        // Phase 1: Create all runners without dispatching.
499  
        std::size_t index = 0;
499  
        std::size_t index = 0;
500  
        for(auto&& a : *range_)
500  
        for(auto&& a : *range_)
501  
        {
501  
        {
502  
            auto runner = make_when_all_homogeneous_runner(
502  
            auto runner = make_when_all_homogeneous_runner(
503  
                std::move(a), state_, index);
503  
                std::move(a), state_, index);
504  

504  

505  
            auto h = runner.release();
505  
            auto h = runner.release();
506  
            h.promise().state_ = state_;
506  
            h.promise().state_ = state_;
507  
            h.promise().index_ = index;
507  
            h.promise().index_ = index;
508  
            h.promise().env_ = io_env{caller_env->executor, token, caller_env->frame_allocator};
508  
            h.promise().env_ = io_env{caller_env->executor, token, caller_env->frame_allocator};
509  

509  

510  
            state_->runner_handles_[index].h = std::coroutine_handle<>{h};
510  
            state_->runner_handles_[index].h = std::coroutine_handle<>{h};
511  
            ++index;
511  
            ++index;
512  
        }
512  
        }
513  

513  

514  
        // Phase 2: Post all runners. Any may complete synchronously.
514  
        // Phase 2: Post all runners. Any may complete synchronously.
515  
        // After last post, state_ and this may be destroyed.
515  
        // After last post, state_ and this may be destroyed.
516  
        auto* handles = state_->runner_handles_.get();
516  
        auto* handles = state_->runner_handles_.get();
517  
        std::size_t count = state_->core_.remaining_count_.load(std::memory_order_relaxed);
517  
        std::size_t count = state_->core_.remaining_count_.load(std::memory_order_relaxed);
518  
        for(std::size_t i = 0; i < count; ++i)
518  
        for(std::size_t i = 0; i < count; ++i)
519  
            caller_env->executor.post(handles[i]);
519  
            caller_env->executor.post(handles[i]);
520  

520  

521  
        return std::noop_coroutine();
521  
        return std::noop_coroutine();
522  
    }
522  
    }
523  

523  

524  
    void await_resume() const noexcept
524  
    void await_resume() const noexcept
525  
    {
525  
    {
526  
    }
526  
    }
527  
};
527  
};
528  

528  

529  
} // namespace detail
529  
} // namespace detail
530  

530  

531  
/** Execute a range of io_result-returning awaitables concurrently.
531  
/** Execute a range of io_result-returning awaitables concurrently.
532  

532  

533  
    Launches all awaitables simultaneously and waits for all to complete.
533  
    Launches all awaitables simultaneously and waits for all to complete.
534  
    On success, extracted payloads are collected in a vector preserving
534  
    On success, extracted payloads are collected in a vector preserving
535  
    input order. The first error_code cancels siblings and is propagated
535  
    input order. The first error_code cancels siblings and is propagated
536  
    in the outer io_result. Exceptions always beat error codes.
536  
    in the outer io_result. Exceptions always beat error codes.
537  

537  

538  
    @li All child awaitables run concurrently on the caller's executor
538  
    @li All child awaitables run concurrently on the caller's executor
539  
    @li Payloads are returned as a vector in input order
539  
    @li Payloads are returned as a vector in input order
540  
    @li First error_code wins and cancels siblings
540  
    @li First error_code wins and cancels siblings
541  
    @li Exception always beats error_code
541  
    @li Exception always beats error_code
542  
    @li Completes only after all children have finished
542  
    @li Completes only after all children have finished
543  

543  

544  
    @par Thread Safety
544  
    @par Thread Safety
545  
    The returned task must be awaited from a single execution context.
545  
    The returned task must be awaited from a single execution context.
546  
    Child awaitables execute concurrently but complete through the caller's
546  
    Child awaitables execute concurrently but complete through the caller's
547  
    executor.
547  
    executor.
548  

548  

549  
    @param awaitables Range of io_result-returning awaitables to execute
549  
    @param awaitables Range of io_result-returning awaitables to execute
550  
        concurrently (must not be empty).
550  
        concurrently (must not be empty).
551  

551  

552  
    @return A task yielding io_result<vector<PayloadT>> where PayloadT
552  
    @return A task yielding io_result<vector<PayloadT>> where PayloadT
553  
        is the payload extracted from each child's io_result.
553  
        is the payload extracted from each child's io_result.
554  

554  

555  
    @throws std::invalid_argument if range is empty (thrown before
555  
    @throws std::invalid_argument if range is empty (thrown before
556  
        coroutine suspends).
556  
        coroutine suspends).
557  
    @throws Rethrows the first child exception after all children
557  
    @throws Rethrows the first child exception after all children
558  
        complete (exception beats error_code).
558  
        complete (exception beats error_code).
559  

559  

560  
    @par Example
560  
    @par Example
561  
    @code
561  
    @code
562  
    task<void> example()
562  
    task<void> example()
563  
    {
563  
    {
564  
        std::vector<io_task<size_t>> reads;
564  
        std::vector<io_task<size_t>> reads;
565  
        for (auto& buf : buffers)
565  
        for (auto& buf : buffers)
566  
            reads.push_back(stream.read_some(buf));
566  
            reads.push_back(stream.read_some(buf));
567  

567  

568  
        auto [ec, counts] = co_await when_all(std::move(reads));
568  
        auto [ec, counts] = co_await when_all(std::move(reads));
569  
        if (ec) { // handle error
569  
        if (ec) { // handle error
570  
        }
570  
        }
571  
    }
571  
    }
572  
    @endcode
572  
    @endcode
573  

573  

574  
    @see IoAwaitableRange, when_all
574  
    @see IoAwaitableRange, when_all
575  
*/
575  
*/
576  
template<IoAwaitableRange R>
576  
template<IoAwaitableRange R>
577  
    requires detail::is_io_result_v<
577  
    requires detail::is_io_result_v<
578  
        awaitable_result_t<std::ranges::range_value_t<R>>>
578  
        awaitable_result_t<std::ranges::range_value_t<R>>>
579  
    && (!std::is_same_v<
579  
    && (!std::is_same_v<
580  
            detail::io_result_payload_t<
580  
            detail::io_result_payload_t<
581  
                awaitable_result_t<std::ranges::range_value_t<R>>>,
581  
                awaitable_result_t<std::ranges::range_value_t<R>>>,
582  
            std::tuple<>>)
582  
            std::tuple<>>)
583  
[[nodiscard]] auto when_all(R&& awaitables)
583  
[[nodiscard]] auto when_all(R&& awaitables)
584  
    -> task<io_result<std::vector<
584  
    -> task<io_result<std::vector<
585  
        detail::io_result_payload_t<
585  
        detail::io_result_payload_t<
586  
            awaitable_result_t<std::ranges::range_value_t<R>>>>>>
586  
            awaitable_result_t<std::ranges::range_value_t<R>>>>>>
587  
{
587  
{
588  
    using Awaitable = std::ranges::range_value_t<R>;
588  
    using Awaitable = std::ranges::range_value_t<R>;
589  
    using PayloadT = detail::io_result_payload_t<
589  
    using PayloadT = detail::io_result_payload_t<
590  
        awaitable_result_t<Awaitable>>;
590  
        awaitable_result_t<Awaitable>>;
591  
    using OwnedRange = std::remove_cvref_t<R>;
591  
    using OwnedRange = std::remove_cvref_t<R>;
592  

592  

593  
    auto count = std::ranges::size(awaitables);
593  
    auto count = std::ranges::size(awaitables);
594  
    if(count == 0)
594  
    if(count == 0)
595  
        throw std::invalid_argument("when_all requires at least one awaitable");
595  
        throw std::invalid_argument("when_all requires at least one awaitable");
596  

596  

597  
    OwnedRange owned_awaitables = std::forward<R>(awaitables);
597  
    OwnedRange owned_awaitables = std::forward<R>(awaitables);
598  

598  

599  
    detail::when_all_homogeneous_state<PayloadT> state(count);
599  
    detail::when_all_homogeneous_state<PayloadT> state(count);
600  

600  

601  
    co_await detail::when_all_homogeneous_launcher<OwnedRange>(
601  
    co_await detail::when_all_homogeneous_launcher<OwnedRange>(
602  
        &owned_awaitables, &state);
602  
        &owned_awaitables, &state);
603  

603  

604  
    if(state.core_.first_exception_)
604  
    if(state.core_.first_exception_)
605  
        std::rethrow_exception(state.core_.first_exception_);
605  
        std::rethrow_exception(state.core_.first_exception_);
606  

606  

607  
    if(state.has_error_.load(std::memory_order_relaxed))
607  
    if(state.has_error_.load(std::memory_order_relaxed))
608  
        co_return io_result<std::vector<PayloadT>>{state.first_error_, {}};
608  
        co_return io_result<std::vector<PayloadT>>{state.first_error_, {}};
609  

609  

610  
    std::vector<PayloadT> results;
610  
    std::vector<PayloadT> results;
611  
    results.reserve(count);
611  
    results.reserve(count);
612  
    for(auto& opt : state.results_)
612  
    for(auto& opt : state.results_)
613  
        results.push_back(std::move(*opt));
613  
        results.push_back(std::move(*opt));
614  

614  

615  
    co_return io_result<std::vector<PayloadT>>{{}, std::move(results)};
615  
    co_return io_result<std::vector<PayloadT>>{{}, std::move(results)};
616  
}
616  
}
617  

617  

618  
/** Execute a range of void io_result-returning awaitables concurrently.
618  
/** Execute a range of void io_result-returning awaitables concurrently.
619  

619  

620  
    Launches all awaitables simultaneously and waits for all to complete.
620  
    Launches all awaitables simultaneously and waits for all to complete.
621  
    Since all awaitables return io_result<>, no payload values are
621  
    Since all awaitables return io_result<>, no payload values are
622  
    collected. The first error_code cancels siblings and is propagated.
622  
    collected. The first error_code cancels siblings and is propagated.
623  
    Exceptions always beat error codes.
623  
    Exceptions always beat error codes.
624  

624  

625  
    @param awaitables Range of io_result<>-returning awaitables to
625  
    @param awaitables Range of io_result<>-returning awaitables to
626  
        execute concurrently (must not be empty).
626  
        execute concurrently (must not be empty).
627  

627  

628  
    @return A task yielding io_result<> whose ec is the first child
628  
    @return A task yielding io_result<> whose ec is the first child
629  
        error, or default-constructed on success.
629  
        error, or default-constructed on success.
630  

630  

631  
    @throws std::invalid_argument if range is empty.
631  
    @throws std::invalid_argument if range is empty.
632  
    @throws Rethrows the first child exception after all children
632  
    @throws Rethrows the first child exception after all children
633  
        complete (exception beats error_code).
633  
        complete (exception beats error_code).
634  

634  

635  
    @par Example
635  
    @par Example
636  
    @code
636  
    @code
637  
    task<void> example()
637  
    task<void> example()
638  
    {
638  
    {
639  
        std::vector<io_task<>> jobs;
639  
        std::vector<io_task<>> jobs;
640  
        for (int i = 0; i < n; ++i)
640  
        for (int i = 0; i < n; ++i)
641  
            jobs.push_back(process(i));
641  
            jobs.push_back(process(i));
642  

642  

643  
        auto [ec] = co_await when_all(std::move(jobs));
643  
        auto [ec] = co_await when_all(std::move(jobs));
644  
    }
644  
    }
645  
    @endcode
645  
    @endcode
646  

646  

647  
    @see IoAwaitableRange, when_all
647  
    @see IoAwaitableRange, when_all
648  
*/
648  
*/
649  
template<IoAwaitableRange R>
649  
template<IoAwaitableRange R>
650  
    requires detail::is_io_result_v<
650  
    requires detail::is_io_result_v<
651  
        awaitable_result_t<std::ranges::range_value_t<R>>>
651  
        awaitable_result_t<std::ranges::range_value_t<R>>>
652  
    && std::is_same_v<
652  
    && std::is_same_v<
653  
            detail::io_result_payload_t<
653  
            detail::io_result_payload_t<
654  
                awaitable_result_t<std::ranges::range_value_t<R>>>,
654  
                awaitable_result_t<std::ranges::range_value_t<R>>>,
655  
            std::tuple<>>
655  
            std::tuple<>>
656  
[[nodiscard]] auto when_all(R&& awaitables) -> task<io_result<>>
656  
[[nodiscard]] auto when_all(R&& awaitables) -> task<io_result<>>
657  
{
657  
{
658  
    using OwnedRange = std::remove_cvref_t<R>;
658  
    using OwnedRange = std::remove_cvref_t<R>;
659  

659  

660  
    auto count = std::ranges::size(awaitables);
660  
    auto count = std::ranges::size(awaitables);
661  
    if(count == 0)
661  
    if(count == 0)
662  
        throw std::invalid_argument("when_all requires at least one awaitable");
662  
        throw std::invalid_argument("when_all requires at least one awaitable");
663  

663  

664  
    OwnedRange owned_awaitables = std::forward<R>(awaitables);
664  
    OwnedRange owned_awaitables = std::forward<R>(awaitables);
665  

665  

666  
    detail::when_all_homogeneous_state<std::tuple<>> state(count);
666  
    detail::when_all_homogeneous_state<std::tuple<>> state(count);
667  

667  

668  
    co_await detail::when_all_homogeneous_launcher<OwnedRange>(
668  
    co_await detail::when_all_homogeneous_launcher<OwnedRange>(
669  
        &owned_awaitables, &state);
669  
        &owned_awaitables, &state);
670  

670  

671  
    if(state.core_.first_exception_)
671  
    if(state.core_.first_exception_)
672  
        std::rethrow_exception(state.core_.first_exception_);
672  
        std::rethrow_exception(state.core_.first_exception_);
673  

673  

674  
    if(state.has_error_.load(std::memory_order_relaxed))
674  
    if(state.has_error_.load(std::memory_order_relaxed))
675  
        co_return io_result<>{state.first_error_};
675  
        co_return io_result<>{state.first_error_};
676  

676  

677  
    co_return io_result<>{};
677  
    co_return io_result<>{};
678  
}
678  
}
679  

679  

680  
/** Execute io_result-returning awaitables concurrently, inspecting error codes.
680  
/** Execute io_result-returning awaitables concurrently, inspecting error codes.
681  

681  

682  
    Overload selected when all children return io_result<Ts...>.
682  
    Overload selected when all children return io_result<Ts...>.
683  
    The error_code is lifted out of each child into a single outer
683  
    The error_code is lifted out of each child into a single outer
684  
    io_result. On success all values are returned; on failure the
684  
    io_result. On success all values are returned; on failure the
685  
    first error_code wins.
685  
    first error_code wins.
686  

686  

687  
    @par Exception Safety
687  
    @par Exception Safety
688  
    Exception always beats error_code. If any child throws, the
688  
    Exception always beats error_code. If any child throws, the
689  
    exception is rethrown regardless of error_code results.
689  
    exception is rethrown regardless of error_code results.
690  

690  

691  
    @param awaitables One or more awaitables each returning
691  
    @param awaitables One or more awaitables each returning
692  
        io_result<Ts...>.
692  
        io_result<Ts...>.
693  

693  

694  
    @return A task yielding io_result<R1, R2, ..., Rn> where each Ri
694  
    @return A task yielding io_result<R1, R2, ..., Rn> where each Ri
695  
        follows the payload flattening rules.
695  
        follows the payload flattening rules.
696  
*/
696  
*/
697  
template<IoAwaitable... As>
697  
template<IoAwaitable... As>
698  
    requires (sizeof...(As) > 0)
698  
    requires (sizeof...(As) > 0)
699  
          && detail::all_io_result_awaitables<As...>
699  
          && detail::all_io_result_awaitables<As...>
700  
[[nodiscard]] auto when_all(As... awaitables)
700  
[[nodiscard]] auto when_all(As... awaitables)
701  
    -> task<io_result<
701  
    -> task<io_result<
702  
        detail::io_result_payload_t<awaitable_result_t<As>>...>>
702  
        detail::io_result_payload_t<awaitable_result_t<As>>...>>
703  
{
703  
{
704  
    using result_type = io_result<
704  
    using result_type = io_result<
705  
        detail::io_result_payload_t<awaitable_result_t<As>>...>;
705  
        detail::io_result_payload_t<awaitable_result_t<As>>...>;
706  

706  

707  
    detail::when_all_state<awaitable_result_t<As>...> state;
707  
    detail::when_all_state<awaitable_result_t<As>...> state;
708  
    std::tuple<As...> awaitable_tuple(std::move(awaitables)...);
708  
    std::tuple<As...> awaitable_tuple(std::move(awaitables)...);
709  

709  

710  
    co_await detail::when_all_io_launcher<As...>(&awaitable_tuple, &state);
710  
    co_await detail::when_all_io_launcher<As...>(&awaitable_tuple, &state);
711  

711  

712  
    // Exception always wins over error_code
712  
    // Exception always wins over error_code
713  
    if(state.core_.first_exception_)
713  
    if(state.core_.first_exception_)
714  
        std::rethrow_exception(state.core_.first_exception_);
714  
        std::rethrow_exception(state.core_.first_exception_);
715  

715  

716  
    auto r = detail::build_when_all_io_result<result_type>(
716  
    auto r = detail::build_when_all_io_result<result_type>(
717  
        detail::extract_results(state));
717  
        detail::extract_results(state));
718  
    if(state.has_error_.load(std::memory_order_relaxed))
718  
    if(state.has_error_.load(std::memory_order_relaxed))
719  
        r.ec = state.first_error_;
719  
        r.ec = state.first_error_;
720  
    co_return r;
720  
    co_return r;
721  
}
721  
}
722  

722  

723  
} // namespace capy
723  
} // namespace capy
724  
} // namespace boost
724  
} // namespace boost
725  

725  

726  
#endif
726  
#endif