TLA Line data Source code
1 : //
2 : // Copyright (c) 2025 Vinnie Falco (vinnie.falco@gmail.com)
3 : //
4 : // Distributed under the Boost Software License, Version 1.0. (See accompanying
5 : // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6 : //
7 : // Official repository: https://github.com/cppalliance/capy
8 : //
9 :
10 : #ifndef BOOST_CAPY_IO_WRITE_NOW_HPP
11 : #define BOOST_CAPY_IO_WRITE_NOW_HPP
12 :
13 : #include <boost/capy/detail/config.hpp>
14 : #include <boost/capy/detail/await_suspend_helper.hpp>
15 : #include <boost/capy/buffers.hpp>
16 : #include <boost/capy/buffers/buffer_slice.hpp>
17 : #include <boost/capy/concept/io_awaitable.hpp>
18 : #include <boost/capy/concept/write_stream.hpp>
19 : #include <coroutine>
20 : #include <boost/capy/ex/executor_ref.hpp>
21 : #include <boost/capy/ex/io_env.hpp>
22 : #include <boost/capy/io_result.hpp>
23 :
24 : #include <cstddef>
25 : #include <exception>
26 : #include <new>
27 : #include <stop_token>
28 : #include <utility>
29 :
30 : #ifndef BOOST_CAPY_WRITE_NOW_WORKAROUND
31 : # if defined(__GNUC__) && !defined(__clang__)
32 : # define BOOST_CAPY_WRITE_NOW_WORKAROUND 1
33 : # else
34 : # define BOOST_CAPY_WRITE_NOW_WORKAROUND 0
35 : # endif
36 : #endif
37 :
38 : namespace boost {
39 : namespace capy {
40 :
41 : /** Eagerly writes complete buffer sequences with frame caching.
42 :
43 : This class wraps a @ref WriteStream and provides an `operator()`
44 : that writes an entire buffer sequence, attempting to complete
45 : synchronously. If every `write_some` completes without suspending,
46 : the entire operation finishes in `await_ready` with no coroutine
47 : suspension.
48 :
49 : The class maintains a one-element coroutine frame cache. After
50 : the first call, subsequent calls reuse the cached frame memory,
51 : avoiding repeated allocation for the internal coroutine.
52 :
53 : @tparam Stream The stream type, must satisfy @ref WriteStream.
54 :
55 : @par Thread Safety
56 : Distinct objects: Safe.
57 : Shared objects: Unsafe.
58 :
59 : @par Preconditions
60 : Only one operation may be outstanding at a time. A new call to
61 : `operator()` must not be made until the previous operation has
62 : completed (i.e., the returned awaitable has been fully consumed).
63 :
64 : @par Example
65 :
66 : @code
67 : template< WriteStream Stream >
68 : task<> send_messages( Stream& stream )
69 : {
70 : write_now wn( stream );
71 : auto [ec1, n1] = co_await wn( make_buffer( "hello" ) );
72 : if( ec1 )
73 : detail::throw_system_error( ec1 );
74 : auto [ec2, n2] = co_await wn( make_buffer( "world" ) );
75 : if( ec2 )
76 : detail::throw_system_error( ec2 );
77 : }
78 : @endcode
79 :
80 : @see write, write_some, WriteStream, ConstBufferSequence
81 : */
82 : template<class Stream>
83 : requires WriteStream<Stream>
84 : class write_now
85 : {
86 : Stream& stream_;
87 : void* cached_frame_ = nullptr;
88 : std::size_t cached_size_ = 0;
89 :
90 : struct [[nodiscard]] BOOST_CAPY_CORO_AWAIT_ELIDABLE
91 : op_type
92 : {
93 : struct promise_type
94 : {
95 : io_result<std::size_t> result_;
96 : std::exception_ptr ep_;
97 : std::coroutine_handle<> cont_{nullptr};
98 : io_env const* env_ = nullptr;
99 : bool done_ = false;
100 :
101 HIT 71 : op_type get_return_object()
102 : {
103 : return op_type{
104 : std::coroutine_handle<
105 71 : promise_type>::from_promise(*this)};
106 : }
107 :
108 71 : auto initial_suspend() noexcept
109 : {
110 : #if BOOST_CAPY_WRITE_NOW_WORKAROUND
111 71 : return std::suspend_always{};
112 : #else
113 : return std::suspend_never{};
114 : #endif
115 : }
116 :
117 69 : auto final_suspend() noexcept
118 : {
119 : struct awaiter
120 : {
121 : promise_type* p_;
122 :
123 69 : bool await_ready() const noexcept
124 : {
125 69 : return false;
126 : }
127 :
128 69 : std::coroutine_handle<> await_suspend(std::coroutine_handle<>) const noexcept
129 : {
130 69 : p_->done_ = true;
131 69 : if(!p_->cont_)
132 : return std::noop_coroutine(); // LCOV_EXCL_LINE cont_ always set on this (suspend_always) path
133 69 : return p_->cont_;
134 : }
135 :
136 : void await_resume() const noexcept {} // LCOV_EXCL_LINE final_suspend awaiter, never resumed
137 : };
138 69 : return awaiter{this};
139 : }
140 :
141 47 : void return_value(
142 : io_result<std::size_t> r) noexcept
143 : {
144 47 : result_ = r;
145 47 : }
146 :
147 22 : void unhandled_exception()
148 : {
149 22 : ep_ = std::current_exception();
150 22 : }
151 :
152 : std::suspend_always yield_value(int) noexcept
153 : {
154 : return {};
155 : }
156 :
157 : template<class A>
158 85 : auto await_transform(A&& a)
159 : {
160 : using decayed = std::decay_t<A>;
161 : if constexpr (IoAwaitable<decayed>)
162 : {
163 : struct wrapper
164 : {
165 : decayed inner_;
166 : promise_type* p_;
167 :
168 85 : bool await_ready()
169 : {
170 85 : return inner_.await_ready();
171 : }
172 :
173 1 : std::coroutine_handle<> await_suspend(std::coroutine_handle<> h)
174 : {
175 1 : return detail::call_await_suspend(
176 : &inner_, h,
177 1 : p_->env_);
178 : }
179 :
180 85 : decltype(auto) await_resume()
181 : {
182 85 : return inner_.await_resume();
183 : }
184 : };
185 : return wrapper{
186 85 : std::forward<A>(a), this};
187 : }
188 : else
189 : {
190 : return std::forward<A>(a);
191 : }
192 : }
193 :
194 : static void*
195 71 : operator new(
196 : std::size_t size,
197 : write_now& self,
198 : auto&)
199 : {
200 71 : if(self.cached_frame_ &&
201 5 : self.cached_size_ >= size)
202 4 : return self.cached_frame_;
203 67 : void* p = ::operator new(size);
204 67 : if(self.cached_frame_)
205 1 : ::operator delete(self.cached_frame_);
206 67 : self.cached_frame_ = p;
207 67 : self.cached_size_ = size;
208 67 : return p;
209 : }
210 :
211 : static void
212 71 : operator delete(void*, std::size_t) noexcept
213 : {
214 71 : }
215 : };
216 :
217 : std::coroutine_handle<promise_type> h_;
218 :
219 140 : ~op_type()
220 : {
221 140 : if(h_)
222 71 : h_.destroy();
223 140 : }
224 :
225 : op_type(op_type const&) = delete;
226 : op_type& operator=(op_type const&) = delete;
227 :
228 69 : op_type(op_type&& other) noexcept
229 69 : : h_(std::exchange(other.h_, nullptr))
230 : {
231 69 : }
232 :
233 : op_type& operator=(op_type&&) = delete;
234 :
235 69 : bool await_ready() const noexcept
236 : {
237 69 : return h_.promise().done_;
238 : }
239 :
240 69 : std::coroutine_handle<> await_suspend(
241 : std::coroutine_handle<> cont,
242 : io_env const* env)
243 : {
244 69 : auto& p = h_.promise();
245 69 : p.cont_ = cont;
246 69 : p.env_ = env;
247 69 : return h_;
248 : }
249 :
250 69 : io_result<std::size_t> await_resume()
251 : {
252 69 : auto& p = h_.promise();
253 69 : if(p.ep_)
254 22 : std::rethrow_exception(p.ep_);
255 47 : return p.result_;
256 : }
257 :
258 : private:
259 71 : explicit op_type(
260 : std::coroutine_handle<promise_type> h)
261 71 : : h_(h)
262 : {
263 71 : }
264 : };
265 :
266 : public:
267 : /** Destructor. Frees the cached coroutine frame. */
268 66 : ~write_now()
269 : {
270 66 : if(cached_frame_)
271 66 : ::operator delete(cached_frame_);
272 66 : }
273 :
274 : /** Construct from a stream reference.
275 :
276 : @param s The stream to write to. Must outlive this object.
277 : */
278 : explicit
279 66 : write_now(Stream& s) noexcept
280 66 : : stream_(s)
281 : {
282 66 : }
283 :
284 : write_now(write_now const&) = delete;
285 : write_now& operator=(write_now const&) = delete;
286 :
287 : /** Eagerly write the entire buffer sequence.
288 :
289 : Writes data to the stream by calling `write_some` repeatedly
290 : until the entire buffer sequence is written or an error
291 : occurs. The operation attempts to complete synchronously:
292 : if every `write_some` completes without suspending, the
293 : entire operation finishes in `await_ready`.
294 :
295 : When the fast path cannot complete, the coroutine suspends
296 : and continues asynchronously. The internal coroutine frame
297 : is cached and reused across calls.
298 :
299 : @param buffers The buffer sequence to write. Passed by
300 : value to ensure the sequence lives in the coroutine
301 : frame across suspension points.
302 :
303 : @return An awaitable that await-returns `(error_code,std::size_t)`.
304 : On success, `n` equals `buffer_size(buffers)`. On
305 : error, `n` is the number of bytes written before the
306 : error. Compare error codes to conditions:
307 : @li `cond::canceled` - Operation was cancelled
308 : @li `std::errc::broken_pipe` - Peer closed connection
309 :
310 : @par Example
311 :
312 : @code
313 : write_now wn( stream );
314 : auto [ec, n] = co_await wn( make_buffer( body ) );
315 : if( ec )
316 : detail::throw_system_error( ec );
317 : @endcode
318 :
319 : @see write, write_some, WriteStream
320 : */
321 : // GCC falsely warns that the coroutine promise's
322 : // placement operator new(size_t, write_now&, auto&)
323 : // mismatches operator delete(void*, size_t). Per the
324 : // standard, coroutine deallocation lookup is separate.
325 : #if defined(__GNUC__) && !defined(__clang__)
326 : #pragma GCC diagnostic push
327 : #pragma GCC diagnostic ignored "-Wmismatched-new-delete"
328 : #endif
329 :
330 : #if BOOST_CAPY_WRITE_NOW_WORKAROUND
331 : template<ConstBufferSequence Buffers>
332 : op_type
333 71 : operator()(Buffers buffers)
334 : {
335 : std::size_t const total_size = buffer_size(buffers);
336 : std::size_t total_written = 0;
337 : auto cb = buffer_slice(buffers);
338 : while(total_written < total_size)
339 : {
340 : auto r =
341 : co_await stream_.write_some(cb.data());
342 : cb.remove_prefix(std::get<0>(r.values));
343 : total_written += std::get<0>(r.values);
344 : if(r.ec)
345 : co_return io_result<std::size_t>{
346 : r.ec, total_written};
347 : }
348 : co_return io_result<std::size_t>{
349 : {}, total_written};
350 142 : }
351 : #else
352 : template<ConstBufferSequence Buffers>
353 : op_type
354 : operator()(Buffers buffers)
355 : {
356 : std::size_t const total_size = buffer_size(buffers);
357 : std::size_t total_written = 0;
358 :
359 : // GCC ICE in expand_expr_real_1 (expr.cc:11376)
360 : // when the buffer slice spans a co_yield, so
361 : // the GCC path uses a separate simple coroutine.
362 : auto cb = buffer_slice(buffers);
363 : while(total_written < total_size)
364 : {
365 : auto inner = stream_.write_some(cb.data());
366 : if(!inner.await_ready())
367 : break;
368 : auto r = inner.await_resume();
369 : if(r.ec)
370 : co_return io_result<std::size_t>{
371 : r.ec, total_written};
372 : cb.remove_prefix(std::get<0>(r.values));
373 : total_written += std::get<0>(r.values);
374 : }
375 :
376 : if(total_written >= total_size)
377 : co_return io_result<std::size_t>{
378 : {}, total_written};
379 :
380 : co_yield 0;
381 :
382 : while(total_written < total_size)
383 : {
384 : auto r =
385 : co_await stream_.write_some(cb.data());
386 : cb.remove_prefix(std::get<0>(r.values));
387 : total_written += std::get<0>(r.values);
388 : if(r.ec)
389 : co_return io_result<std::size_t>{
390 : r.ec, total_written};
391 : }
392 : co_return io_result<std::size_t>{
393 : {}, total_written};
394 : }
395 : #endif
396 :
397 : #if defined(__GNUC__) && !defined(__clang__)
398 : #pragma GCC diagnostic pop
399 : #endif
400 : };
401 :
402 : template<WriteStream S>
403 : write_now(S&) -> write_now<S>;
404 :
405 : } // namespace capy
406 : } // namespace boost
407 :
408 : #endif
|