LCOV - code coverage report
Current view: top level - capy/io - write_now.hpp (source / functions) Coverage Total Hit Missed
Test: coverage_remapped.info Lines: 100.0 % 70 70
Test Date: 2026-06-09 22:00:30 Functions: 93.3 % 60 56 4

           TLA  Line data    Source code
       1                 : //
       2                 : // Copyright (c) 2025 Vinnie Falco (vinnie.falco@gmail.com)
       3                 : //
       4                 : // Distributed under the Boost Software License, Version 1.0. (See accompanying
       5                 : // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
       6                 : //
       7                 : // Official repository: https://github.com/cppalliance/capy
       8                 : //
       9                 : 
      10                 : #ifndef BOOST_CAPY_IO_WRITE_NOW_HPP
      11                 : #define BOOST_CAPY_IO_WRITE_NOW_HPP
      12                 : 
      13                 : #include <boost/capy/detail/config.hpp>
      14                 : #include <boost/capy/detail/await_suspend_helper.hpp>
      15                 : #include <boost/capy/buffers.hpp>
      16                 : #include <boost/capy/buffers/buffer_slice.hpp>
      17                 : #include <boost/capy/concept/io_awaitable.hpp>
      18                 : #include <boost/capy/concept/write_stream.hpp>
      19                 : #include <coroutine>
      20                 : #include <boost/capy/ex/executor_ref.hpp>
      21                 : #include <boost/capy/ex/io_env.hpp>
      22                 : #include <boost/capy/io_result.hpp>
      23                 : 
      24                 : #include <cstddef>
      25                 : #include <exception>
      26                 : #include <new>
      27                 : #include <stop_token>
      28                 : #include <utility>
      29                 : 
      30                 : #ifndef BOOST_CAPY_WRITE_NOW_WORKAROUND
      31                 : # if defined(__GNUC__) && !defined(__clang__)
      32                 : #  define BOOST_CAPY_WRITE_NOW_WORKAROUND 1
      33                 : # else
      34                 : #  define BOOST_CAPY_WRITE_NOW_WORKAROUND 0
      35                 : # endif
      36                 : #endif
      37                 : 
      38                 : namespace boost {
      39                 : namespace capy {
      40                 : 
      41                 : /** Eagerly writes complete buffer sequences with frame caching.
      42                 : 
      43                 :     This class wraps a @ref WriteStream and provides an `operator()`
      44                 :     that writes an entire buffer sequence, attempting to complete
      45                 :     synchronously. If every `write_some` completes without suspending,
      46                 :     the entire operation finishes in `await_ready` with no coroutine
      47                 :     suspension.
      48                 : 
      49                 :     The class maintains a one-element coroutine frame cache. After
      50                 :     the first call, subsequent calls reuse the cached frame memory,
      51                 :     avoiding repeated allocation for the internal coroutine.
      52                 : 
      53                 :     @tparam Stream The stream type, must satisfy @ref WriteStream.
      54                 : 
      55                 :     @par Thread Safety
      56                 :     Distinct objects: Safe.
      57                 :     Shared objects: Unsafe.
      58                 : 
      59                 :     @par Preconditions
      60                 :     Only one operation may be outstanding at a time. A new call to
      61                 :     `operator()` must not be made until the previous operation has
      62                 :     completed (i.e., the returned awaitable has been fully consumed).
      63                 : 
      64                 :     @par Example
      65                 : 
      66                 :     @code
      67                 :     template< WriteStream Stream >
      68                 :     task<> send_messages( Stream& stream )
      69                 :     {
      70                 :         write_now wn( stream );
      71                 :         auto [ec1, n1] = co_await wn( make_buffer( "hello" ) );
      72                 :         if( ec1 )
      73                 :             detail::throw_system_error( ec1 );
      74                 :         auto [ec2, n2] = co_await wn( make_buffer( "world" ) );
      75                 :         if( ec2 )
      76                 :             detail::throw_system_error( ec2 );
      77                 :     }
      78                 :     @endcode
      79                 : 
      80                 :     @see write, write_some, WriteStream, ConstBufferSequence
      81                 : */
      82                 : template<class Stream>
      83                 :     requires WriteStream<Stream>
      84                 : class write_now
      85                 : {
      86                 :     Stream& stream_;
      87                 :     void* cached_frame_ = nullptr;
      88                 :     std::size_t cached_size_ = 0;
      89                 : 
      90                 :     struct [[nodiscard]] BOOST_CAPY_CORO_AWAIT_ELIDABLE
      91                 :         op_type
      92                 :     {
      93                 :         struct promise_type
      94                 :         {
      95                 :             io_result<std::size_t> result_;
      96                 :             std::exception_ptr ep_;
      97                 :             std::coroutine_handle<> cont_{nullptr};
      98                 :             io_env const* env_ = nullptr;
      99                 :             bool done_ = false;
     100                 : 
     101 HIT          71 :             op_type get_return_object()
     102                 :             {
     103                 :                 return op_type{
     104                 :                     std::coroutine_handle<
     105              71 :                         promise_type>::from_promise(*this)};
     106                 :             }
     107                 : 
     108              71 :             auto initial_suspend() noexcept
     109                 :             {
     110                 : #if BOOST_CAPY_WRITE_NOW_WORKAROUND
     111              71 :                 return std::suspend_always{};
     112                 : #else
     113                 :                 return std::suspend_never{};
     114                 : #endif
     115                 :             }
     116                 : 
     117              69 :             auto final_suspend() noexcept
     118                 :             {
     119                 :                 struct awaiter
     120                 :                 {
     121                 :                     promise_type* p_;
     122                 : 
     123              69 :                     bool await_ready() const noexcept
     124                 :                     {
     125              69 :                         return false;
     126                 :                     }
     127                 : 
     128              69 :                     std::coroutine_handle<> await_suspend(std::coroutine_handle<>) const noexcept
     129                 :                     {
     130              69 :                         p_->done_ = true;
     131              69 :                         if(!p_->cont_)
     132                 :                             return std::noop_coroutine(); // LCOV_EXCL_LINE cont_ always set on this (suspend_always) path
     133              69 :                         return p_->cont_;
     134                 :                     }
     135                 : 
     136                 :                     void await_resume() const noexcept {} // LCOV_EXCL_LINE final_suspend awaiter, never resumed
     137                 :                 };
     138              69 :                 return awaiter{this};
     139                 :             }
     140                 : 
     141              47 :             void return_value(
     142                 :                 io_result<std::size_t> r) noexcept
     143                 :             {
     144              47 :                 result_ = r;
     145              47 :             }
     146                 : 
     147              22 :             void unhandled_exception()
     148                 :             {
     149              22 :                 ep_ = std::current_exception();
     150              22 :             }
     151                 : 
     152                 :             std::suspend_always yield_value(int) noexcept
     153                 :             {
     154                 :                 return {};
     155                 :             }
     156                 : 
     157                 :             template<class A>
     158              85 :             auto await_transform(A&& a)
     159                 :             {
     160                 :                 using decayed = std::decay_t<A>;
     161                 :                 if constexpr (IoAwaitable<decayed>)
     162                 :                 {
     163                 :                     struct wrapper
     164                 :                     {
     165                 :                         decayed inner_;
     166                 :                         promise_type* p_;
     167                 : 
     168              85 :                         bool await_ready()
     169                 :                         {
     170              85 :                             return inner_.await_ready();
     171                 :                         }
     172                 : 
     173               1 :                         std::coroutine_handle<> await_suspend(std::coroutine_handle<> h)
     174                 :                         {
     175               1 :                             return detail::call_await_suspend(
     176                 :                                 &inner_, h,
     177               1 :                                 p_->env_);
     178                 :                         }
     179                 : 
     180              85 :                         decltype(auto) await_resume()
     181                 :                         {
     182              85 :                             return inner_.await_resume();
     183                 :                         }
     184                 :                     };
     185                 :                     return wrapper{
     186              85 :                         std::forward<A>(a), this};
     187                 :                 }
     188                 :                 else
     189                 :                 {
     190                 :                     return std::forward<A>(a);
     191                 :                 }
     192                 :             }
     193                 : 
     194                 :             static void*
     195              71 :             operator new(
     196                 :                 std::size_t size,
     197                 :                 write_now& self,
     198                 :                 auto&)
     199                 :             {
     200              71 :                 if(self.cached_frame_ &&
     201               5 :                     self.cached_size_ >= size)
     202               4 :                     return self.cached_frame_;
     203              67 :                 void* p = ::operator new(size);
     204              67 :                 if(self.cached_frame_)
     205               1 :                     ::operator delete(self.cached_frame_);
     206              67 :                 self.cached_frame_ = p;
     207              67 :                 self.cached_size_ = size;
     208              67 :                 return p;
     209                 :             }
     210                 : 
     211                 :             static void
     212              71 :             operator delete(void*, std::size_t) noexcept
     213                 :             {
     214              71 :             }
     215                 :         };
     216                 : 
     217                 :         std::coroutine_handle<promise_type> h_;
     218                 : 
     219             140 :         ~op_type()
     220                 :         {
     221             140 :             if(h_)
     222              71 :                 h_.destroy();
     223             140 :         }
     224                 : 
     225                 :         op_type(op_type const&) = delete;
     226                 :         op_type& operator=(op_type const&) = delete;
     227                 : 
     228              69 :         op_type(op_type&& other) noexcept
     229              69 :             : h_(std::exchange(other.h_, nullptr))
     230                 :         {
     231              69 :         }
     232                 : 
     233                 :         op_type& operator=(op_type&&) = delete;
     234                 : 
     235              69 :         bool await_ready() const noexcept
     236                 :         {
     237              69 :             return h_.promise().done_;
     238                 :         }
     239                 : 
     240              69 :         std::coroutine_handle<> await_suspend(
     241                 :             std::coroutine_handle<> cont,
     242                 :             io_env const* env)
     243                 :         {
     244              69 :             auto& p = h_.promise();
     245              69 :             p.cont_ = cont;
     246              69 :             p.env_ = env;
     247              69 :             return h_;
     248                 :         }
     249                 : 
     250              69 :         io_result<std::size_t> await_resume()
     251                 :         {
     252              69 :             auto& p = h_.promise();
     253              69 :             if(p.ep_)
     254              22 :                 std::rethrow_exception(p.ep_);
     255              47 :             return p.result_;
     256                 :         }
     257                 : 
     258                 :     private:
     259              71 :         explicit op_type(
     260                 :             std::coroutine_handle<promise_type> h)
     261              71 :             : h_(h)
     262                 :         {
     263              71 :         }
     264                 :     };
     265                 : 
     266                 : public:
     267                 :     /** Destructor. Frees the cached coroutine frame. */
     268              66 :     ~write_now()
     269                 :     {
     270              66 :         if(cached_frame_)
     271              66 :             ::operator delete(cached_frame_);
     272              66 :     }
     273                 : 
     274                 :     /** Construct from a stream reference.
     275                 : 
     276                 :         @param s The stream to write to. Must outlive this object.
     277                 :     */
     278                 :     explicit
     279              66 :     write_now(Stream& s) noexcept
     280              66 :         : stream_(s)
     281                 :     {
     282              66 :     }
     283                 : 
     284                 :     write_now(write_now const&) = delete;
     285                 :     write_now& operator=(write_now const&) = delete;
     286                 : 
     287                 :     /** Eagerly write the entire buffer sequence.
     288                 : 
     289                 :         Writes data to the stream by calling `write_some` repeatedly
     290                 :         until the entire buffer sequence is written or an error
     291                 :         occurs. The operation attempts to complete synchronously:
     292                 :         if every `write_some` completes without suspending, the
     293                 :         entire operation finishes in `await_ready`.
     294                 : 
     295                 :         When the fast path cannot complete, the coroutine suspends
     296                 :         and continues asynchronously. The internal coroutine frame
     297                 :         is cached and reused across calls.
     298                 : 
     299                 :         @param buffers The buffer sequence to write. Passed by
     300                 :             value to ensure the sequence lives in the coroutine
     301                 :             frame across suspension points.
     302                 : 
     303                 :         @return An awaitable that await-returns `(error_code,std::size_t)`.
     304                 :             On success, `n` equals `buffer_size(buffers)`. On
     305                 :             error, `n` is the number of bytes written before the
     306                 :             error. Compare error codes to conditions:
     307                 :             @li `cond::canceled` - Operation was cancelled
     308                 :             @li `std::errc::broken_pipe` - Peer closed connection
     309                 : 
     310                 :         @par Example
     311                 : 
     312                 :         @code
     313                 :         write_now wn( stream );
     314                 :         auto [ec, n] = co_await wn( make_buffer( body ) );
     315                 :         if( ec )
     316                 :             detail::throw_system_error( ec );
     317                 :         @endcode
     318                 : 
     319                 :         @see write, write_some, WriteStream
     320                 :     */
     321                 : // GCC falsely warns that the coroutine promise's
     322                 : // placement operator new(size_t, write_now&, auto&)
     323                 : // mismatches operator delete(void*, size_t). Per the
     324                 : // standard, coroutine deallocation lookup is separate.
     325                 : #if defined(__GNUC__) && !defined(__clang__)
     326                 : #pragma GCC diagnostic push
     327                 : #pragma GCC diagnostic ignored "-Wmismatched-new-delete"
     328                 : #endif
     329                 : 
     330                 : #if BOOST_CAPY_WRITE_NOW_WORKAROUND
     331                 :     template<ConstBufferSequence Buffers>
     332                 :     op_type
     333              71 :     operator()(Buffers buffers)
     334                 :     {
     335                 :         std::size_t const total_size = buffer_size(buffers);
     336                 :         std::size_t total_written = 0;
     337                 :         auto cb = buffer_slice(buffers);
     338                 :         while(total_written < total_size)
     339                 :         {
     340                 :             auto r =
     341                 :                 co_await stream_.write_some(cb.data());
     342                 :             cb.remove_prefix(std::get<0>(r.values));
     343                 :             total_written += std::get<0>(r.values);
     344                 :             if(r.ec)
     345                 :                 co_return io_result<std::size_t>{
     346                 :                     r.ec, total_written};
     347                 :         }
     348                 :         co_return io_result<std::size_t>{
     349                 :             {}, total_written};
     350             142 :     }
     351                 : #else
     352                 :     template<ConstBufferSequence Buffers>
     353                 :     op_type
     354                 :     operator()(Buffers buffers)
     355                 :     {
     356                 :         std::size_t const total_size = buffer_size(buffers);
     357                 :         std::size_t total_written = 0;
     358                 : 
     359                 :         // GCC ICE in expand_expr_real_1 (expr.cc:11376)
     360                 :         // when the buffer slice spans a co_yield, so
     361                 :         // the GCC path uses a separate simple coroutine.
     362                 :         auto cb = buffer_slice(buffers);
     363                 :         while(total_written < total_size)
     364                 :         {
     365                 :             auto inner = stream_.write_some(cb.data());
     366                 :             if(!inner.await_ready())
     367                 :                 break;
     368                 :             auto r = inner.await_resume();
     369                 :             if(r.ec)
     370                 :                 co_return io_result<std::size_t>{
     371                 :                     r.ec, total_written};
     372                 :             cb.remove_prefix(std::get<0>(r.values));
     373                 :             total_written += std::get<0>(r.values);
     374                 :         }
     375                 : 
     376                 :         if(total_written >= total_size)
     377                 :             co_return io_result<std::size_t>{
     378                 :                 {}, total_written};
     379                 : 
     380                 :         co_yield 0;
     381                 : 
     382                 :         while(total_written < total_size)
     383                 :         {
     384                 :             auto r =
     385                 :                 co_await stream_.write_some(cb.data());
     386                 :             cb.remove_prefix(std::get<0>(r.values));
     387                 :             total_written += std::get<0>(r.values);
     388                 :             if(r.ec)
     389                 :                 co_return io_result<std::size_t>{
     390                 :                     r.ec, total_written};
     391                 :         }
     392                 :         co_return io_result<std::size_t>{
     393                 :             {}, total_written};
     394                 :     }
     395                 : #endif
     396                 : 
     397                 : #if defined(__GNUC__) && !defined(__clang__)
     398                 : #pragma GCC diagnostic pop
     399                 : #endif
     400                 : };
     401                 : 
     402                 : template<WriteStream S>
     403                 : write_now(S&) -> write_now<S>;
     404                 : 
     405                 : } // namespace capy
     406                 : } // namespace boost
     407                 : 
     408                 : #endif
        

Generated by: LCOV version 2.3