TLA Line data Source code
1 : //
2 : // Copyright (c) 2026 Steve Gerbino
3 : //
4 : // Distributed under the Boost Software License, Version 1.0. (See accompanying
5 : // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6 : //
7 : // Official repository: https://github.com/cppalliance/capy
8 : //
9 :
10 : #ifndef BOOST_CAPY_WHEN_ALL_HPP
11 : #define BOOST_CAPY_WHEN_ALL_HPP
12 :
13 : #include <boost/capy/detail/config.hpp>
14 : #include <boost/capy/detail/io_result_combinators.hpp>
15 : #include <boost/capy/continuation.hpp>
16 : #include <boost/capy/concept/executor.hpp>
17 : #include <boost/capy/concept/io_awaitable.hpp>
18 : #include <coroutine>
19 : #include <boost/capy/ex/frame_alloc_mixin.hpp>
20 : #include <boost/capy/ex/io_env.hpp>
21 : #include <boost/capy/ex/frame_allocator.hpp>
22 : #include <boost/capy/task.hpp>
23 :
24 : #include <array>
25 : #include <atomic>
26 : #include <exception>
27 : #include <memory>
28 : #include <optional>
29 : #include <ranges>
30 : #include <stdexcept>
31 : #include <stop_token>
32 : #include <tuple>
33 : #include <type_traits>
34 : #include <utility>
35 : #include <vector>
36 :
37 : namespace boost {
38 : namespace capy {
39 :
40 : namespace detail {
41 :
42 : /** Holds the result of a single task within when_all.
43 : */
44 : template<typename T>
45 : struct result_holder
46 : {
47 : std::optional<T> value_;
48 :
49 HIT 81 : void set(T v)
50 : {
51 81 : value_ = std::move(v);
52 81 : }
53 :
54 69 : T get() &&
55 : {
56 69 : return std::move(*value_);
57 : }
58 : };
59 :
60 : /** Core shared state for when_all operations.
61 :
62 : Contains all members and methods common to both heterogeneous (variadic)
63 : and homogeneous (range) when_all implementations. State classes embed
64 : this via composition to avoid CRTP destructor ordering issues.
65 :
66 : @par Thread Safety
67 : Atomic operations protect exception capture and completion count.
68 : */
69 : struct when_all_core
70 : {
71 : std::atomic<std::size_t> remaining_count_;
72 :
73 : // Exception storage - first error wins, others discarded
74 : std::atomic<bool> has_exception_{false};
75 : std::exception_ptr first_exception_;
76 :
77 : std::stop_source stop_source_;
78 :
79 : // Bridges parent's stop token to our stop_source
80 : struct stop_callback_fn
81 : {
82 : std::stop_source* source_;
83 1 : void operator()() const { source_->request_stop(); }
84 : };
85 : using stop_callback_t = std::stop_callback<stop_callback_fn>;
86 : std::optional<stop_callback_t> parent_stop_callback_;
87 :
88 : continuation continuation_;
89 : io_env const* caller_env_ = nullptr;
90 :
91 72 : explicit when_all_core(std::size_t count) noexcept
92 72 : : remaining_count_(count)
93 : {
94 72 : }
95 :
96 : /** Capture an exception (first one wins). */
97 19 : void capture_exception(std::exception_ptr ep)
98 : {
99 19 : bool expected = false;
100 19 : if(has_exception_.compare_exchange_strong(
101 : expected, true, std::memory_order_relaxed))
102 17 : first_exception_ = ep;
103 19 : }
104 : };
105 :
106 : /** Shared state for heterogeneous when_all (variadic overload).
107 :
108 : @tparam Ts The result types of the tasks.
109 : */
110 : template<typename... Ts>
111 : struct when_all_state
112 : {
113 : static constexpr std::size_t task_count = sizeof...(Ts);
114 :
115 : when_all_core core_;
116 : std::tuple<result_holder<Ts>...> results_;
117 : std::array<continuation, task_count> runner_handles_{};
118 :
119 : std::atomic<bool> has_error_{false};
120 : std::error_code first_error_;
121 :
122 50 : when_all_state()
123 50 : : core_(task_count)
124 : {
125 50 : }
126 :
127 : /** Record the first error (subsequent errors are discarded). */
128 43 : void record_error(std::error_code ec)
129 : {
130 43 : bool expected = false;
131 43 : if(has_error_.compare_exchange_strong(
132 : expected, true, std::memory_order_relaxed))
133 29 : first_error_ = ec;
134 43 : }
135 : };
136 :
137 : /** Shared state for homogeneous when_all (range overload).
138 :
139 : Stores extracted io_result payloads in a vector indexed by task
140 : position. Tracks the first error_code for error propagation.
141 :
142 : @tparam T The payload type extracted from io_result.
143 : */
144 : template<typename T>
145 : struct when_all_homogeneous_state
146 : {
147 : when_all_core core_;
148 : std::vector<std::optional<T>> results_;
149 : std::unique_ptr<continuation[]> runner_handles_;
150 :
151 : std::atomic<bool> has_error_{false};
152 : std::error_code first_error_;
153 :
154 11 : explicit when_all_homogeneous_state(std::size_t count)
155 11 : : core_(count)
156 22 : , results_(count)
157 11 : , runner_handles_(std::make_unique<continuation[]>(count))
158 : {
159 11 : }
160 :
161 16 : void set_result(std::size_t index, T value)
162 : {
163 16 : results_[index].emplace(std::move(value));
164 16 : }
165 :
166 : /** Record the first error (subsequent errors are discarded). */
167 7 : void record_error(std::error_code ec)
168 : {
169 7 : bool expected = false;
170 7 : if(has_error_.compare_exchange_strong(
171 : expected, true, std::memory_order_relaxed))
172 5 : first_error_ = ec;
173 7 : }
174 : };
175 :
176 : /** Specialization for void io_result children (no payload storage). */
177 : template<>
178 : struct when_all_homogeneous_state<std::tuple<>>
179 : {
180 : when_all_core core_;
181 : std::unique_ptr<continuation[]> runner_handles_;
182 :
183 : std::atomic<bool> has_error_{false};
184 : std::error_code first_error_;
185 :
186 3 : explicit when_all_homogeneous_state(std::size_t count)
187 3 : : core_(count)
188 3 : , runner_handles_(std::make_unique<continuation[]>(count))
189 : {
190 3 : }
191 :
192 : /** Record the first error (subsequent errors are discarded). */
193 1 : void record_error(std::error_code ec)
194 : {
195 1 : bool expected = false;
196 1 : if(has_error_.compare_exchange_strong(
197 : expected, true, std::memory_order_relaxed))
198 1 : first_error_ = ec;
199 1 : }
200 : };
201 :
202 : /** Wrapper coroutine that intercepts task completion for when_all.
203 :
204 : Parameterized on StateType to work with both heterogeneous (variadic)
205 : and homogeneous (range) state types. All state types expose their
206 : shared members through a `core_` member of type when_all_core.
207 :
208 : @tparam StateType The state type (when_all_state or when_all_homogeneous_state).
209 : */
210 : template<typename StateType>
211 : struct BOOST_CAPY_CORO_DESTROY_WHEN_COMPLETE when_all_runner
212 : {
213 : struct promise_type
214 : : frame_alloc_mixin
215 : {
216 : StateType* state_ = nullptr;
217 : std::size_t index_ = 0;
218 : io_env env_;
219 :
220 145 : when_all_runner get_return_object() noexcept
221 : {
222 : return when_all_runner(
223 145 : std::coroutine_handle<promise_type>::from_promise(*this));
224 : }
225 :
226 145 : std::suspend_always initial_suspend() noexcept
227 : {
228 145 : return {};
229 : }
230 :
231 145 : auto final_suspend() noexcept
232 : {
233 : struct awaiter
234 : {
235 : promise_type* p_;
236 145 : bool await_ready() const noexcept { return false; }
237 145 : auto await_suspend(std::coroutine_handle<> h) noexcept
238 : {
239 145 : auto& core = p_->state_->core_;
240 145 : auto* counter = &core.remaining_count_;
241 145 : auto* caller_env = core.caller_env_;
242 145 : auto& cont = core.continuation_;
243 :
244 145 : h.destroy();
245 :
246 145 : auto remaining = counter->fetch_sub(1, std::memory_order_acq_rel);
247 145 : if(remaining == 1)
248 72 : return detail::symmetric_transfer(caller_env->executor.dispatch(cont));
249 73 : return detail::symmetric_transfer(std::noop_coroutine());
250 : }
251 MIS 0 : void await_resume() const noexcept {}
252 : };
253 HIT 145 : return awaiter{this};
254 : }
255 :
256 126 : void return_void() noexcept {}
257 :
258 19 : void unhandled_exception() noexcept
259 : {
260 19 : state_->core_.capture_exception(std::current_exception());
261 19 : state_->core_.stop_source_.request_stop();
262 19 : }
263 :
264 : template<class Awaitable>
265 : struct transform_awaiter
266 : {
267 : std::decay_t<Awaitable> a_;
268 : promise_type* p_;
269 :
270 145 : bool await_ready() { return a_.await_ready(); }
271 145 : decltype(auto) await_resume() { return a_.await_resume(); }
272 :
273 : template<class Promise>
274 144 : auto await_suspend(std::coroutine_handle<Promise> h)
275 : {
276 : using R = decltype(a_.await_suspend(h, &p_->env_));
277 : if constexpr (std::is_same_v<R, std::coroutine_handle<>>)
278 144 : return detail::symmetric_transfer(a_.await_suspend(h, &p_->env_));
279 : else
280 : return a_.await_suspend(h, &p_->env_);
281 : }
282 : };
283 :
284 : template<class Awaitable>
285 145 : auto await_transform(Awaitable&& a)
286 : {
287 : using A = std::decay_t<Awaitable>;
288 : if constexpr (IoAwaitable<A>)
289 : {
290 : return transform_awaiter<Awaitable>{
291 290 : std::forward<Awaitable>(a), this};
292 : }
293 : else
294 : {
295 : static_assert(sizeof(A) == 0, "requires IoAwaitable");
296 : }
297 145 : }
298 : };
299 :
300 : std::coroutine_handle<promise_type> h_;
301 :
302 145 : explicit when_all_runner(std::coroutine_handle<promise_type> h) noexcept
303 145 : : h_(h)
304 : {
305 145 : }
306 :
307 : // Enable move for all clang versions - some versions need it
308 : when_all_runner(when_all_runner&& other) noexcept
309 : : h_(std::exchange(other.h_, nullptr))
310 : {
311 : }
312 :
313 : when_all_runner(when_all_runner const&) = delete;
314 : when_all_runner& operator=(when_all_runner const&) = delete;
315 : when_all_runner& operator=(when_all_runner&&) = delete;
316 :
317 145 : auto release() noexcept
318 : {
319 145 : return std::exchange(h_, nullptr);
320 : }
321 : };
322 :
323 : /** Create an io_result-aware runner for a single awaitable (range path).
324 :
325 : Checks the error code, records errors and requests stop on failure,
326 : or extracts the payload on success.
327 : */
328 : template<IoAwaitable Awaitable, typename StateType>
329 : when_all_runner<StateType>
330 32 : make_when_all_homogeneous_runner(Awaitable inner, StateType* state, std::size_t index)
331 : {
332 : auto result = co_await std::move(inner);
333 :
334 : if(result.ec)
335 : {
336 : state->record_error(result.ec);
337 : state->core_.stop_source_.request_stop();
338 : }
339 : else
340 : {
341 : using PayloadT = io_result_payload_t<
342 : awaitable_result_t<Awaitable>>;
343 : if constexpr (!std::is_same_v<PayloadT, std::tuple<>>)
344 : {
345 : state->set_result(index,
346 : extract_io_payload(std::move(result)));
347 : }
348 : }
349 64 : }
350 :
351 : /** Create a runner for io_result children that requests stop on ec. */
352 : template<std::size_t Index, IoAwaitable Awaitable, typename... Ts>
353 : when_all_runner<when_all_state<Ts...>>
354 97 : make_when_all_io_runner(Awaitable inner, when_all_state<Ts...>* state)
355 : {
356 : auto result = co_await std::move(inner);
357 : auto ec = result.ec;
358 : std::get<Index>(state->results_).set(std::move(result));
359 :
360 : if(ec)
361 : {
362 : state->record_error(ec);
363 : state->core_.stop_source_.request_stop();
364 : }
365 194 : }
366 :
367 : /** Launcher that uses io_result-aware runners. */
368 : template<IoAwaitable... Awaitables>
369 : class when_all_io_launcher
370 : {
371 : using state_type = when_all_state<awaitable_result_t<Awaitables>...>;
372 :
373 : std::tuple<Awaitables...>* awaitables_;
374 : state_type* state_;
375 :
376 : public:
377 50 : when_all_io_launcher(
378 : std::tuple<Awaitables...>* awaitables,
379 : state_type* state)
380 50 : : awaitables_(awaitables)
381 50 : , state_(state)
382 : {
383 50 : }
384 :
385 50 : bool await_ready() const noexcept
386 : {
387 50 : return sizeof...(Awaitables) == 0;
388 : }
389 :
390 50 : std::coroutine_handle<> await_suspend(
391 : std::coroutine_handle<> continuation, io_env const* caller_env)
392 : {
393 50 : state_->core_.continuation_.h = continuation;
394 50 : state_->core_.caller_env_ = caller_env;
395 :
396 50 : if(caller_env->stop_token.stop_possible())
397 : {
398 2 : state_->core_.parent_stop_callback_.emplace(
399 1 : caller_env->stop_token,
400 1 : when_all_core::stop_callback_fn{&state_->core_.stop_source_});
401 :
402 1 : if(caller_env->stop_token.stop_requested())
403 MIS 0 : state_->core_.stop_source_.request_stop();
404 : }
405 :
406 HIT 50 : auto token = state_->core_.stop_source_.get_token();
407 46 : [&]<std::size_t... Is>(std::index_sequence<Is...>) {
408 50 : (..., launch_one<Is>(caller_env->executor, token));
409 50 : }(std::index_sequence_for<Awaitables...>{});
410 :
411 100 : return std::noop_coroutine();
412 50 : }
413 :
414 50 : void await_resume() const noexcept {}
415 :
416 : private:
417 : template<std::size_t I>
418 97 : void launch_one(executor_ref caller_ex, std::stop_token token)
419 : {
420 97 : auto runner = make_when_all_io_runner<I>(
421 97 : std::move(std::get<I>(*awaitables_)), state_);
422 :
423 97 : auto h = runner.release();
424 97 : h.promise().state_ = state_;
425 97 : h.promise().env_ = io_env{caller_ex, token,
426 97 : state_->core_.caller_env_->frame_allocator};
427 :
428 97 : state_->runner_handles_[I].h = std::coroutine_handle<>{h};
429 97 : state_->core_.caller_env_->executor.post(state_->runner_handles_[I]);
430 194 : }
431 : };
432 :
433 : /** Helper to extract a single result from state.
434 : This is a separate function to work around a GCC-11 ICE that occurs
435 : when using nested immediately-invoked lambdas with pack expansion.
436 : */
437 : template<std::size_t I, typename... Ts>
438 69 : auto extract_single_result(when_all_state<Ts...>& state)
439 : {
440 69 : return std::move(std::get<I>(state.results_)).get();
441 : }
442 :
443 : /** Extract all results from state as a tuple.
444 : */
445 : template<typename... Ts>
446 36 : auto extract_results(when_all_state<Ts...>& state)
447 : {
448 55 : return [&]<std::size_t... Is>(std::index_sequence<Is...>) {
449 36 : return std::tuple(extract_single_result<Is>(state)...);
450 72 : }(std::index_sequence_for<Ts...>{});
451 : }
452 :
453 : /** Launches all homogeneous runners concurrently.
454 :
455 : Two-phase approach: create all runners first, then post all.
456 : This avoids lifetime issues if a task completes synchronously.
457 : */
458 : template<typename Range>
459 : class when_all_homogeneous_launcher
460 : {
461 : using Awaitable = std::ranges::range_value_t<Range>;
462 : using PayloadT = io_result_payload_t<awaitable_result_t<Awaitable>>;
463 :
464 : Range* range_;
465 : when_all_homogeneous_state<PayloadT>* state_;
466 :
467 : public:
468 14 : when_all_homogeneous_launcher(
469 : Range* range,
470 : when_all_homogeneous_state<PayloadT>* state)
471 14 : : range_(range)
472 14 : , state_(state)
473 : {
474 14 : }
475 :
476 14 : bool await_ready() const noexcept
477 : {
478 14 : return std::ranges::empty(*range_);
479 : }
480 :
481 14 : std::coroutine_handle<> await_suspend(std::coroutine_handle<> continuation, io_env const* caller_env)
482 : {
483 14 : state_->core_.continuation_.h = continuation;
484 14 : state_->core_.caller_env_ = caller_env;
485 :
486 14 : if(caller_env->stop_token.stop_possible())
487 : {
488 2 : state_->core_.parent_stop_callback_.emplace(
489 1 : caller_env->stop_token,
490 1 : when_all_core::stop_callback_fn{&state_->core_.stop_source_});
491 :
492 1 : if(caller_env->stop_token.stop_requested())
493 MIS 0 : state_->core_.stop_source_.request_stop();
494 : }
495 :
496 HIT 14 : auto token = state_->core_.stop_source_.get_token();
497 :
498 : // Phase 1: Create all runners without dispatching.
499 14 : std::size_t index = 0;
500 46 : for(auto&& a : *range_)
501 : {
502 32 : auto runner = make_when_all_homogeneous_runner(
503 32 : std::move(a), state_, index);
504 :
505 32 : auto h = runner.release();
506 32 : h.promise().state_ = state_;
507 32 : h.promise().index_ = index;
508 32 : h.promise().env_ = io_env{caller_env->executor, token, caller_env->frame_allocator};
509 :
510 32 : state_->runner_handles_[index].h = std::coroutine_handle<>{h};
511 32 : ++index;
512 : }
513 :
514 : // Phase 2: Post all runners. Any may complete synchronously.
515 : // After last post, state_ and this may be destroyed.
516 14 : auto* handles = state_->runner_handles_.get();
517 14 : std::size_t count = state_->core_.remaining_count_.load(std::memory_order_relaxed);
518 46 : for(std::size_t i = 0; i < count; ++i)
519 32 : caller_env->executor.post(handles[i]);
520 :
521 28 : return std::noop_coroutine();
522 46 : }
523 :
524 14 : void await_resume() const noexcept
525 : {
526 14 : }
527 : };
528 :
529 : } // namespace detail
530 :
531 : /** Execute a range of io_result-returning awaitables concurrently.
532 :
533 : Launches all awaitables simultaneously and waits for all to complete.
534 : On success, extracted payloads are collected in a vector preserving
535 : input order. The first error_code cancels siblings and is propagated
536 : in the outer io_result. Exceptions always beat error codes.
537 :
538 : @li All child awaitables run concurrently on the caller's executor
539 : @li Payloads are returned as a vector in input order
540 : @li First error_code wins and cancels siblings
541 : @li Exception always beats error_code
542 : @li Completes only after all children have finished
543 :
544 : @par Thread Safety
545 : The returned task must be awaited from a single execution context.
546 : Child awaitables execute concurrently but complete through the caller's
547 : executor.
548 :
549 : @param awaitables Range of io_result-returning awaitables to execute
550 : concurrently (must not be empty).
551 :
552 : @return A task yielding io_result<vector<PayloadT>> where PayloadT
553 : is the payload extracted from each child's io_result.
554 :
555 : @throws std::invalid_argument if range is empty (thrown before
556 : coroutine suspends).
557 : @throws Rethrows the first child exception after all children
558 : complete (exception beats error_code).
559 :
560 : @par Example
561 : @code
562 : task<void> example()
563 : {
564 : std::vector<io_task<size_t>> reads;
565 : for (auto& buf : buffers)
566 : reads.push_back(stream.read_some(buf));
567 :
568 : auto [ec, counts] = co_await when_all(std::move(reads));
569 : if (ec) { // handle error
570 : }
571 : }
572 : @endcode
573 :
574 : @see IoAwaitableRange, when_all
575 : */
576 : template<IoAwaitableRange R>
577 : requires detail::is_io_result_v<
578 : awaitable_result_t<std::ranges::range_value_t<R>>>
579 : && (!std::is_same_v<
580 : detail::io_result_payload_t<
581 : awaitable_result_t<std::ranges::range_value_t<R>>>,
582 : std::tuple<>>)
583 12 : [[nodiscard]] auto when_all(R&& awaitables)
584 : -> task<io_result<std::vector<
585 : detail::io_result_payload_t<
586 : awaitable_result_t<std::ranges::range_value_t<R>>>>>>
587 : {
588 : using Awaitable = std::ranges::range_value_t<R>;
589 : using PayloadT = detail::io_result_payload_t<
590 : awaitable_result_t<Awaitable>>;
591 : using OwnedRange = std::remove_cvref_t<R>;
592 :
593 : auto count = std::ranges::size(awaitables);
594 : if(count == 0)
595 : throw std::invalid_argument("when_all requires at least one awaitable");
596 :
597 : OwnedRange owned_awaitables = std::forward<R>(awaitables);
598 :
599 : detail::when_all_homogeneous_state<PayloadT> state(count);
600 :
601 : co_await detail::when_all_homogeneous_launcher<OwnedRange>(
602 : &owned_awaitables, &state);
603 :
604 : if(state.core_.first_exception_)
605 : std::rethrow_exception(state.core_.first_exception_);
606 :
607 : if(state.has_error_.load(std::memory_order_relaxed))
608 : co_return io_result<std::vector<PayloadT>>{state.first_error_, {}};
609 :
610 : std::vector<PayloadT> results;
611 : results.reserve(count);
612 : for(auto& opt : state.results_)
613 : results.push_back(std::move(*opt));
614 :
615 : co_return io_result<std::vector<PayloadT>>{{}, std::move(results)};
616 24 : }
617 :
618 : /** Execute a range of void io_result-returning awaitables concurrently.
619 :
620 : Launches all awaitables simultaneously and waits for all to complete.
621 : Since all awaitables return io_result<>, no payload values are
622 : collected. The first error_code cancels siblings and is propagated.
623 : Exceptions always beat error codes.
624 :
625 : @param awaitables Range of io_result<>-returning awaitables to
626 : execute concurrently (must not be empty).
627 :
628 : @return A task yielding io_result<> whose ec is the first child
629 : error, or default-constructed on success.
630 :
631 : @throws std::invalid_argument if range is empty.
632 : @throws Rethrows the first child exception after all children
633 : complete (exception beats error_code).
634 :
635 : @par Example
636 : @code
637 : task<void> example()
638 : {
639 : std::vector<io_task<>> jobs;
640 : for (int i = 0; i < n; ++i)
641 : jobs.push_back(process(i));
642 :
643 : auto [ec] = co_await when_all(std::move(jobs));
644 : }
645 : @endcode
646 :
647 : @see IoAwaitableRange, when_all
648 : */
649 : template<IoAwaitableRange R>
650 : requires detail::is_io_result_v<
651 : awaitable_result_t<std::ranges::range_value_t<R>>>
652 : && std::is_same_v<
653 : detail::io_result_payload_t<
654 : awaitable_result_t<std::ranges::range_value_t<R>>>,
655 : std::tuple<>>
656 4 : [[nodiscard]] auto when_all(R&& awaitables) -> task<io_result<>>
657 : {
658 : using OwnedRange = std::remove_cvref_t<R>;
659 :
660 : auto count = std::ranges::size(awaitables);
661 : if(count == 0)
662 : throw std::invalid_argument("when_all requires at least one awaitable");
663 :
664 : OwnedRange owned_awaitables = std::forward<R>(awaitables);
665 :
666 : detail::when_all_homogeneous_state<std::tuple<>> state(count);
667 :
668 : co_await detail::when_all_homogeneous_launcher<OwnedRange>(
669 : &owned_awaitables, &state);
670 :
671 : if(state.core_.first_exception_)
672 : std::rethrow_exception(state.core_.first_exception_);
673 :
674 : if(state.has_error_.load(std::memory_order_relaxed))
675 : co_return io_result<>{state.first_error_};
676 :
677 : co_return io_result<>{};
678 8 : }
679 :
680 : /** Execute io_result-returning awaitables concurrently, inspecting error codes.
681 :
682 : Overload selected when all children return io_result<Ts...>.
683 : The error_code is lifted out of each child into a single outer
684 : io_result. On success all values are returned; on failure the
685 : first error_code wins.
686 :
687 : @par Exception Safety
688 : Exception always beats error_code. If any child throws, the
689 : exception is rethrown regardless of error_code results.
690 :
691 : @param awaitables One or more awaitables each returning
692 : io_result<Ts...>.
693 :
694 : @return A task yielding io_result<R1, R2, ..., Rn> where each Ri
695 : follows the payload flattening rules.
696 : */
697 : template<IoAwaitable... As>
698 : requires (sizeof...(As) > 0)
699 : && detail::all_io_result_awaitables<As...>
700 50 : [[nodiscard]] auto when_all(As... awaitables)
701 : -> task<io_result<
702 : detail::io_result_payload_t<awaitable_result_t<As>>...>>
703 : {
704 : using result_type = io_result<
705 : detail::io_result_payload_t<awaitable_result_t<As>>...>;
706 :
707 : detail::when_all_state<awaitable_result_t<As>...> state;
708 : std::tuple<As...> awaitable_tuple(std::move(awaitables)...);
709 :
710 : co_await detail::when_all_io_launcher<As...>(&awaitable_tuple, &state);
711 :
712 : // Exception always wins over error_code
713 : if(state.core_.first_exception_)
714 : std::rethrow_exception(state.core_.first_exception_);
715 :
716 : auto r = detail::build_when_all_io_result<result_type>(
717 : detail::extract_results(state));
718 : if(state.has_error_.load(std::memory_order_relaxed))
719 : r.ec = state.first_error_;
720 : co_return r;
721 100 : }
722 :
723 : } // namespace capy
724 : } // namespace boost
725 :
726 : #endif
|