include/boost/capy/io/write_now.hpp

100.0% Lines (57/57) 97.7% List of functions (43/44)
write_now.hpp
f(x) Functions (44)
Function Calls Lines Blocks
boost::capy::write_now<boost::capy::(anonymous namespace)::suspending_write_stream>::op_type::promise_type::get_return_object() :101 3x 100.0% 100.0% boost::capy::write_now<boost::capy::test::write_stream>::op_type::promise_type::get_return_object() :101 68x 100.0% 100.0% boost::capy::write_now<boost::capy::(anonymous namespace)::suspending_write_stream>::op_type::promise_type::initial_suspend() :108 3x 100.0% 100.0% boost::capy::write_now<boost::capy::test::write_stream>::op_type::promise_type::initial_suspend() :108 68x 100.0% 100.0% boost::capy::write_now<boost::capy::(anonymous namespace)::suspending_write_stream>::op_type::promise_type::final_suspend() :117 1x 100.0% 100.0% boost::capy::write_now<boost::capy::test::write_stream>::op_type::promise_type::final_suspend() :117 68x 100.0% 100.0% boost::capy::write_now<boost::capy::(anonymous namespace)::suspending_write_stream>::op_type::promise_type::return_value(boost::capy::io_result<unsigned long>) :141 1x 100.0% 100.0% boost::capy::write_now<boost::capy::test::write_stream>::op_type::promise_type::return_value(boost::capy::io_result<unsigned long>) :141 46x 100.0% 100.0% boost::capy::write_now<boost::capy::(anonymous namespace)::suspending_write_stream>::op_type::promise_type::unhandled_exception() :147 0 0.0% 0.0% boost::capy::write_now<boost::capy::test::write_stream>::op_type::promise_type::unhandled_exception() :147 22x 100.0% 100.0% auto boost::capy::write_now<boost::capy::(anonymous namespace)::suspending_write_stream>::op_type::promise_type::await_transform<boost::capy::(anonymous namespace)::suspending_write_awaitable>(boost::capy::(anonymous namespace)::suspending_write_awaitable&&) :158 1x 100.0% 100.0% auto 
boost::capy::write_now<boost::capy::test::write_stream>::op_type::promise_type::await_transform<boost::capy::test::write_stream::write_some<boost::capy::detail::slice_impl<boost::capy::const_buffer>::data_view>(boost::capy::detail::slice_impl<boost::capy::const_buffer>::data_view)::awaitable>(boost::capy::test::write_stream::write_some<boost::capy::detail::slice_impl<boost::capy::const_buffer>::data_view>(boost::capy::detail::slice_impl<boost::capy::const_buffer>::data_view)::awaitable&&) :158 60x 100.0% 100.0% auto boost::capy::write_now<boost::capy::test::write_stream>::op_type::promise_type::await_transform<boost::capy::test::write_stream::write_some<boost::capy::detail::slice_impl<boost::capy::mutable_buffer>::data_view>(boost::capy::detail::slice_impl<boost::capy::mutable_buffer>::data_view)::awaitable>(boost::capy::test::write_stream::write_some<boost::capy::detail::slice_impl<boost::capy::mutable_buffer>::data_view>(boost::capy::detail::slice_impl<boost::capy::mutable_buffer>::data_view)::awaitable&&) :158 6x 100.0% 100.0% auto boost::capy::write_now<boost::capy::test::write_stream>::op_type::promise_type::await_transform<boost::capy::test::write_stream::write_some<boost::capy::detail::slice_impl<std::array<boost::capy::const_buffer, 2ul> >::data_view>(boost::capy::detail::slice_impl<std::array<boost::capy::const_buffer, 2ul> >::data_view)::awaitable>(boost::capy::test::write_stream::write_some<boost::capy::detail::slice_impl<std::array<boost::capy::const_buffer, 2ul> >::data_view>(boost::capy::detail::slice_impl<std::array<boost::capy::const_buffer, 2ul> >::data_view)::awaitable&&) :158 18x 100.0% 100.0% void* boost::capy::write_now<boost::capy::(anonymous namespace)::suspending_write_stream>::op_type::promise_type::operator new<boost::capy::const_buffer>(unsigned long, boost::capy::write_now<boost::capy::(anonymous namespace)::suspending_write_stream>&, boost::capy::const_buffer&) :195 1x 70.0% 62.0% void* boost::capy::write_now<boost::capy::(anonymous 
namespace)::suspending_write_stream>::op_type::promise_type::operator new<std::array<boost::capy::const_buffer, 1ul> >(unsigned long, boost::capy::write_now<boost::capy::(anonymous namespace)::suspending_write_stream>&, std::array<boost::capy::const_buffer, 1ul>&) :195 1x 70.0% 62.0% void* boost::capy::write_now<boost::capy::(anonymous namespace)::suspending_write_stream>::op_type::promise_type::operator new<std::array<boost::capy::const_buffer, 256ul> >(unsigned long, boost::capy::write_now<boost::capy::(anonymous namespace)::suspending_write_stream>&, std::array<boost::capy::const_buffer, 256ul>&) :195 1x 90.0% 88.0% void* boost::capy::write_now<boost::capy::test::write_stream>::op_type::promise_type::operator new<boost::capy::const_buffer>(unsigned long, boost::capy::write_now<boost::capy::test::write_stream>&, boost::capy::const_buffer&) :195 44x 90.0% 88.0% void* boost::capy::write_now<boost::capy::test::write_stream>::op_type::promise_type::operator new<boost::capy::mutable_buffer>(unsigned long, boost::capy::write_now<boost::capy::test::write_stream>&, boost::capy::mutable_buffer&) :195 6x 70.0% 62.0% void* boost::capy::write_now<boost::capy::test::write_stream>::op_type::promise_type::operator new<std::array<boost::capy::const_buffer, 2ul> >(unsigned long, boost::capy::write_now<boost::capy::test::write_stream>&, std::array<boost::capy::const_buffer, 2ul>&) :195 18x 70.0% 62.0% boost::capy::write_now<boost::capy::(anonymous namespace)::suspending_write_stream>::op_type::promise_type::operator delete(void*, unsigned long) :212 3x 100.0% 100.0% boost::capy::write_now<boost::capy::test::write_stream>::op_type::promise_type::operator delete(void*, unsigned long) :212 68x 100.0% 100.0% boost::capy::write_now<boost::capy::(anonymous namespace)::suspending_write_stream>::op_type::~op_type() :219 4x 100.0% 100.0% boost::capy::write_now<boost::capy::test::write_stream>::op_type::~op_type() :219 136x 100.0% 100.0% boost::capy::write_now<boost::capy::(anonymous 
namespace)::suspending_write_stream>::op_type::op_type(boost::capy::write_now<boost::capy::(anonymous namespace)::suspending_write_stream>::op_type&&) :228 1x 100.0% 100.0% boost::capy::write_now<boost::capy::test::write_stream>::op_type::op_type(boost::capy::write_now<boost::capy::test::write_stream>::op_type&&) :228 68x 100.0% 100.0% boost::capy::write_now<boost::capy::(anonymous namespace)::suspending_write_stream>::op_type::await_ready() const :235 1x 100.0% 100.0% boost::capy::write_now<boost::capy::test::write_stream>::op_type::await_ready() const :235 68x 100.0% 100.0% boost::capy::write_now<boost::capy::(anonymous namespace)::suspending_write_stream>::op_type::await_suspend(std::__n4861::coroutine_handle<void>, boost::capy::io_env const*) :240 1x 100.0% 100.0% boost::capy::write_now<boost::capy::test::write_stream>::op_type::await_suspend(std::__n4861::coroutine_handle<void>, boost::capy::io_env const*) :240 68x 100.0% 100.0% boost::capy::write_now<boost::capy::(anonymous namespace)::suspending_write_stream>::op_type::await_resume() :250 1x 80.0% 60.0% boost::capy::write_now<boost::capy::test::write_stream>::op_type::await_resume() :250 68x 100.0% 100.0% boost::capy::write_now<boost::capy::(anonymous namespace)::suspending_write_stream>::op_type::op_type(std::__n4861::coroutine_handle<boost::capy::write_now<boost::capy::(anonymous namespace)::suspending_write_stream>::op_type::promise_type>) :259 3x 100.0% 100.0% boost::capy::write_now<boost::capy::test::write_stream>::op_type::op_type(std::__n4861::coroutine_handle<boost::capy::write_now<boost::capy::test::write_stream>::op_type::promise_type>) :259 68x 100.0% 100.0% boost::capy::write_now<boost::capy::(anonymous namespace)::suspending_write_stream>::~write_now() :268 2x 100.0% 100.0% boost::capy::write_now<boost::capy::test::write_stream>::~write_now() :268 64x 100.0% 100.0% boost::capy::write_now<boost::capy::(anonymous namespace)::suspending_write_stream>::write_now(boost::capy::(anonymous 
namespace)::suspending_write_stream&) :279 2x 100.0% 100.0% boost::capy::write_now<boost::capy::test::write_stream>::write_now(boost::capy::test::write_stream&) :279 64x 100.0% 100.0% boost::capy::write_now<boost::capy::(anonymous namespace)::suspending_write_stream>::op_type boost::capy::write_now<boost::capy::(anonymous namespace)::suspending_write_stream>::operator()<boost::capy::const_buffer>(boost::capy::const_buffer) :333 1x 100.0% 44.0% boost::capy::write_now<boost::capy::(anonymous namespace)::suspending_write_stream>::op_type boost::capy::write_now<boost::capy::(anonymous namespace)::suspending_write_stream>::operator()<std::array<boost::capy::const_buffer, 1ul> >(std::array<boost::capy::const_buffer, 1ul>) :333 1x 100.0% 44.0% boost::capy::write_now<boost::capy::(anonymous namespace)::suspending_write_stream>::op_type boost::capy::write_now<boost::capy::(anonymous namespace)::suspending_write_stream>::operator()<std::array<boost::capy::const_buffer, 256ul> >(std::array<boost::capy::const_buffer, 256ul>) :333 1x 100.0% 44.0% boost::capy::write_now<boost::capy::test::write_stream>::op_type boost::capy::write_now<boost::capy::test::write_stream>::operator()<boost::capy::const_buffer>(boost::capy::const_buffer) :333 44x 100.0% 44.0% boost::capy::write_now<boost::capy::test::write_stream>::op_type boost::capy::write_now<boost::capy::test::write_stream>::operator()<boost::capy::mutable_buffer>(boost::capy::mutable_buffer) :333 6x 100.0% 44.0% boost::capy::write_now<boost::capy::test::write_stream>::op_type boost::capy::write_now<boost::capy::test::write_stream>::operator()<std::array<boost::capy::const_buffer, 2ul> >(std::array<boost::capy::const_buffer, 2ul>) :333 18x 100.0% 44.0%
Line TLA Hits Source Code
1 //
2 // Copyright (c) 2025 Vinnie Falco (vinnie.falco@gmail.com)
3 //
4 // Distributed under the Boost Software License, Version 1.0. (See accompanying
5 // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6 //
7 // Official repository: https://github.com/cppalliance/capy
8 //
9
10 #ifndef BOOST_CAPY_IO_WRITE_NOW_HPP
11 #define BOOST_CAPY_IO_WRITE_NOW_HPP
12
13 #include <boost/capy/detail/config.hpp>
14 #include <boost/capy/detail/await_suspend_helper.hpp>
15 #include <boost/capy/buffers.hpp>
16 #include <boost/capy/buffers/buffer_slice.hpp>
17 #include <boost/capy/concept/io_awaitable.hpp>
18 #include <boost/capy/concept/write_stream.hpp>
19 #include <coroutine>
20 #include <boost/capy/ex/executor_ref.hpp>
21 #include <boost/capy/ex/io_env.hpp>
22 #include <boost/capy/io_result.hpp>
23
24 #include <cstddef>
25 #include <exception>
26 #include <new>
27 #include <stop_token>
28 #include <utility>
29
// Selects which implementation of write_now::operator() is compiled.
// A nonzero value makes the internal coroutine start suspended and use
// a single write loop; zero selects the eager variant that runs inside
// the call and parks itself with co_yield when a write is not ready.
// Defaults to 1 on GCC (but not Clang) and 0 everywhere else. Define
// the macro before including this header to override the default.
#if !defined(BOOST_CAPY_WRITE_NOW_WORKAROUND)
# if !defined(__GNUC__) || defined(__clang__)
#  define BOOST_CAPY_WRITE_NOW_WORKAROUND 0
# else
#  define BOOST_CAPY_WRITE_NOW_WORKAROUND 1
# endif
#endif
37
38 namespace boost {
39 namespace capy {
40
41 /** Eagerly writes complete buffer sequences with frame caching.
42
43 This class wraps a @ref WriteStream and provides an `operator()`
44 that writes an entire buffer sequence, attempting to complete
45 synchronously. If every `write_some` completes without suspending,
46 the entire operation finishes in `await_ready` with no coroutine
47 suspension.
48
49 The class maintains a one-element coroutine frame cache. After
50 the first call, subsequent calls reuse the cached frame memory,
51 avoiding repeated allocation for the internal coroutine.
52
53 @tparam Stream The stream type, must satisfy @ref WriteStream.
54
55 @par Thread Safety
56 Distinct objects: Safe.
57 Shared objects: Unsafe.
58
59 @par Preconditions
60 Only one operation may be outstanding at a time. A new call to
61 `operator()` must not be made until the previous operation has
62 completed (i.e., the returned awaitable has been fully consumed).
63
64 @par Example
65
66 @code
67 template< WriteStream Stream >
68 task<> send_messages( Stream& stream )
69 {
70 write_now wn( stream );
71 auto [ec1, n1] = co_await wn( make_buffer( "hello" ) );
72 if( ec1 )
73 detail::throw_system_error( ec1 );
74 auto [ec2, n2] = co_await wn( make_buffer( "world" ) );
75 if( ec2 )
76 detail::throw_system_error( ec2 );
77 }
78 @endcode
79
80 @see write, write_some, WriteStream, ConstBufferSequence
81 */
82 template<class Stream>
83 requires WriteStream<Stream>
84 class write_now
85 {
86 Stream& stream_;
87 void* cached_frame_ = nullptr;
88 std::size_t cached_size_ = 0;
89
90 struct [[nodiscard]] BOOST_CAPY_CORO_AWAIT_ELIDABLE
91 op_type
92 {
93 struct promise_type
94 {
95 io_result<std::size_t> result_;
96 std::exception_ptr ep_;
97 std::coroutine_handle<> cont_{nullptr};
98 io_env const* env_ = nullptr;
99 bool done_ = false;
100
101 71x op_type get_return_object()
102 {
103 return op_type{
104 std::coroutine_handle<
105 71x promise_type>::from_promise(*this)};
106 }
107
108 71x auto initial_suspend() noexcept
109 {
110 #if BOOST_CAPY_WRITE_NOW_WORKAROUND
111 71x return std::suspend_always{};
112 #else
113 return std::suspend_never{};
114 #endif
115 }
116
117 69x auto final_suspend() noexcept
118 {
119 struct awaiter
120 {
121 promise_type* p_;
122
123 bool await_ready() const noexcept
124 {
125 return false;
126 }
127
128 std::coroutine_handle<> await_suspend(std::coroutine_handle<>) const noexcept
129 {
130 p_->done_ = true;
131 if(!p_->cont_)
132 return std::noop_coroutine(); // LCOV_EXCL_LINE cont_ always set on this (suspend_always) path
133 return p_->cont_;
134 }
135
136 void await_resume() const noexcept {} // LCOV_EXCL_LINE final_suspend awaiter, never resumed
137 };
138 69x return awaiter{this};
139 }
140
141 47x void return_value(
142 io_result<std::size_t> r) noexcept
143 {
144 47x result_ = r;
145 47x }
146
147 22x void unhandled_exception()
148 {
149 22x ep_ = std::current_exception();
150 22x }
151
152 std::suspend_always yield_value(int) noexcept
153 {
154 return {};
155 }
156
157 template<class A>
158 85x auto await_transform(A&& a)
159 {
160 using decayed = std::decay_t<A>;
161 if constexpr (IoAwaitable<decayed>)
162 {
163 struct wrapper
164 {
165 decayed inner_;
166 promise_type* p_;
167
168 bool await_ready()
169 {
170 return inner_.await_ready();
171 }
172
173 std::coroutine_handle<> await_suspend(std::coroutine_handle<> h)
174 {
175 return detail::call_await_suspend(
176 &inner_, h,
177 p_->env_);
178 }
179
180 decltype(auto) await_resume()
181 {
182 return inner_.await_resume();
183 }
184 };
185 return wrapper{
186 85x std::forward<A>(a), this};
187 }
188 else
189 {
190 return std::forward<A>(a);
191 }
192 }
193
194 static void*
195 71x operator new(
196 std::size_t size,
197 write_now& self,
198 auto&)
199 {
200 71x if(self.cached_frame_ &&
201 5x self.cached_size_ >= size)
202 4x return self.cached_frame_;
203 67x void* p = ::operator new(size);
204 67x if(self.cached_frame_)
205 1x ::operator delete(self.cached_frame_);
206 67x self.cached_frame_ = p;
207 67x self.cached_size_ = size;
208 67x return p;
209 }
210
211 static void
212 71x operator delete(void*, std::size_t) noexcept
213 {
214 71x }
215 };
216
217 std::coroutine_handle<promise_type> h_;
218
219 140x ~op_type()
220 {
221 140x if(h_)
222 71x h_.destroy();
223 140x }
224
225 op_type(op_type const&) = delete;
226 op_type& operator=(op_type const&) = delete;
227
228 69x op_type(op_type&& other) noexcept
229 69x : h_(std::exchange(other.h_, nullptr))
230 {
231 69x }
232
233 op_type& operator=(op_type&&) = delete;
234
235 69x bool await_ready() const noexcept
236 {
237 69x return h_.promise().done_;
238 }
239
240 69x std::coroutine_handle<> await_suspend(
241 std::coroutine_handle<> cont,
242 io_env const* env)
243 {
244 69x auto& p = h_.promise();
245 69x p.cont_ = cont;
246 69x p.env_ = env;
247 69x return h_;
248 }
249
250 69x io_result<std::size_t> await_resume()
251 {
252 69x auto& p = h_.promise();
253 69x if(p.ep_)
254 22x std::rethrow_exception(p.ep_);
255 47x return p.result_;
256 }
257
258 private:
259 71x explicit op_type(
260 std::coroutine_handle<promise_type> h)
261 71x : h_(h)
262 {
263 71x }
264 };
265
266 public:
267 /** Destructor. Frees the cached coroutine frame. */
268 66x ~write_now()
269 {
270 66x if(cached_frame_)
271 66x ::operator delete(cached_frame_);
272 66x }
273
274 /** Construct from a stream reference.
275
276 @param s The stream to write to. Must outlive this object.
277 */
278 explicit
279 66x write_now(Stream& s) noexcept
280 66x : stream_(s)
281 {
282 66x }
283
284 write_now(write_now const&) = delete;
285 write_now& operator=(write_now const&) = delete;
286
287 /** Eagerly write the entire buffer sequence.
288
289 Writes data to the stream by calling `write_some` repeatedly
290 until the entire buffer sequence is written or an error
291 occurs. The operation attempts to complete synchronously:
292 if every `write_some` completes without suspending, the
293 entire operation finishes in `await_ready`.
294
295 When the fast path cannot complete, the coroutine suspends
296 and continues asynchronously. The internal coroutine frame
297 is cached and reused across calls.
298
299 @param buffers The buffer sequence to write. Passed by
300 value to ensure the sequence lives in the coroutine
301 frame across suspension points.
302
303 @return An awaitable that await-returns `(error_code,std::size_t)`.
304 On success, `n` equals `buffer_size(buffers)`. On
305 error, `n` is the number of bytes written before the
306 error. Compare error codes to conditions:
307 @li `cond::canceled` - Operation was cancelled
308 @li `std::errc::broken_pipe` - Peer closed connection
309
310 @par Example
311
312 @code
313 write_now wn( stream );
314 auto [ec, n] = co_await wn( make_buffer( body ) );
315 if( ec )
316 detail::throw_system_error( ec );
317 @endcode
318
319 @see write, write_some, WriteStream
320 */
321 // GCC falsely warns that the coroutine promise's
322 // placement operator new(size_t, write_now&, auto&)
323 // mismatches operator delete(void*, size_t). Per the
324 // standard, coroutine deallocation lookup is separate.
325 #if defined(__GNUC__) && !defined(__clang__)
326 #pragma GCC diagnostic push
327 #pragma GCC diagnostic ignored "-Wmismatched-new-delete"
328 #endif
329
330 #if BOOST_CAPY_WRITE_NOW_WORKAROUND
331 template<ConstBufferSequence Buffers>
332 op_type
333 71x operator()(Buffers buffers)
334 {
335 std::size_t const total_size = buffer_size(buffers);
336 std::size_t total_written = 0;
337 auto cb = buffer_slice(buffers);
338 while(total_written < total_size)
339 {
340 auto r =
341 co_await stream_.write_some(cb.data());
342 cb.remove_prefix(std::get<0>(r.values));
343 total_written += std::get<0>(r.values);
344 if(r.ec)
345 co_return io_result<std::size_t>{
346 r.ec, total_written};
347 }
348 co_return io_result<std::size_t>{
349 {}, total_written};
350 142x }
351 #else
352 template<ConstBufferSequence Buffers>
353 op_type
354 operator()(Buffers buffers)
355 {
356 std::size_t const total_size = buffer_size(buffers);
357 std::size_t total_written = 0;
358
359 // GCC ICE in expand_expr_real_1 (expr.cc:11376)
360 // when the buffer slice spans a co_yield, so
361 // the GCC path uses a separate simple coroutine.
362 auto cb = buffer_slice(buffers);
363 while(total_written < total_size)
364 {
365 auto inner = stream_.write_some(cb.data());
366 if(!inner.await_ready())
367 break;
368 auto r = inner.await_resume();
369 if(r.ec)
370 co_return io_result<std::size_t>{
371 r.ec, total_written};
372 cb.remove_prefix(std::get<0>(r.values));
373 total_written += std::get<0>(r.values);
374 }
375
376 if(total_written >= total_size)
377 co_return io_result<std::size_t>{
378 {}, total_written};
379
380 co_yield 0;
381
382 while(total_written < total_size)
383 {
384 auto r =
385 co_await stream_.write_some(cb.data());
386 cb.remove_prefix(std::get<0>(r.values));
387 total_written += std::get<0>(r.values);
388 if(r.ec)
389 co_return io_result<std::size_t>{
390 r.ec, total_written};
391 }
392 co_return io_result<std::size_t>{
393 {}, total_written};
394 }
395 #endif
396
397 #if defined(__GNUC__) && !defined(__clang__)
398 #pragma GCC diagnostic pop
399 #endif
400 };
401
402 template<WriteStream S>
403 write_now(S&) -> write_now<S>;
404
405 } // namespace capy
406 } // namespace boost
407
408 #endif
409