100.00% Lines (160/160) 100.00% Functions (43/43)
TLA Baseline Branch
Line Hits Code Line Hits Code
1   // 1   //
2   // Copyright (c) 2026 Steve Gerbino 2   // Copyright (c) 2026 Steve Gerbino
3   // 3   //
4   // Distributed under the Boost Software License, Version 1.0. (See accompanying 4   // Distributed under the Boost Software License, Version 1.0. (See accompanying
5   // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) 5   // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6   // 6   //
7   // Official repository: https://github.com/cppalliance/capy 7   // Official repository: https://github.com/cppalliance/capy
8   // 8   //
9   9  
10   #ifndef BOOST_CAPY_WHEN_ALL_HPP 10   #ifndef BOOST_CAPY_WHEN_ALL_HPP
11   #define BOOST_CAPY_WHEN_ALL_HPP 11   #define BOOST_CAPY_WHEN_ALL_HPP
12   12  
13   #include <boost/capy/detail/config.hpp> 13   #include <boost/capy/detail/config.hpp>
14   #include <boost/capy/detail/io_result_combinators.hpp> 14   #include <boost/capy/detail/io_result_combinators.hpp>
15   #include <boost/capy/continuation.hpp> 15   #include <boost/capy/continuation.hpp>
16   #include <boost/capy/concept/executor.hpp> 16   #include <boost/capy/concept/executor.hpp>
17   #include <boost/capy/concept/io_awaitable.hpp> 17   #include <boost/capy/concept/io_awaitable.hpp>
18   #include <coroutine> 18   #include <coroutine>
19   #include <boost/capy/ex/frame_alloc_mixin.hpp> 19   #include <boost/capy/ex/frame_alloc_mixin.hpp>
20   #include <boost/capy/ex/io_env.hpp> 20   #include <boost/capy/ex/io_env.hpp>
21   #include <boost/capy/ex/frame_allocator.hpp> 21   #include <boost/capy/ex/frame_allocator.hpp>
22   #include <boost/capy/task.hpp> 22   #include <boost/capy/task.hpp>
23   23  
24   #include <array> 24   #include <array>
25   #include <atomic> 25   #include <atomic>
26   #include <exception> 26   #include <exception>
27   #include <memory> 27   #include <memory>
28   #include <optional> 28   #include <optional>
29   #include <ranges> 29   #include <ranges>
30   #include <stdexcept> 30   #include <stdexcept>
31   #include <stop_token> 31   #include <stop_token>
32   #include <tuple> 32   #include <tuple>
33   #include <type_traits> 33   #include <type_traits>
34   #include <utility> 34   #include <utility>
35   #include <vector> 35   #include <vector>
36   36  
37   namespace boost { 37   namespace boost {
38   namespace capy { 38   namespace capy {
39   39  
40   namespace detail { 40   namespace detail {
41   41  
42   /** Holds the result of a single task within when_all. 42   /** Holds the result of a single task within when_all.
43   */ 43   */
44   template<typename T> 44   template<typename T>
45   struct result_holder 45   struct result_holder
46   { 46   {
47   std::optional<T> value_; 47   std::optional<T> value_;
48   48  
HITCBC 49   83 void set(T v) 49   83 void set(T v)
50   { 50   {
HITCBC 51   83 value_ = std::move(v); 51   83 value_ = std::move(v);
HITCBC 52   83 } 52   83 }
53   53  
HITCBC 54   71 T get() && 54   71 T get() &&
55   { 55   {
HITCBC 56   71 return std::move(*value_); 56   71 return std::move(*value_);
57   } 57   }
58   }; 58   };
59   59  
60   /** Core shared state for when_all operations. 60   /** Core shared state for when_all operations.
61   61  
62   Contains all members and methods common to both heterogeneous (variadic) 62   Contains all members and methods common to both heterogeneous (variadic)
63   and homogeneous (range) when_all implementations. State classes embed 63   and homogeneous (range) when_all implementations. State classes embed
64   this via composition to avoid CRTP destructor ordering issues. 64   this via composition to avoid CRTP destructor ordering issues.
65   65  
66   @par Thread Safety 66   @par Thread Safety
67   Atomic operations protect exception capture and completion count. 67   Atomic operations protect exception capture and completion count.
68   */ 68   */
69   struct when_all_core 69   struct when_all_core
70   { 70   {
71   std::atomic<std::size_t> remaining_count_; 71   std::atomic<std::size_t> remaining_count_;
72   72  
73   // Exception storage - first error wins, others discarded 73   // Exception storage - first error wins, others discarded
74   std::atomic<bool> has_exception_{false}; 74   std::atomic<bool> has_exception_{false};
75   std::exception_ptr first_exception_; 75   std::exception_ptr first_exception_;
76   76  
77   std::stop_source stop_source_; 77   std::stop_source stop_source_;
78   78  
79   // Bridges parent's stop token to our stop_source 79   // Bridges parent's stop token to our stop_source
80   struct stop_callback_fn 80   struct stop_callback_fn
81   { 81   {
82   std::stop_source* source_; 82   std::stop_source* source_;
HITCBC 83   4 void operator()() const { source_->request_stop(); } 83   4 void operator()() const { source_->request_stop(); }
84   }; 84   };
85   using stop_callback_t = std::stop_callback<stop_callback_fn>; 85   using stop_callback_t = std::stop_callback<stop_callback_fn>;
86   std::optional<stop_callback_t> parent_stop_callback_; 86   std::optional<stop_callback_t> parent_stop_callback_;
87   87  
88   continuation continuation_; 88   continuation continuation_;
89   io_env const* caller_env_ = nullptr; 89   io_env const* caller_env_ = nullptr;
90   90  
HITCBC 91   75 explicit when_all_core(std::size_t count) noexcept 91   75 explicit when_all_core(std::size_t count) noexcept
HITCBC 92   75 : remaining_count_(count) 92   75 : remaining_count_(count)
93   { 93   {
HITCBC 94   75 } 94   75 }
95   95  
96   /** Capture an exception (first one wins). */ 96   /** Capture an exception (first one wins). */
HITCBC 97   19 void capture_exception(std::exception_ptr ep) 97   19 void capture_exception(std::exception_ptr ep)
98   { 98   {
HITCBC 99   19 bool expected = false; 99   19 bool expected = false;
HITCBC 100   19 if(has_exception_.compare_exchange_strong( 100   19 if(has_exception_.compare_exchange_strong(
101   expected, true, std::memory_order_relaxed)) 101   expected, true, std::memory_order_relaxed))
HITCBC 102   17 first_exception_ = ep; 102   17 first_exception_ = ep;
HITCBC 103   19 } 103   19 }
104   }; 104   };
105   105  
106   /** Shared state for heterogeneous when_all (variadic overload). 106   /** Shared state for heterogeneous when_all (variadic overload).
107   107  
108   @tparam Ts The result types of the tasks. 108   @tparam Ts The result types of the tasks.
109   */ 109   */
110   template<typename... Ts> 110   template<typename... Ts>
111   struct when_all_state 111   struct when_all_state
112   { 112   {
113   static constexpr std::size_t task_count = sizeof...(Ts); 113   static constexpr std::size_t task_count = sizeof...(Ts);
114   114  
115   when_all_core core_; 115   when_all_core core_;
116   std::tuple<result_holder<Ts>...> results_; 116   std::tuple<result_holder<Ts>...> results_;
117   std::array<continuation, task_count> runner_handles_{}; 117   std::array<continuation, task_count> runner_handles_{};
118   118  
119   std::atomic<bool> has_error_{false}; 119   std::atomic<bool> has_error_{false};
120   std::error_code first_error_; 120   std::error_code first_error_;
121   121  
HITCBC 122   51 when_all_state() 122   51 when_all_state()
HITCBC 123   51 : core_(task_count) 123   51 : core_(task_count)
124   { 124   {
HITCBC 125   51 } 125   51 }
126   126  
127   /** Record the first error (subsequent errors are discarded). */ 127   /** Record the first error (subsequent errors are discarded). */
HITCBC 128   43 void record_error(std::error_code ec) 128   43 void record_error(std::error_code ec)
129   { 129   {
HITCBC 130   43 bool expected = false; 130   43 bool expected = false;
HITCBC 131   43 if(has_error_.compare_exchange_strong( 131   43 if(has_error_.compare_exchange_strong(
132   expected, true, std::memory_order_relaxed)) 132   expected, true, std::memory_order_relaxed))
HITCBC 133   29 first_error_ = ec; 133   29 first_error_ = ec;
HITCBC 134   43 } 134   43 }
135   }; 135   };
136   136  
137   /** Shared state for homogeneous when_all (range overload). 137   /** Shared state for homogeneous when_all (range overload).
138   138  
139   Stores extracted io_result payloads in a vector indexed by task 139   Stores extracted io_result payloads in a vector indexed by task
140   position. Tracks the first error_code for error propagation. 140   position. Tracks the first error_code for error propagation.
141   141  
142   @tparam T The payload type extracted from io_result. 142   @tparam T The payload type extracted from io_result.
143   */ 143   */
144   template<typename T> 144   template<typename T>
145   struct when_all_homogeneous_state 145   struct when_all_homogeneous_state
146   { 146   {
147   when_all_core core_; 147   when_all_core core_;
148   std::vector<std::optional<T>> results_; 148   std::vector<std::optional<T>> results_;
149   std::unique_ptr<continuation[]> runner_handles_; 149   std::unique_ptr<continuation[]> runner_handles_;
150   150  
151   std::atomic<bool> has_error_{false}; 151   std::atomic<bool> has_error_{false};
152   std::error_code first_error_; 152   std::error_code first_error_;
153   153  
HITCBC 154   12 explicit when_all_homogeneous_state(std::size_t count) 154   12 explicit when_all_homogeneous_state(std::size_t count)
HITCBC 155   12 : core_(count) 155   12 : core_(count)
HITCBC 156   24 , results_(count) 156   24 , results_(count)
HITCBC 157   12 , runner_handles_(std::make_unique<continuation[]>(count)) 157   12 , runner_handles_(std::make_unique<continuation[]>(count))
158   { 158   {
HITCBC 159   12 } 159   12 }
160   160  
HITCBC 161   18 void set_result(std::size_t index, T value) 161   18 void set_result(std::size_t index, T value)
162   { 162   {
HITCBC 163   18 results_[index].emplace(std::move(value)); 163   18 results_[index].emplace(std::move(value));
HITCBC 164   18 } 164   18 }
165   165  
166   /** Record the first error (subsequent errors are discarded). */ 166   /** Record the first error (subsequent errors are discarded). */
HITCBC 167   7 void record_error(std::error_code ec) 167   7 void record_error(std::error_code ec)
168   { 168   {
HITCBC 169   7 bool expected = false; 169   7 bool expected = false;
HITCBC 170   7 if(has_error_.compare_exchange_strong( 170   7 if(has_error_.compare_exchange_strong(
171   expected, true, std::memory_order_relaxed)) 171   expected, true, std::memory_order_relaxed))
HITCBC 172   5 first_error_ = ec; 172   5 first_error_ = ec;
HITCBC 173   7 } 173   7 }
174   }; 174   };
175   175  
176   /** Specialization for void io_result children (no payload storage). */ 176   /** Specialization for void io_result children (no payload storage). */
177   template<> 177   template<>
178   struct when_all_homogeneous_state<std::tuple<>> 178   struct when_all_homogeneous_state<std::tuple<>>
179   { 179   {
180   when_all_core core_; 180   when_all_core core_;
181   std::unique_ptr<continuation[]> runner_handles_; 181   std::unique_ptr<continuation[]> runner_handles_;
182   182  
183   std::atomic<bool> has_error_{false}; 183   std::atomic<bool> has_error_{false};
184   std::error_code first_error_; 184   std::error_code first_error_;
185   185  
HITCBC 186   3 explicit when_all_homogeneous_state(std::size_t count) 186   3 explicit when_all_homogeneous_state(std::size_t count)
HITCBC 187   3 : core_(count) 187   3 : core_(count)
HITCBC 188   3 , runner_handles_(std::make_unique<continuation[]>(count)) 188   3 , runner_handles_(std::make_unique<continuation[]>(count))
189   { 189   {
HITCBC 190   3 } 190   3 }
191   191  
192   /** Record the first error (subsequent errors are discarded). */ 192   /** Record the first error (subsequent errors are discarded). */
HITCBC 193   1 void record_error(std::error_code ec) 193   1 void record_error(std::error_code ec)
194   { 194   {
HITCBC 195   1 bool expected = false; 195   1 bool expected = false;
HITCBC 196   1 if(has_error_.compare_exchange_strong( 196   1 if(has_error_.compare_exchange_strong(
197   expected, true, std::memory_order_relaxed)) 197   expected, true, std::memory_order_relaxed))
HITCBC 198   1 first_error_ = ec; 198   1 first_error_ = ec;
HITCBC 199   1 } 199   1 }
200   }; 200   };
201   201  
202   /** Wrapper coroutine that intercepts task completion for when_all. 202   /** Wrapper coroutine that intercepts task completion for when_all.
203   203  
204   Parameterized on StateType to work with both heterogeneous (variadic) 204   Parameterized on StateType to work with both heterogeneous (variadic)
205   and homogeneous (range) state types. All state types expose their 205   and homogeneous (range) state types. All state types expose their
206   shared members through a `core_` member of type when_all_core. 206   shared members through a `core_` member of type when_all_core.
207   207  
208   @tparam StateType The state type (when_all_state or when_all_homogeneous_state). 208   @tparam StateType The state type (when_all_state or when_all_homogeneous_state).
209   */ 209   */
210   template<typename StateType> 210   template<typename StateType>
211   struct BOOST_CAPY_CORO_DESTROY_WHEN_COMPLETE when_all_runner 211   struct BOOST_CAPY_CORO_DESTROY_WHEN_COMPLETE when_all_runner
212   { 212   {
213   struct promise_type 213   struct promise_type
214   : frame_alloc_mixin 214   : frame_alloc_mixin
215   { 215   {
216   StateType* state_ = nullptr; 216   StateType* state_ = nullptr;
217   std::size_t index_ = 0; 217   std::size_t index_ = 0;
218   io_env env_; 218   io_env env_;
219   219  
HITCBC 220   151 when_all_runner get_return_object() noexcept 220   151 when_all_runner get_return_object() noexcept
221   { 221   {
222   return when_all_runner( 222   return when_all_runner(
HITCBC 223   151 std::coroutine_handle<promise_type>::from_promise(*this)); 223   151 std::coroutine_handle<promise_type>::from_promise(*this));
224   } 224   }
225   225  
HITCBC 226   151 std::suspend_always initial_suspend() noexcept 226   151 std::suspend_always initial_suspend() noexcept
227   { 227   {
HITCBC 228   151 return {}; 228   151 return {};
229   } 229   }
230   230  
HITCBC 231   151 auto final_suspend() noexcept 231   151 auto final_suspend() noexcept
232   { 232   {
233   struct awaiter 233   struct awaiter
234   { 234   {
235   promise_type* p_; 235   promise_type* p_;
HITCBC 236   151 bool await_ready() const noexcept { return false; } 236   151 bool await_ready() const noexcept { return false; }
HITCBC 237   151 auto await_suspend(std::coroutine_handle<> h) noexcept 237   151 auto await_suspend(std::coroutine_handle<> h) noexcept
238   { 238   {
HITCBC 239   151 auto& core = p_->state_->core_; 239   151 auto& core = p_->state_->core_;
HITCBC 240   151 auto* counter = &core.remaining_count_; 240   151 auto* counter = &core.remaining_count_;
HITCBC 241   151 auto* caller_env = core.caller_env_; 241   151 auto* caller_env = core.caller_env_;
HITCBC 242   151 auto& cont = core.continuation_; 242   151 auto& cont = core.continuation_;
243   243  
HITCBC 244   151 h.destroy(); 244   151 h.destroy();
245   245  
HITCBC 246   151 auto remaining = counter->fetch_sub(1, std::memory_order_acq_rel); 246   151 auto remaining = counter->fetch_sub(1, std::memory_order_acq_rel);
HITCBC 247   151 if(remaining == 1) 247   151 if(remaining == 1)
HITCBC 248   75 return detail::symmetric_transfer(caller_env->executor.dispatch(cont)); 248   75 return detail::symmetric_transfer(caller_env->executor.dispatch(cont));
HITCBC 249   76 return detail::symmetric_transfer(std::noop_coroutine()); 249   76 return detail::symmetric_transfer(std::noop_coroutine());
250   } 250   }
251   void await_resume() const noexcept {} // LCOV_EXCL_LINE final_suspend awaiter, never resumed 251   void await_resume() const noexcept {} // LCOV_EXCL_LINE final_suspend awaiter, never resumed
252   }; 252   };
HITCBC 253   151 return awaiter{this}; 253   151 return awaiter{this};
254   } 254   }
255   255  
HITCBC 256   132 void return_void() noexcept {} 256   132 void return_void() noexcept {}
257   257  
HITCBC 258   19 void unhandled_exception() noexcept 258   19 void unhandled_exception() noexcept
259   { 259   {
HITCBC 260   19 state_->core_.capture_exception(std::current_exception()); 260   19 state_->core_.capture_exception(std::current_exception());
HITCBC 261   19 state_->core_.stop_source_.request_stop(); 261   19 state_->core_.stop_source_.request_stop();
HITCBC 262   19 } 262   19 }
263   263  
264   template<class Awaitable> 264   template<class Awaitable>
265   struct transform_awaiter 265   struct transform_awaiter
266   { 266   {
267   std::decay_t<Awaitable> a_; 267   std::decay_t<Awaitable> a_;
268   promise_type* p_; 268   promise_type* p_;
269   269  
HITCBC 270   151 bool await_ready() { return a_.await_ready(); } 270   151 bool await_ready() { return a_.await_ready(); }
HITCBC 271   151 decltype(auto) await_resume() { return a_.await_resume(); } 271   151 decltype(auto) await_resume() { return a_.await_resume(); }
272   272  
273   template<class Promise> 273   template<class Promise>
HITCBC 274   150 auto await_suspend(std::coroutine_handle<Promise> h) 274   150 auto await_suspend(std::coroutine_handle<Promise> h)
275   { 275   {
276   using R = decltype(a_.await_suspend(h, &p_->env_)); 276   using R = decltype(a_.await_suspend(h, &p_->env_));
277   if constexpr (std::is_same_v<R, std::coroutine_handle<>>) 277   if constexpr (std::is_same_v<R, std::coroutine_handle<>>)
HITCBC 278   150 return detail::symmetric_transfer(a_.await_suspend(h, &p_->env_)); 278   150 return detail::symmetric_transfer(a_.await_suspend(h, &p_->env_));
279   else 279   else
280   return a_.await_suspend(h, &p_->env_); 280   return a_.await_suspend(h, &p_->env_);
281   } 281   }
282   }; 282   };
283   283  
284   template<class Awaitable> 284   template<class Awaitable>
HITCBC 285   151 auto await_transform(Awaitable&& a) 285   151 auto await_transform(Awaitable&& a)
286   { 286   {
287   using A = std::decay_t<Awaitable>; 287   using A = std::decay_t<Awaitable>;
288   if constexpr (IoAwaitable<A>) 288   if constexpr (IoAwaitable<A>)
289   { 289   {
290   return transform_awaiter<Awaitable>{ 290   return transform_awaiter<Awaitable>{
HITCBC 291   302 std::forward<Awaitable>(a), this}; 291   302 std::forward<Awaitable>(a), this};
292   } 292   }
293   else 293   else
294   { 294   {
295   static_assert(sizeof(A) == 0, "requires IoAwaitable"); 295   static_assert(sizeof(A) == 0, "requires IoAwaitable");
296   } 296   }
HITCBC 297   151 } 297   151 }
298   }; 298   };
299   299  
300   std::coroutine_handle<promise_type> h_; 300   std::coroutine_handle<promise_type> h_;
301   301  
HITCBC 302   151 explicit when_all_runner(std::coroutine_handle<promise_type> h) noexcept 302   151 explicit when_all_runner(std::coroutine_handle<promise_type> h) noexcept
HITCBC 303   151 : h_(h) 303   151 : h_(h)
304   { 304   {
HITCBC 305   151 } 305   151 }
306   306  
307   // Enable move for all clang versions - some versions need it 307   // Enable move for all clang versions - some versions need it
308   when_all_runner(when_all_runner&& other) noexcept 308   when_all_runner(when_all_runner&& other) noexcept
309   : h_(std::exchange(other.h_, nullptr)) 309   : h_(std::exchange(other.h_, nullptr))
310   { 310   {
311   } 311   }
312   312  
313   when_all_runner(when_all_runner const&) = delete; 313   when_all_runner(when_all_runner const&) = delete;
314   when_all_runner& operator=(when_all_runner const&) = delete; 314   when_all_runner& operator=(when_all_runner const&) = delete;
315   when_all_runner& operator=(when_all_runner&&) = delete; 315   when_all_runner& operator=(when_all_runner&&) = delete;
316   316  
HITCBC 317   151 auto release() noexcept 317   151 auto release() noexcept
318   { 318   {
HITCBC 319   151 return std::exchange(h_, nullptr); 319   151 return std::exchange(h_, nullptr);
320   } 320   }
321   }; 321   };
322   322  
323   /** Create an io_result-aware runner for a single awaitable (range path). 323   /** Create an io_result-aware runner for a single awaitable (range path).
324   324  
325   Checks the error code, records errors and requests stop on failure, 325   Checks the error code, records errors and requests stop on failure,
326   or extracts the payload on success. 326   or extracts the payload on success.
327   */ 327   */
328   template<IoAwaitable Awaitable, typename StateType> 328   template<IoAwaitable Awaitable, typename StateType>
329   when_all_runner<StateType> 329   when_all_runner<StateType>
HITCBC 330   34 make_when_all_homogeneous_runner(Awaitable inner, StateType* state, std::size_t index) 330   34 make_when_all_homogeneous_runner(Awaitable inner, StateType* state, std::size_t index)
331   { 331   {
332   auto result = co_await std::move(inner); 332   auto result = co_await std::move(inner);
333   333  
334   if(result.ec) 334   if(result.ec)
335   { 335   {
336   state->record_error(result.ec); 336   state->record_error(result.ec);
337   state->core_.stop_source_.request_stop(); 337   state->core_.stop_source_.request_stop();
338   } 338   }
339   else 339   else
340   { 340   {
341   using PayloadT = io_result_payload_t< 341   using PayloadT = io_result_payload_t<
342   awaitable_result_t<Awaitable>>; 342   awaitable_result_t<Awaitable>>;
343   if constexpr (!std::is_same_v<PayloadT, std::tuple<>>) 343   if constexpr (!std::is_same_v<PayloadT, std::tuple<>>)
344   { 344   {
345   state->set_result(index, 345   state->set_result(index,
346   extract_io_payload(std::move(result))); 346   extract_io_payload(std::move(result)));
347   } 347   }
348   } 348   }
HITCBC 349   68 } 349   68 }
350   350  
351   /** Create a runner for io_result children that requests stop on ec. */ 351   /** Create a runner for io_result children that requests stop on ec. */
352   template<std::size_t Index, IoAwaitable Awaitable, typename... Ts> 352   template<std::size_t Index, IoAwaitable Awaitable, typename... Ts>
353   when_all_runner<when_all_state<Ts...>> 353   when_all_runner<when_all_state<Ts...>>
HITCBC 354   99 make_when_all_io_runner(Awaitable inner, when_all_state<Ts...>* state) 354   99 make_when_all_io_runner(Awaitable inner, when_all_state<Ts...>* state)
355   { 355   {
356   auto result = co_await std::move(inner); 356   auto result = co_await std::move(inner);
357   auto ec = result.ec; 357   auto ec = result.ec;
358   std::get<Index>(state->results_).set(std::move(result)); 358   std::get<Index>(state->results_).set(std::move(result));
359   359  
360   if(ec) 360   if(ec)
361   { 361   {
362   state->record_error(ec); 362   state->record_error(ec);
363   state->core_.stop_source_.request_stop(); 363   state->core_.stop_source_.request_stop();
364   } 364   }
HITCBC 365   198 } 365   198 }
366   366  
367   /** Launcher that uses io_result-aware runners. */ 367   /** Launcher that uses io_result-aware runners. */
368   template<IoAwaitable... Awaitables> 368   template<IoAwaitable... Awaitables>
369   class when_all_io_launcher 369   class when_all_io_launcher
370   { 370   {
371   using state_type = when_all_state<awaitable_result_t<Awaitables>...>; 371   using state_type = when_all_state<awaitable_result_t<Awaitables>...>;
372   372  
373   std::tuple<Awaitables...>* awaitables_; 373   std::tuple<Awaitables...>* awaitables_;
374   state_type* state_; 374   state_type* state_;
375   375  
376   public: 376   public:
HITCBC 377   51 when_all_io_launcher( 377   51 when_all_io_launcher(
378   std::tuple<Awaitables...>* awaitables, 378   std::tuple<Awaitables...>* awaitables,
379   state_type* state) 379   state_type* state)
HITCBC 380   51 : awaitables_(awaitables) 380   51 : awaitables_(awaitables)
HITCBC 381   51 , state_(state) 381   51 , state_(state)
382   { 382   {
HITCBC 383   51 } 383   51 }
384   384  
HITCBC 385   51 bool await_ready() const noexcept 385   51 bool await_ready() const noexcept
386   { 386   {
HITCBC 387   51 return sizeof...(Awaitables) == 0; 387   51 return sizeof...(Awaitables) == 0;
388   } 388   }
389   389  
HITCBC 390   51 std::coroutine_handle<> await_suspend( 390   51 std::coroutine_handle<> await_suspend(
391   std::coroutine_handle<> continuation, io_env const* caller_env) 391   std::coroutine_handle<> continuation, io_env const* caller_env)
392   { 392   {
HITCBC 393   51 state_->core_.continuation_.h = continuation; 393   51 state_->core_.continuation_.h = continuation;
HITCBC 394   51 state_->core_.caller_env_ = caller_env; 394   51 state_->core_.caller_env_ = caller_env;
395   395  
HITCBC 396   51 if(caller_env->stop_token.stop_possible()) 396   51 if(caller_env->stop_token.stop_possible())
397   { 397   {
HITCBC 398   4 state_->core_.parent_stop_callback_.emplace( 398   4 state_->core_.parent_stop_callback_.emplace(
HITCBC 399   2 caller_env->stop_token, 399   2 caller_env->stop_token,
HITCBC 400   2 when_all_core::stop_callback_fn{&state_->core_.stop_source_}); 400   2 when_all_core::stop_callback_fn{&state_->core_.stop_source_});
401   401  
HITCBC 402   2 if(caller_env->stop_token.stop_requested()) 402   2 if(caller_env->stop_token.stop_requested())
HITCBC 403   1 state_->core_.stop_source_.request_stop(); 403   1 state_->core_.stop_source_.request_stop();
404   } 404   }
405   405  
HITCBC 406   51 auto token = state_->core_.stop_source_.get_token(); 406   51 auto token = state_->core_.stop_source_.get_token();
HITCBC 407   48 [&]<std::size_t... Is>(std::index_sequence<Is...>) { 407   48 [&]<std::size_t... Is>(std::index_sequence<Is...>) {
HITCBC 408   51 (..., launch_one<Is>(caller_env->executor, token)); 408   51 (..., launch_one<Is>(caller_env->executor, token));
HITCBC 409   51 }(std::index_sequence_for<Awaitables...>{}); 409   51 }(std::index_sequence_for<Awaitables...>{});
410   410  
HITCBC 411   102 return std::noop_coroutine(); 411   102 return std::noop_coroutine();
HITCBC 412   51 } 412   51 }
413   413  
HITCBC 414   51 void await_resume() const noexcept {} 414   51 void await_resume() const noexcept {}
415   415  
416   private: 416   private:
417   template<std::size_t I> 417   template<std::size_t I>
HITCBC 418   99 void launch_one(executor_ref caller_ex, std::stop_token token) 418   99 void launch_one(executor_ref caller_ex, std::stop_token token)
419   { 419   {
HITCBC 420   99 auto runner = make_when_all_io_runner<I>( 420   99 auto runner = make_when_all_io_runner<I>(
HITCBC 421   99 std::move(std::get<I>(*awaitables_)), state_); 421   99 std::move(std::get<I>(*awaitables_)), state_);
422   422  
HITCBC 423   99 auto h = runner.release(); 423   99 auto h = runner.release();
HITCBC 424   99 h.promise().state_ = state_; 424   99 h.promise().state_ = state_;
HITCBC 425   99 h.promise().env_ = io_env{caller_ex, token, 425   99 h.promise().env_ = io_env{caller_ex, token,
HITCBC 426   99 state_->core_.caller_env_->frame_allocator}; 426   99 state_->core_.caller_env_->frame_allocator};
427   427  
HITCBC 428   99 state_->runner_handles_[I].h = std::coroutine_handle<>{h}; 428   99 state_->runner_handles_[I].h = std::coroutine_handle<>{h};
HITCBC 429   99 state_->core_.caller_env_->executor.post(state_->runner_handles_[I]); 429   99 state_->core_.caller_env_->executor.post(state_->runner_handles_[I]);
HITCBC 430   198 } 430   198 }
431   }; 431   };
432   432  
433   /** Helper to extract a single result from state. 433   /** Helper to extract a single result from state.
434   This is a separate function to work around a GCC-11 ICE that occurs 434   This is a separate function to work around a GCC-11 ICE that occurs
435   when using nested immediately-invoked lambdas with pack expansion. 435   when using nested immediately-invoked lambdas with pack expansion.
436   */ 436   */
437   template<std::size_t I, typename... Ts> 437   template<std::size_t I, typename... Ts>
HITCBC 438   71 auto extract_single_result(when_all_state<Ts...>& state) 438   71 auto extract_single_result(when_all_state<Ts...>& state)
439   { 439   {
HITCBC 440   71 return std::move(std::get<I>(state.results_)).get(); 440   71 return std::move(std::get<I>(state.results_)).get();
441   } 441   }
442   442  
443   /** Extract all results from state as a tuple. 443   /** Extract all results from state as a tuple.
444   */ 444   */
445   template<typename... Ts> 445   template<typename... Ts>
HITCBC 446   37 auto extract_results(when_all_state<Ts...>& state) 446   37 auto extract_results(when_all_state<Ts...>& state)
447   { 447   {
HITCBC 448   57 return [&]<std::size_t... Is>(std::index_sequence<Is...>) { 448   57 return [&]<std::size_t... Is>(std::index_sequence<Is...>) {
HITCBC 449   37 return std::tuple(extract_single_result<Is>(state)...); 449   37 return std::tuple(extract_single_result<Is>(state)...);
HITCBC 450   74 }(std::index_sequence_for<Ts...>{}); 450   74 }(std::index_sequence_for<Ts...>{});
451   } 451   }
452   452  
453   /** Launches all homogeneous runners concurrently. 453   /** Launches all homogeneous runners concurrently.
454   454  
455   Two-phase approach: create all runners first, then post all. 455   Two-phase approach: create all runners first, then post all.
456   This avoids lifetime issues if a task completes synchronously. 456   This avoids lifetime issues if a task completes synchronously.
457   */ 457   */
458   template<typename Range> 458   template<typename Range>
459   class when_all_homogeneous_launcher 459   class when_all_homogeneous_launcher
460   { 460   {
461   using Awaitable = std::ranges::range_value_t<Range>; 461   using Awaitable = std::ranges::range_value_t<Range>;
462   using PayloadT = io_result_payload_t<awaitable_result_t<Awaitable>>; 462   using PayloadT = io_result_payload_t<awaitable_result_t<Awaitable>>;
463   463  
464   Range* range_; 464   Range* range_;
465   when_all_homogeneous_state<PayloadT>* state_; 465   when_all_homogeneous_state<PayloadT>* state_;
466   466  
467   public: 467   public:
HITCBC 468   15 when_all_homogeneous_launcher( 468   15 when_all_homogeneous_launcher(
469   Range* range, 469   Range* range,
470   when_all_homogeneous_state<PayloadT>* state) 470   when_all_homogeneous_state<PayloadT>* state)
HITCBC 471   15 : range_(range) 471   15 : range_(range)
HITCBC 472   15 , state_(state) 472   15 , state_(state)
473   { 473   {
HITCBC 474   15 } 474   15 }
475   475  
HITCBC 476   15 bool await_ready() const noexcept 476   15 bool await_ready() const noexcept
477   { 477   {
HITCBC 478   15 return std::ranges::empty(*range_); 478   15 return std::ranges::empty(*range_);
479   } 479   }
480   480  
HITCBC 481   15 std::coroutine_handle<> await_suspend(std::coroutine_handle<> continuation, io_env const* caller_env) 481   15 std::coroutine_handle<> await_suspend(std::coroutine_handle<> continuation, io_env const* caller_env)
482   { 482   {
HITCBC 483   15 state_->core_.continuation_.h = continuation; 483   15 state_->core_.continuation_.h = continuation;
HITCBC 484   15 state_->core_.caller_env_ = caller_env; 484   15 state_->core_.caller_env_ = caller_env;
485   485  
HITCBC 486   15 if(caller_env->stop_token.stop_possible()) 486   15 if(caller_env->stop_token.stop_possible())
487   { 487   {
HITCBC 488   4 state_->core_.parent_stop_callback_.emplace( 488   4 state_->core_.parent_stop_callback_.emplace(
HITCBC 489   2 caller_env->stop_token, 489   2 caller_env->stop_token,
HITCBC 490   2 when_all_core::stop_callback_fn{&state_->core_.stop_source_}); 490   2 when_all_core::stop_callback_fn{&state_->core_.stop_source_});
491   491  
HITCBC 492   2 if(caller_env->stop_token.stop_requested()) 492   2 if(caller_env->stop_token.stop_requested())
HITCBC 493   1 state_->core_.stop_source_.request_stop(); 493   1 state_->core_.stop_source_.request_stop();
494   } 494   }
495   495  
HITCBC 496   15 auto token = state_->core_.stop_source_.get_token(); 496   15 auto token = state_->core_.stop_source_.get_token();
497   497  
498   // Phase 1: Create all runners without dispatching. 498   // Phase 1: Create all runners without dispatching.
HITCBC 499   15 std::size_t index = 0; 499   15 std::size_t index = 0;
HITCBC 500   49 for(auto&& a : *range_) 500   49 for(auto&& a : *range_)
501   { 501   {
HITCBC 502   34 auto runner = make_when_all_homogeneous_runner( 502   34 auto runner = make_when_all_homogeneous_runner(
HITCBC 503   34 std::move(a), state_, index); 503   34 std::move(a), state_, index);
504   504  
HITCBC 505   34 auto h = runner.release(); 505   34 auto h = runner.release();
HITCBC 506   34 h.promise().state_ = state_; 506   34 h.promise().state_ = state_;
HITCBC 507   34 h.promise().index_ = index; 507   34 h.promise().index_ = index;
HITCBC 508   34 h.promise().env_ = io_env{caller_env->executor, token, caller_env->frame_allocator}; 508   34 h.promise().env_ = io_env{caller_env->executor, token, caller_env->frame_allocator};
509   509  
HITCBC 510   34 state_->runner_handles_[index].h = std::coroutine_handle<>{h}; 510   34 state_->runner_handles_[index].h = std::coroutine_handle<>{h};
HITCBC 511   34 ++index; 511   34 ++index;
512   } 512   }
513   513  
514   // Phase 2: Post all runners. Any may complete synchronously. 514   // Phase 2: Post all runners. Any may complete synchronously.
515   // After last post, state_ and this may be destroyed. 515   // After last post, state_ and this may be destroyed.
HITCBC 516   15 auto* handles = state_->runner_handles_.get(); 516   15 auto* handles = state_->runner_handles_.get();
HITCBC 517   15 std::size_t count = state_->core_.remaining_count_.load(std::memory_order_relaxed); 517   15 std::size_t count = state_->core_.remaining_count_.load(std::memory_order_relaxed);
HITCBC 518   49 for(std::size_t i = 0; i < count; ++i) 518   49 for(std::size_t i = 0; i < count; ++i)
HITCBC 519   34 caller_env->executor.post(handles[i]); 519   34 caller_env->executor.post(handles[i]);
520   520  
HITCBC 521   30 return std::noop_coroutine(); 521   30 return std::noop_coroutine();
HITCBC 522   49 } 522   49 }
523   523  
HITCBC 524   15 void await_resume() const noexcept 524   15 void await_resume() const noexcept
525   { 525   {
HITCBC 526   15 } 526   15 }
527   }; 527   };
528   528  
529   } // namespace detail 529   } // namespace detail
530   530  
531   /** Execute a range of io_result-returning awaitables concurrently. 531   /** Execute a range of io_result-returning awaitables concurrently.
532   532  
533   Launches all awaitables simultaneously and waits for all to complete. 533   Launches all awaitables simultaneously and waits for all to complete.
534   On success, extracted payloads are collected in a vector preserving 534   On success, extracted payloads are collected in a vector preserving
535   input order. The first error_code cancels siblings and is propagated 535   input order. The first error_code cancels siblings and is propagated
536   in the outer io_result. Exceptions always beat error codes. 536   in the outer io_result. Exceptions always beat error codes.
537   537  
538   @li All child awaitables run concurrently on the caller's executor 538   @li All child awaitables run concurrently on the caller's executor
539   @li Payloads are returned as a vector in input order 539   @li Payloads are returned as a vector in input order
540   @li First error_code wins and cancels siblings 540   @li First error_code wins and cancels siblings
541   @li Exception always beats error_code 541   @li Exception always beats error_code
542   @li Completes only after all children have finished 542   @li Completes only after all children have finished
543   543  
544   @par Thread Safety 544   @par Thread Safety
545   The returned task must be awaited from a single execution context. 545   The returned task must be awaited from a single execution context.
546   Child awaitables execute concurrently but complete through the caller's 546   Child awaitables execute concurrently but complete through the caller's
547   executor. 547   executor.
548   548  
549   @param awaitables Range of io_result-returning awaitables to execute 549   @param awaitables Range of io_result-returning awaitables to execute
550   concurrently (must not be empty). 550   concurrently (must not be empty).
551   551  
552   @return A task yielding io_result<vector<PayloadT>> where PayloadT 552   @return A task yielding io_result<vector<PayloadT>> where PayloadT
553   is the payload extracted from each child's io_result. 553   is the payload extracted from each child's io_result.
554   554  
555   @throws std::invalid_argument if range is empty (thrown before 555   @throws std::invalid_argument if range is empty (thrown before
556   coroutine suspends). 556   coroutine suspends).
557   @throws Rethrows the first child exception after all children 557   @throws Rethrows the first child exception after all children
558   complete (exception beats error_code). 558   complete (exception beats error_code).
559   559  
560   @par Example 560   @par Example
561   @code 561   @code
562   task<void> example() 562   task<void> example()
563   { 563   {
564   std::vector<io_task<size_t>> reads; 564   std::vector<io_task<size_t>> reads;
565   for (auto& buf : buffers) 565   for (auto& buf : buffers)
566   reads.push_back(stream.read_some(buf)); 566   reads.push_back(stream.read_some(buf));
567   567  
568   auto [ec, counts] = co_await when_all(std::move(reads)); 568   auto [ec, counts] = co_await when_all(std::move(reads));
569   if (ec) { // handle error 569   if (ec) { // handle error
570   } 570   }
571   } 571   }
572   @endcode 572   @endcode
573   573  
574   @see IoAwaitableRange, when_all 574   @see IoAwaitableRange, when_all
575   */ 575   */
576   template<IoAwaitableRange R> 576   template<IoAwaitableRange R>
577   requires detail::is_io_result_v< 577   requires detail::is_io_result_v<
578   awaitable_result_t<std::ranges::range_value_t<R>>> 578   awaitable_result_t<std::ranges::range_value_t<R>>>
579   && (!std::is_same_v< 579   && (!std::is_same_v<
580   detail::io_result_payload_t< 580   detail::io_result_payload_t<
581   awaitable_result_t<std::ranges::range_value_t<R>>>, 581   awaitable_result_t<std::ranges::range_value_t<R>>>,
582   std::tuple<>>) 582   std::tuple<>>)
HITCBC 583   13 [[nodiscard]] auto when_all(R&& awaitables) 583   13 [[nodiscard]] auto when_all(R&& awaitables)
584   -> task<io_result<std::vector< 584   -> task<io_result<std::vector<
585   detail::io_result_payload_t< 585   detail::io_result_payload_t<
586   awaitable_result_t<std::ranges::range_value_t<R>>>>>> 586   awaitable_result_t<std::ranges::range_value_t<R>>>>>>
587   { 587   {
588   using Awaitable = std::ranges::range_value_t<R>; 588   using Awaitable = std::ranges::range_value_t<R>;
589   using PayloadT = detail::io_result_payload_t< 589   using PayloadT = detail::io_result_payload_t<
590   awaitable_result_t<Awaitable>>; 590   awaitable_result_t<Awaitable>>;
591   using OwnedRange = std::remove_cvref_t<R>; 591   using OwnedRange = std::remove_cvref_t<R>;
592   592  
593   auto count = std::ranges::size(awaitables); 593   auto count = std::ranges::size(awaitables);
594   if(count == 0) 594   if(count == 0)
595   throw std::invalid_argument("when_all requires at least one awaitable"); 595   throw std::invalid_argument("when_all requires at least one awaitable");
596   596  
597   OwnedRange owned_awaitables = std::forward<R>(awaitables); 597   OwnedRange owned_awaitables = std::forward<R>(awaitables);
598   598  
599   detail::when_all_homogeneous_state<PayloadT> state(count); 599   detail::when_all_homogeneous_state<PayloadT> state(count);
600   600  
601   co_await detail::when_all_homogeneous_launcher<OwnedRange>( 601   co_await detail::when_all_homogeneous_launcher<OwnedRange>(
602   &owned_awaitables, &state); 602   &owned_awaitables, &state);
603   603  
604   if(state.core_.first_exception_) 604   if(state.core_.first_exception_)
605   std::rethrow_exception(state.core_.first_exception_); 605   std::rethrow_exception(state.core_.first_exception_);
606   606  
607   if(state.has_error_.load(std::memory_order_relaxed)) 607   if(state.has_error_.load(std::memory_order_relaxed))
608   co_return io_result<std::vector<PayloadT>>{state.first_error_, {}}; 608   co_return io_result<std::vector<PayloadT>>{state.first_error_, {}};
609   609  
610   std::vector<PayloadT> results; 610   std::vector<PayloadT> results;
611   results.reserve(count); 611   results.reserve(count);
612   for(auto& opt : state.results_) 612   for(auto& opt : state.results_)
613   results.push_back(std::move(*opt)); 613   results.push_back(std::move(*opt));
614   614  
615   co_return io_result<std::vector<PayloadT>>{{}, std::move(results)}; 615   co_return io_result<std::vector<PayloadT>>{{}, std::move(results)};
HITCBC 616   26 } 616   26 }
617   617  
618   /** Execute a range of void io_result-returning awaitables concurrently. 618   /** Execute a range of void io_result-returning awaitables concurrently.
619   619  
620   Launches all awaitables simultaneously and waits for all to complete. 620   Launches all awaitables simultaneously and waits for all to complete.
621   Since all awaitables return io_result<>, no payload values are 621   Since all awaitables return io_result<>, no payload values are
622   collected. The first error_code cancels siblings and is propagated. 622   collected. The first error_code cancels siblings and is propagated.
623   Exceptions always beat error codes. 623   Exceptions always beat error codes.
624   624  
625   @param awaitables Range of io_result<>-returning awaitables to 625   @param awaitables Range of io_result<>-returning awaitables to
626   execute concurrently (must not be empty). 626   execute concurrently (must not be empty).
627   627  
628   @return A task yielding io_result<> whose ec is the first child 628   @return A task yielding io_result<> whose ec is the first child
629   error, or default-constructed on success. 629   error, or default-constructed on success.
630   630  
631   @throws std::invalid_argument if range is empty. 631   @throws std::invalid_argument if range is empty.
632   @throws Rethrows the first child exception after all children 632   @throws Rethrows the first child exception after all children
633   complete (exception beats error_code). 633   complete (exception beats error_code).
634   634  
635   @par Example 635   @par Example
636   @code 636   @code
637   task<void> example() 637   task<void> example()
638   { 638   {
639   std::vector<io_task<>> jobs; 639   std::vector<io_task<>> jobs;
640   for (int i = 0; i < n; ++i) 640   for (int i = 0; i < n; ++i)
641   jobs.push_back(process(i)); 641   jobs.push_back(process(i));
642   642  
643   auto [ec] = co_await when_all(std::move(jobs)); 643   auto [ec] = co_await when_all(std::move(jobs));
644   } 644   }
645   @endcode 645   @endcode
646   646  
647   @see IoAwaitableRange, when_all 647   @see IoAwaitableRange, when_all
648   */ 648   */
649   template<IoAwaitableRange R> 649   template<IoAwaitableRange R>
650   requires detail::is_io_result_v< 650   requires detail::is_io_result_v<
651   awaitable_result_t<std::ranges::range_value_t<R>>> 651   awaitable_result_t<std::ranges::range_value_t<R>>>
652   && std::is_same_v< 652   && std::is_same_v<
653   detail::io_result_payload_t< 653   detail::io_result_payload_t<
654   awaitable_result_t<std::ranges::range_value_t<R>>>, 654   awaitable_result_t<std::ranges::range_value_t<R>>>,
655   std::tuple<>> 655   std::tuple<>>
HITCBC 656   4 [[nodiscard]] auto when_all(R&& awaitables) -> task<io_result<>> 656   4 [[nodiscard]] auto when_all(R&& awaitables) -> task<io_result<>>
657   { 657   {
658   using OwnedRange = std::remove_cvref_t<R>; 658   using OwnedRange = std::remove_cvref_t<R>;
659   659  
660   auto count = std::ranges::size(awaitables); 660   auto count = std::ranges::size(awaitables);
661   if(count == 0) 661   if(count == 0)
662   throw std::invalid_argument("when_all requires at least one awaitable"); 662   throw std::invalid_argument("when_all requires at least one awaitable");
663   663  
664   OwnedRange owned_awaitables = std::forward<R>(awaitables); 664   OwnedRange owned_awaitables = std::forward<R>(awaitables);
665   665  
666   detail::when_all_homogeneous_state<std::tuple<>> state(count); 666   detail::when_all_homogeneous_state<std::tuple<>> state(count);
667   667  
668   co_await detail::when_all_homogeneous_launcher<OwnedRange>( 668   co_await detail::when_all_homogeneous_launcher<OwnedRange>(
669   &owned_awaitables, &state); 669   &owned_awaitables, &state);
670   670  
671   if(state.core_.first_exception_) 671   if(state.core_.first_exception_)
672   std::rethrow_exception(state.core_.first_exception_); 672   std::rethrow_exception(state.core_.first_exception_);
673   673  
674   if(state.has_error_.load(std::memory_order_relaxed)) 674   if(state.has_error_.load(std::memory_order_relaxed))
675   co_return io_result<>{state.first_error_}; 675   co_return io_result<>{state.first_error_};
676   676  
677   co_return io_result<>{}; 677   co_return io_result<>{};
HITCBC 678   8 } 678   8 }
679   679  
680   /** Execute io_result-returning awaitables concurrently, inspecting error codes. 680   /** Execute io_result-returning awaitables concurrently, inspecting error codes.
681   681  
682   Overload selected when all children return io_result<Ts...>. 682   Overload selected when all children return io_result<Ts...>.
683   The error_code is lifted out of each child into a single outer 683   The error_code is lifted out of each child into a single outer
684   io_result. On success all values are returned; on failure the 684   io_result. On success all values are returned; on failure the
685   first error_code wins. 685   first error_code wins.
686   686  
687   @par Exception Safety 687   @par Exception Safety
688   Exception always beats error_code. If any child throws, the 688   Exception always beats error_code. If any child throws, the
689   exception is rethrown regardless of error_code results. 689   exception is rethrown regardless of error_code results.
690   690  
691   @param awaitables One or more awaitables each returning 691   @param awaitables One or more awaitables each returning
692   io_result<Ts...>. 692   io_result<Ts...>.
693   693  
694   @return A task yielding io_result<R1, R2, ..., Rn> where each Ri 694   @return A task yielding io_result<R1, R2, ..., Rn> where each Ri
695   follows the payload flattening rules. 695   follows the payload flattening rules.
696   */ 696   */
697   template<IoAwaitable... As> 697   template<IoAwaitable... As>
698   requires (sizeof...(As) > 0) 698   requires (sizeof...(As) > 0)
699   && detail::all_io_result_awaitables<As...> 699   && detail::all_io_result_awaitables<As...>
HITCBC 700   51 [[nodiscard]] auto when_all(As... awaitables) 700   51 [[nodiscard]] auto when_all(As... awaitables)
701   -> task<io_result< 701   -> task<io_result<
702   detail::io_result_payload_t<awaitable_result_t<As>>...>> 702   detail::io_result_payload_t<awaitable_result_t<As>>...>>
703   { 703   {
704   using result_type = io_result< 704   using result_type = io_result<
705   detail::io_result_payload_t<awaitable_result_t<As>>...>; 705   detail::io_result_payload_t<awaitable_result_t<As>>...>;
706   706  
707   detail::when_all_state<awaitable_result_t<As>...> state; 707   detail::when_all_state<awaitable_result_t<As>...> state;
708   std::tuple<As...> awaitable_tuple(std::move(awaitables)...); 708   std::tuple<As...> awaitable_tuple(std::move(awaitables)...);
709   709  
710   co_await detail::when_all_io_launcher<As...>(&awaitable_tuple, &state); 710   co_await detail::when_all_io_launcher<As...>(&awaitable_tuple, &state);
711   711  
712   // Exception always wins over error_code 712   // Exception always wins over error_code
713   if(state.core_.first_exception_) 713   if(state.core_.first_exception_)
714   std::rethrow_exception(state.core_.first_exception_); 714   std::rethrow_exception(state.core_.first_exception_);
715   715  
716   auto r = detail::build_when_all_io_result<result_type>( 716   auto r = detail::build_when_all_io_result<result_type>(
717   detail::extract_results(state)); 717   detail::extract_results(state));
718   if(state.has_error_.load(std::memory_order_relaxed)) 718   if(state.has_error_.load(std::memory_order_relaxed))
719   r.ec = state.first_error_; 719   r.ec = state.first_error_;
720   co_return r; 720   co_return r;
HITCBC 721   102 } 721   102 }
722   722  
723   } // namespace capy 723   } // namespace capy
724   } // namespace boost 724   } // namespace boost
725   725  
726   #endif 726   #endif