TLA Line data Source code
1 : //
2 : // Copyright (c) 2026 Michael Vandeberg
3 : // Copyright (c) 2026 Steve Gerbino
4 : //
5 : // Distributed under the Boost Software License, Version 1.0. (See accompanying
6 : // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
7 : //
8 : // Official repository: https://github.com/cppalliance/capy
9 : //
10 :
11 : #ifndef BOOST_CAPY_WHEN_ANY_HPP
12 : #define BOOST_CAPY_WHEN_ANY_HPP
13 :
14 : #include <boost/capy/detail/config.hpp>
15 : #include <boost/capy/detail/io_result_combinators.hpp>
16 : #include <boost/capy/continuation.hpp>
17 : #include <boost/capy/concept/executor.hpp>
18 : #include <boost/capy/concept/io_awaitable.hpp>
19 : #include <coroutine>
20 : #include <boost/capy/ex/executor_ref.hpp>
21 : #include <boost/capy/ex/frame_alloc_mixin.hpp>
22 : #include <boost/capy/ex/frame_allocator.hpp>
23 : #include <boost/capy/ex/io_env.hpp>
24 : #include <boost/capy/task.hpp>
25 :
26 : #include <array>
27 : #include <atomic>
28 : #include <exception>
29 : #include <memory>
30 : #include <mutex>
31 : #include <optional>
32 : #include <ranges>
33 : #include <stdexcept>
34 : #include <stop_token>
35 : #include <tuple>
36 : #include <type_traits>
37 : #include <utility>
38 : #include <variant>
39 : #include <vector>
40 :
41 : /*
42 : when_any - Race multiple io_result tasks, select first success
43 : =============================================================
44 :
45 : OVERVIEW:
46 : ---------
47 : when_any launches N io_result-returning tasks concurrently. A task
48 : wins by returning !ec; errors and exceptions do not win. Once a
49 : winner is found, stop is requested for siblings and the winner's
50 : payload is returned. If no winner exists (all fail), the first
51 : error_code is returned or the last exception is rethrown.
52 :
53 : ARCHITECTURE:
54 : -------------
55 : The design mirrors when_all but with inverted completion semantics:
56 :
57 : when_all: complete when remaining_count reaches 0 (all done)
58 : when_any: complete when has_winner becomes true (first done)
59 : BUT still wait for remaining_count to reach 0 for cleanup
60 :
61 : Key components:
62 : - when_any_core: Shared state tracking winner and completion
63 : - when_any_io_runner: Wrapper coroutine for each child task
64 : - when_any_io_launcher/when_any_io_homogeneous_launcher:
65 : Awaitables that start all runners concurrently
66 :
67 : CRITICAL INVARIANTS:
68 : --------------------
69 : 1. Only a task returning !ec can become the winner (via atomic CAS)
70 : 2. All tasks must complete before parent resumes (cleanup safety)
71 : 3. Stop is requested immediately when winner is determined
72 : 4. Exceptions and errors do not claim winner status
73 :
74 : POSITIONAL VARIANT:
75 : -------------------
76 : The variadic overload returns std::variant<error_code, R1, R2, ..., Rn>.
77 : Index 0 is error_code (failure/no-winner). Index 1..N identifies the
78 : winning child and carries its payload.
79 :
80 : RANGE OVERLOAD:
81 : ---------------
82 : The range overload returns variant<error_code, pair<size_t, T>> for
83 : non-void children or variant<error_code, size_t> for void children.
84 :
85 : MEMORY MODEL:
86 : -------------
87 : Synchronization chain from winner's write to parent's read:
88 :
89 : 1. Winner thread writes result_ (non-atomic)
90 : 2. Winner thread calls signal_completion() -> fetch_sub(acq_rel) on remaining_count_
91 : 3. Last task thread (may be winner or non-winner) calls signal_completion()
92 : -> fetch_sub(acq_rel) on remaining_count_, observing count becomes 0
93 : 4. Last task returns caller_ex_.dispatch(continuation_) via symmetric transfer
94 : 5. Parent coroutine resumes and reads result_
95 :
96 : Synchronization analysis:
97 : - All fetch_sub operations on remaining_count_ form a release sequence
98 : - Winner's fetch_sub releases; subsequent fetch_sub operations participate
99 : in the modification order of remaining_count_
100 : - Last task's fetch_sub(acq_rel) synchronizes-with prior releases in the
101 : modification order, establishing happens-before from winner's writes
102 : - Executor dispatch() is expected to provide queue-based synchronization
103 : (release-on-post, acquire-on-execute) completing the chain to parent
104 : - Even inline executors work (same thread = sequenced-before)
105 :
106 : EXCEPTION SEMANTICS:
107 : --------------------
108 : Exceptions do NOT claim winner status. If a child throws, the exception
109 : is recorded but the combinator keeps waiting for a success. Only when
110 : all children complete without a winner does the combinator check: if
111 : any exception was recorded, it is rethrown (exception beats error_code).
112 : */
113 :
114 : namespace boost {
115 : namespace capy {
116 :
117 : namespace detail {
118 :
119 : /** Core shared state for when_any operations.
120 :
121 : Contains all members and methods common to both heterogeneous (variadic)
122 : and homogeneous (range) when_any implementations. State classes embed
123 : this via composition to avoid CRTP destructor ordering issues.
124 :
125 : @par Thread Safety
126 : Atomic operations protect winner selection and completion count.
127 : */
128 : struct when_any_core
129 : {
130 : std::atomic<std::size_t> remaining_count_;
131 : std::size_t winner_index_{0};
132 : std::exception_ptr winner_exception_;
133 : std::stop_source stop_source_;
134 :
135 : // Bridges parent's stop token to our stop_source
136 : struct stop_callback_fn
137 : {
138 : std::stop_source* source_;
139 HIT 2 : void operator()() const noexcept { source_->request_stop(); }
140 : };
141 : using stop_callback_t = std::stop_callback<stop_callback_fn>;
142 : std::optional<stop_callback_t> parent_stop_callback_;
143 :
144 : continuation continuation_;
145 : io_env const* caller_env_ = nullptr;
146 :
147 : // Placed last to avoid padding (1-byte atomic followed by 8-byte aligned members)
148 : std::atomic<bool> has_winner_{false};
149 :
150 31 : explicit when_any_core(std::size_t count) noexcept
151 31 : : remaining_count_(count)
152 : {
153 31 : }
154 :
155 : /** Atomically claim winner status; exactly one task succeeds. */
156 52 : bool try_win(std::size_t index) noexcept
157 : {
158 52 : bool expected = false;
159 52 : if(has_winner_.compare_exchange_strong(
160 : expected, true, std::memory_order_acq_rel))
161 : {
162 22 : winner_index_ = index;
163 22 : stop_source_.request_stop();
164 22 : return true;
165 : }
166 30 : return false;
167 : }
168 :
169 : /** @pre try_win() returned true. */
170 MIS 0 : void set_winner_exception(std::exception_ptr ep) noexcept
171 : {
172 0 : winner_exception_ = ep;
173 0 : }
174 :
175 : // Runners signal completion directly via final_suspend; no member function needed.
176 : };
177 :
178 : } // namespace detail
179 :
180 : namespace detail {
181 :
182 : // State for io_result-aware when_any: only !ec wins.
183 : template<typename... Ts>
184 : struct when_any_io_state
185 : {
186 : static constexpr std::size_t task_count = sizeof...(Ts);
187 : using variant_type = std::variant<std::error_code, Ts...>;
188 :
189 : when_any_core core_;
190 : std::optional<variant_type> result_;
191 : std::array<continuation, task_count> runner_handles_{};
192 :
193 : // Last failure (error or exception) for the all-fail case.
194 : // Last writer wins — no priority between errors and exceptions.
195 : std::mutex failure_mu_;
196 : std::error_code last_error_;
197 : std::exception_ptr last_exception_;
198 :
199 HIT 16 : when_any_io_state()
200 16 : : core_(task_count)
201 : {
202 16 : }
203 :
204 12 : void record_error(std::error_code ec)
205 : {
206 12 : std::lock_guard lk(failure_mu_);
207 12 : last_error_ = ec;
208 12 : last_exception_ = nullptr;
209 12 : }
210 :
211 7 : void record_exception(std::exception_ptr ep)
212 : {
213 7 : std::lock_guard lk(failure_mu_);
214 7 : last_exception_ = ep;
215 7 : last_error_ = {};
216 7 : }
217 : };
218 :
219 : // Wrapper coroutine for io_result-aware when_any children.
220 : // unhandled_exception records the exception but does NOT claim winner status.
221 : template<typename StateType>
222 : struct BOOST_CAPY_CORO_DESTROY_WHEN_COMPLETE when_any_io_runner
223 : {
224 : struct promise_type
225 : : frame_alloc_mixin
226 : {
227 : StateType* state_ = nullptr;
228 : std::size_t index_ = 0;
229 : io_env env_;
230 :
231 82 : when_any_io_runner get_return_object() noexcept
232 : {
233 : return when_any_io_runner(
234 82 : std::coroutine_handle<promise_type>::from_promise(*this));
235 : }
236 :
237 82 : std::suspend_always initial_suspend() noexcept { return {}; }
238 :
239 82 : auto final_suspend() noexcept
240 : {
241 : struct awaiter
242 : {
243 : promise_type* p_;
244 82 : bool await_ready() const noexcept { return false; }
245 82 : auto await_suspend(std::coroutine_handle<> h) noexcept
246 : {
247 82 : auto& core = p_->state_->core_;
248 82 : auto* counter = &core.remaining_count_;
249 82 : auto* caller_env = core.caller_env_;
250 82 : auto& cont = core.continuation_;
251 :
252 82 : h.destroy();
253 :
254 82 : auto remaining = counter->fetch_sub(1, std::memory_order_acq_rel);
255 82 : if(remaining == 1)
256 31 : return detail::symmetric_transfer(caller_env->executor.dispatch(cont));
257 51 : return detail::symmetric_transfer(std::noop_coroutine());
258 : }
259 MIS 0 : void await_resume() const noexcept {}
260 : };
261 HIT 82 : return awaiter{this};
262 : }
263 :
264 71 : void return_void() noexcept {}
265 :
266 : // Exceptions do NOT win in io_result when_any
267 11 : void unhandled_exception() noexcept
268 : {
269 11 : state_->record_exception(std::current_exception());
270 11 : }
271 :
272 : template<class Awaitable>
273 : struct transform_awaiter
274 : {
275 : std::decay_t<Awaitable> a_;
276 : promise_type* p_;
277 :
278 82 : bool await_ready() { return a_.await_ready(); }
279 82 : decltype(auto) await_resume() { return a_.await_resume(); }
280 :
281 : template<class Promise>
282 81 : auto await_suspend(std::coroutine_handle<Promise> h)
283 : {
284 : using R = decltype(a_.await_suspend(h, &p_->env_));
285 : if constexpr (std::is_same_v<R, std::coroutine_handle<>>)
286 81 : return detail::symmetric_transfer(a_.await_suspend(h, &p_->env_));
287 : else
288 : return a_.await_suspend(h, &p_->env_);
289 : }
290 : };
291 :
292 : template<class Awaitable>
293 82 : auto await_transform(Awaitable&& a)
294 : {
295 : using A = std::decay_t<Awaitable>;
296 : if constexpr (IoAwaitable<A>)
297 : {
298 : return transform_awaiter<Awaitable>{
299 163 : std::forward<Awaitable>(a), this};
300 : }
301 : else
302 : {
303 : static_assert(sizeof(A) == 0, "requires IoAwaitable");
304 : }
305 81 : }
306 : };
307 :
308 : std::coroutine_handle<promise_type> h_;
309 :
310 82 : explicit when_any_io_runner(std::coroutine_handle<promise_type> h) noexcept
311 82 : : h_(h)
312 : {
313 82 : }
314 :
315 : when_any_io_runner(when_any_io_runner&& other) noexcept
316 : : h_(std::exchange(other.h_, nullptr))
317 : {
318 : }
319 :
320 : when_any_io_runner(when_any_io_runner const&) = delete;
321 : when_any_io_runner& operator=(when_any_io_runner const&) = delete;
322 : when_any_io_runner& operator=(when_any_io_runner&&) = delete;
323 :
324 82 : auto release() noexcept
325 : {
326 82 : return std::exchange(h_, nullptr);
327 : }
328 : };
329 :
330 : // Runner coroutine: only tries to win when the child returns !ec.
331 : template<std::size_t I, IoAwaitable Awaitable, typename StateType>
332 : when_any_io_runner<StateType>
333 30 : make_when_any_io_runner(Awaitable inner, StateType* state)
334 : {
335 : auto result = co_await std::move(inner);
336 :
337 : if(!result.ec)
338 : {
339 : // Success: try to claim winner
340 : if(state->core_.try_win(I))
341 : {
342 : try
343 : {
344 : state->result_.emplace(
345 : std::in_place_index<I + 1>,
346 : detail::extract_io_payload(std::move(result)));
347 : }
348 : catch(...)
349 : {
350 : state->core_.set_winner_exception(std::current_exception());
351 : }
352 : }
353 : }
354 : else
355 : {
356 : // Error: record but don't win
357 : state->record_error(result.ec);
358 : }
359 60 : }
360 :
361 : // Launcher for io_result-aware when_any.
362 : template<IoAwaitable... Awaitables>
363 : class when_any_io_launcher
364 : {
365 : using state_type = when_any_io_state<
366 : io_result_payload_t<awaitable_result_t<Awaitables>>...>;
367 :
368 : std::tuple<Awaitables...>* tasks_;
369 : state_type* state_;
370 :
371 : public:
372 16 : when_any_io_launcher(
373 : std::tuple<Awaitables...>* tasks,
374 : state_type* state)
375 16 : : tasks_(tasks)
376 16 : , state_(state)
377 : {
378 16 : }
379 :
380 16 : bool await_ready() const noexcept
381 : {
382 16 : return sizeof...(Awaitables) == 0;
383 : }
384 :
385 16 : std::coroutine_handle<> await_suspend(
386 : std::coroutine_handle<> continuation, io_env const* caller_env)
387 : {
388 16 : state_->core_.continuation_.h = continuation;
389 16 : state_->core_.caller_env_ = caller_env;
390 :
391 16 : if(caller_env->stop_token.stop_possible())
392 : {
393 2 : state_->core_.parent_stop_callback_.emplace(
394 1 : caller_env->stop_token,
395 1 : when_any_core::stop_callback_fn{&state_->core_.stop_source_});
396 :
397 1 : if(caller_env->stop_token.stop_requested())
398 MIS 0 : state_->core_.stop_source_.request_stop();
399 : }
400 :
401 HIT 16 : auto token = state_->core_.stop_source_.get_token();
402 28 : [&]<std::size_t... Is>(std::index_sequence<Is...>) {
403 16 : (..., launch_one<Is>(caller_env->executor, token));
404 16 : }(std::index_sequence_for<Awaitables...>{});
405 :
406 32 : return std::noop_coroutine();
407 16 : }
408 :
409 16 : void await_resume() const noexcept {}
410 :
411 : private:
412 : template<std::size_t I>
413 30 : void launch_one(executor_ref caller_ex, std::stop_token token)
414 : {
415 30 : auto runner = make_when_any_io_runner<I>(
416 30 : std::move(std::get<I>(*tasks_)), state_);
417 :
418 30 : auto h = runner.release();
419 30 : h.promise().state_ = state_;
420 30 : h.promise().index_ = I;
421 30 : h.promise().env_ = io_env{caller_ex, token,
422 30 : state_->core_.caller_env_->frame_allocator};
423 :
424 30 : state_->runner_handles_[I].h = std::coroutine_handle<>{h};
425 30 : caller_ex.post(state_->runner_handles_[I]);
426 60 : }
427 : };
428 :
429 : /** Shared state for homogeneous io_result-aware when_any (range overload).
430 :
431 : @tparam T The payload type extracted from io_result.
432 : */
433 : template<typename T>
434 : struct when_any_io_homogeneous_state
435 : {
436 : when_any_core core_;
437 : std::optional<T> result_;
438 : std::unique_ptr<continuation[]> runner_handles_;
439 :
440 : std::mutex failure_mu_;
441 : std::error_code last_error_;
442 : std::exception_ptr last_exception_;
443 :
444 13 : explicit when_any_io_homogeneous_state(std::size_t count)
445 13 : : core_(count)
446 13 : , runner_handles_(std::make_unique<continuation[]>(count))
447 : {
448 13 : }
449 :
450 6 : void record_error(std::error_code ec)
451 : {
452 6 : std::lock_guard lk(failure_mu_);
453 6 : last_error_ = ec;
454 6 : last_exception_ = nullptr;
455 6 : }
456 :
457 4 : void record_exception(std::exception_ptr ep)
458 : {
459 4 : std::lock_guard lk(failure_mu_);
460 4 : last_exception_ = ep;
461 4 : last_error_ = {};
462 4 : }
463 : };
464 :
465 : /** Specialization for void io_result children (no payload storage). */
466 : template<>
467 : struct when_any_io_homogeneous_state<std::tuple<>>
468 : {
469 : when_any_core core_;
470 : std::unique_ptr<continuation[]> runner_handles_;
471 :
472 : std::mutex failure_mu_;
473 : std::error_code last_error_;
474 : std::exception_ptr last_exception_;
475 :
476 2 : explicit when_any_io_homogeneous_state(std::size_t count)
477 2 : : core_(count)
478 2 : , runner_handles_(std::make_unique<continuation[]>(count))
479 : {
480 2 : }
481 :
482 1 : void record_error(std::error_code ec)
483 : {
484 1 : std::lock_guard lk(failure_mu_);
485 1 : last_error_ = ec;
486 1 : last_exception_ = nullptr;
487 1 : }
488 :
489 MIS 0 : void record_exception(std::exception_ptr ep)
490 : {
491 0 : std::lock_guard lk(failure_mu_);
492 0 : last_exception_ = ep;
493 0 : last_error_ = {};
494 0 : }
495 : };
496 :
497 : /** Create an io_result-aware runner for homogeneous when_any (range path).
498 :
499 : Only tries to win when the child returns !ec.
500 : */
501 : template<IoAwaitable Awaitable, typename StateType>
502 : when_any_io_runner<StateType>
503 HIT 52 : make_when_any_io_homogeneous_runner(
504 : Awaitable inner, StateType* state, std::size_t index)
505 : {
506 : auto result = co_await std::move(inner);
507 :
508 : if(!result.ec)
509 : {
510 : if(state->core_.try_win(index))
511 : {
512 : using PayloadT = io_result_payload_t<
513 : awaitable_result_t<Awaitable>>;
514 : if constexpr (!std::is_same_v<PayloadT, std::tuple<>>)
515 : {
516 : try
517 : {
518 : state->result_.emplace(
519 : extract_io_payload(std::move(result)));
520 : }
521 : catch(...)
522 : {
523 : state->core_.set_winner_exception(
524 : std::current_exception());
525 : }
526 : }
527 : }
528 : }
529 : else
530 : {
531 : state->record_error(result.ec);
532 : }
533 104 : }
534 :
535 : /** Launches all io_result-aware homogeneous runners concurrently. */
536 : template<IoAwaitableRange Range>
537 : class when_any_io_homogeneous_launcher
538 : {
539 : using Awaitable = std::ranges::range_value_t<Range>;
540 : using PayloadT = io_result_payload_t<awaitable_result_t<Awaitable>>;
541 :
542 : Range* range_;
543 : when_any_io_homogeneous_state<PayloadT>* state_;
544 :
545 : public:
546 15 : when_any_io_homogeneous_launcher(
547 : Range* range,
548 : when_any_io_homogeneous_state<PayloadT>* state)
549 15 : : range_(range)
550 15 : , state_(state)
551 : {
552 15 : }
553 :
554 15 : bool await_ready() const noexcept
555 : {
556 15 : return std::ranges::empty(*range_);
557 : }
558 :
559 15 : std::coroutine_handle<> await_suspend(
560 : std::coroutine_handle<> continuation, io_env const* caller_env)
561 : {
562 15 : state_->core_.continuation_.h = continuation;
563 15 : state_->core_.caller_env_ = caller_env;
564 :
565 15 : if(caller_env->stop_token.stop_possible())
566 : {
567 4 : state_->core_.parent_stop_callback_.emplace(
568 2 : caller_env->stop_token,
569 2 : when_any_core::stop_callback_fn{&state_->core_.stop_source_});
570 :
571 2 : if(caller_env->stop_token.stop_requested())
572 1 : state_->core_.stop_source_.request_stop();
573 : }
574 :
575 15 : auto token = state_->core_.stop_source_.get_token();
576 :
577 : // Phase 1: Create all runners without dispatching.
578 15 : std::size_t index = 0;
579 67 : for(auto&& a : *range_)
580 : {
581 52 : auto runner = make_when_any_io_homogeneous_runner(
582 52 : std::move(a), state_, index);
583 :
584 52 : auto h = runner.release();
585 52 : h.promise().state_ = state_;
586 52 : h.promise().index_ = index;
587 52 : h.promise().env_ = io_env{caller_env->executor, token,
588 52 : caller_env->frame_allocator};
589 :
590 52 : state_->runner_handles_[index].h = std::coroutine_handle<>{h};
591 52 : ++index;
592 : }
593 :
594 : // Phase 2: Post all runners. Any may complete synchronously.
595 15 : auto* handles = state_->runner_handles_.get();
596 15 : std::size_t count = state_->core_.remaining_count_.load(std::memory_order_relaxed);
597 67 : for(std::size_t i = 0; i < count; ++i)
598 52 : caller_env->executor.post(handles[i]);
599 :
600 30 : return std::noop_coroutine();
601 67 : }
602 :
603 15 : void await_resume() const noexcept {}
604 : };
605 :
606 : } // namespace detail
607 :
608 : /** Race a range of io_result-returning awaitables (non-void payloads).
609 :
610 : Only a child returning !ec can win. Errors and exceptions do not
611 : claim winner status. If all children fail, the last failure
612 : is reported — either the last error_code at variant index 0,
613 : or the last exception rethrown.
614 :
615 : @param awaitables Range of io_result-returning awaitables (must
616 : not be empty).
617 :
618 : @return A task yielding variant<error_code, pair<size_t, PayloadT>>
619 : where index 0 is failure and index 1 carries the winner's
620 : index and payload.
621 :
622 : @throws std::invalid_argument if range is empty.
623 : @throws Rethrows last exception when no winner and the last
624 : failure was an exception.
625 :
626 : @par Example
627 : @code
628 : task<void> example()
629 : {
630 : std::vector<io_task<size_t>> reads;
631 : for (auto& buf : buffers)
632 : reads.push_back(stream.read_some(buf));
633 :
634 : auto result = co_await when_any(std::move(reads));
635 : if (result.index() == 1)
636 : {
637 : auto [idx, n] = std::get<1>(result);
638 : }
639 : }
640 : @endcode
641 :
642 : @see IoAwaitableRange, when_any
643 : */
644 : template<IoAwaitableRange R>
645 : requires detail::is_io_result_v<
646 : awaitable_result_t<std::ranges::range_value_t<R>>>
647 : && (!std::is_same_v<
648 : detail::io_result_payload_t<
649 : awaitable_result_t<std::ranges::range_value_t<R>>>,
650 : std::tuple<>>)
651 14 : [[nodiscard]] auto when_any(R&& awaitables)
652 : -> task<std::variant<std::error_code,
653 : std::pair<std::size_t,
654 : detail::io_result_payload_t<
655 : awaitable_result_t<std::ranges::range_value_t<R>>>>>>
656 : {
657 : using Awaitable = std::ranges::range_value_t<R>;
658 : using PayloadT = detail::io_result_payload_t<
659 : awaitable_result_t<Awaitable>>;
660 : using result_type = std::variant<std::error_code,
661 : std::pair<std::size_t, PayloadT>>;
662 : using OwnedRange = std::remove_cvref_t<R>;
663 :
664 : auto count = std::ranges::size(awaitables);
665 : if(count == 0)
666 : throw std::invalid_argument("when_any requires at least one awaitable");
667 :
668 : OwnedRange owned_awaitables = std::forward<R>(awaitables);
669 :
670 : detail::when_any_io_homogeneous_state<PayloadT> state(count);
671 :
672 : co_await detail::when_any_io_homogeneous_launcher<OwnedRange>(
673 : &owned_awaitables, &state);
674 :
675 : // Winner found
676 : if(state.core_.has_winner_.load(std::memory_order_acquire))
677 : {
678 : if(state.core_.winner_exception_)
679 : std::rethrow_exception(state.core_.winner_exception_);
680 : co_return result_type{std::in_place_index<1>,
681 : std::pair{state.core_.winner_index_, std::move(*state.result_)}};
682 : }
683 :
684 : // No winner — report last failure
685 : if(state.last_exception_)
686 : std::rethrow_exception(state.last_exception_);
687 : co_return result_type{std::in_place_index<0>, state.last_error_};
688 28 : }
689 :
690 : /** Race a range of void io_result-returning awaitables.
691 :
692 : Only a child returning !ec can win. Returns the winner's index
693 : at variant index 1, or error_code at index 0 on all-fail.
694 :
695 : @param awaitables Range of io_result<>-returning awaitables (must
696 : not be empty).
697 :
698 : @return A task yielding variant<error_code, size_t> where index 0
699 : is failure and index 1 carries the winner's index.
700 :
701 : @throws std::invalid_argument if range is empty.
702 : @throws Rethrows first exception when no winner and at least one
703 : child threw.
704 :
705 : @par Example
706 : @code
707 : task<void> example()
708 : {
709 : std::vector<io_task<>> jobs;
710 : jobs.push_back(background_work_a());
711 : jobs.push_back(background_work_b());
712 :
713 : auto result = co_await when_any(std::move(jobs));
714 : if (result.index() == 1)
715 : {
716 : auto winner = std::get<1>(result);
717 : }
718 : }
719 : @endcode
720 :
721 : @see IoAwaitableRange, when_any
722 : */
723 : template<IoAwaitableRange R>
724 : requires detail::is_io_result_v<
725 : awaitable_result_t<std::ranges::range_value_t<R>>>
726 : && std::is_same_v<
727 : detail::io_result_payload_t<
728 : awaitable_result_t<std::ranges::range_value_t<R>>>,
729 : std::tuple<>>
730 2 : [[nodiscard]] auto when_any(R&& awaitables)
731 : -> task<std::variant<std::error_code, std::size_t>>
732 : {
733 : using OwnedRange = std::remove_cvref_t<R>;
734 : using result_type = std::variant<std::error_code, std::size_t>;
735 :
736 : auto count = std::ranges::size(awaitables);
737 : if(count == 0)
738 : throw std::invalid_argument("when_any requires at least one awaitable");
739 :
740 : OwnedRange owned_awaitables = std::forward<R>(awaitables);
741 :
742 : detail::when_any_io_homogeneous_state<std::tuple<>> state(count);
743 :
744 : co_await detail::when_any_io_homogeneous_launcher<OwnedRange>(
745 : &owned_awaitables, &state);
746 :
747 : // Winner found
748 : if(state.core_.has_winner_.load(std::memory_order_acquire))
749 : {
750 : if(state.core_.winner_exception_)
751 : std::rethrow_exception(state.core_.winner_exception_);
752 : co_return result_type{std::in_place_index<1>,
753 : state.core_.winner_index_};
754 : }
755 :
756 : // No winner — report last failure
757 : if(state.last_exception_)
758 : std::rethrow_exception(state.last_exception_);
759 : co_return result_type{std::in_place_index<0>, state.last_error_};
760 4 : }
761 :
762 : /** Race io_result-returning awaitables, selecting the first success.
763 :
764 : Overload selected when all children return io_result<Ts...>.
765 : Only a child returning !ec can win. Errors and exceptions do
766 : not claim winner status.
767 :
768 : @return A task yielding variant<error_code, R1, ..., Rn> where
769 : index 0 is the failure/no-winner case and index i+1
770 : identifies the winning child.
771 : */
772 : template<IoAwaitable... As>
773 : requires (sizeof...(As) > 0)
774 : && detail::all_io_result_awaitables<As...>
775 16 : [[nodiscard]] auto when_any(As... as)
776 : -> task<std::variant<
777 : std::error_code,
778 : detail::io_result_payload_t<awaitable_result_t<As>>...>>
779 : {
780 : using result_type = std::variant<
781 : std::error_code,
782 : detail::io_result_payload_t<awaitable_result_t<As>>...>;
783 :
784 : detail::when_any_io_state<
785 : detail::io_result_payload_t<awaitable_result_t<As>>...> state;
786 : std::tuple<As...> awaitable_tuple(std::move(as)...);
787 :
788 : co_await detail::when_any_io_launcher<As...>(
789 : &awaitable_tuple, &state);
790 :
791 : // Winner found: return their result
792 : if(state.result_.has_value())
793 : co_return std::move(*state.result_);
794 :
795 : // Winner claimed but payload construction failed
796 : if(state.core_.winner_exception_)
797 : std::rethrow_exception(state.core_.winner_exception_);
798 :
799 : // No winner — report last failure
800 : if(state.last_exception_)
801 : std::rethrow_exception(state.last_exception_);
802 : co_return result_type{std::in_place_index<0>, state.last_error_};
803 32 : }
804 :
805 : } // namespace capy
806 : } // namespace boost
807 :
808 : #endif
|