LCOV - code coverage report
Current view: top level - corosio/native/detail/reactor - reactor_stream_socket.hpp (source / functions) Coverage Total Hit Missed
Test: coverage_remapped.info Lines: 85.1 % 208 177 31
Test Date: 2026-04-16 21:41:12 Functions: 75.0 % 88 66 22

           TLA  Line data    Source code
       1                 : //
       2                 : // Copyright (c) 2026 Steve Gerbino
       3                 : //
       4                 : // Distributed under the Boost Software License, Version 1.0. (See accompanying
       5                 : // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
       6                 : //
       7                 : // Official repository: https://github.com/cppalliance/corosio
       8                 : //
       9                 : 
      10                 : #ifndef BOOST_COROSIO_NATIVE_DETAIL_REACTOR_REACTOR_STREAM_SOCKET_HPP
      11                 : #define BOOST_COROSIO_NATIVE_DETAIL_REACTOR_REACTOR_STREAM_SOCKET_HPP
      12                 : 
      13                 : #include <boost/corosio/tcp_socket.hpp>
      14                 : #include <boost/corosio/shutdown_type.hpp>
      15                 : #include <boost/corosio/native/detail/reactor/reactor_basic_socket.hpp>
      16                 : #include <boost/corosio/detail/dispatch_coro.hpp>
      17                 : #include <boost/capy/buffers.hpp>
      18                 : 
      19                 : #include <coroutine>
      20                 : 
      21                 : #include <errno.h>
      22                 : #include <sys/socket.h>
      23                 : #include <sys/uio.h>
      24                 : 
      25                 : namespace boost::corosio::detail {
      26                 : 
      27                 : /** CRTP base for reactor-backed stream socket implementations.
      28                 : 
      29                 :     Inherits shared data members and cancel/close/register logic
      30                 :     from reactor_basic_socket. Adds the stream-specific remote
      31                 :     endpoint, shutdown, and I/O dispatch (connect, read, write).
      32                 : 
      33                 :     @tparam Derived   The concrete socket type (CRTP).
      34                 :     @tparam Service   The backend's socket service type.
      35                 :     @tparam ConnOp    The backend's connect op type.
      36                 :     @tparam ReadOp    The backend's read op type.
      37                 :     @tparam WriteOp   The backend's write op type.
      38                 :     @tparam DescState The backend's descriptor_state type.
      39                 :     @tparam ImplBase  The public vtable base
      40                 :                       (tcp_socket::implementation or
      41                 :                        local_stream_socket::implementation).
      42                 :     @tparam Endpoint  The endpoint type (endpoint or local_endpoint).
      43                 : */
      44                 : template<
      45                 :     class Derived,
      46                 :     class Service,
      47                 :     class ConnOp,
      48                 :     class ReadOp,
      49                 :     class WriteOp,
      50                 :     class DescState,
      51                 :     class ImplBase = tcp_socket::implementation,
      52                 :     class Endpoint = endpoint>
      53                 : class reactor_stream_socket
      54                 :     : public reactor_basic_socket<
      55                 :           Derived,
      56                 :           ImplBase,
      57                 :           Service,
      58                 :           DescState,
      59                 :           Endpoint>
      60                 : {
      61                 :     using base_type = reactor_basic_socket<
      62                 :         Derived,
      63                 :         ImplBase,
      64                 :         Service,
      65                 :         DescState,
      66                 :         Endpoint>;
      67                 :     friend base_type;
      68                 :     friend Derived;
      69                 : 
      70                 : protected:
      71                 :     // NOLINTNEXTLINE(bugprone-crtp-constructor-accessibility)
      72 HIT       22189 :     explicit reactor_stream_socket(Service& svc) noexcept : base_type(svc) {}
      73                 : 
      74                 : protected:
      75                 :     Endpoint remote_endpoint_;
      76                 : 
      77                 : public:
      78                 :     /// Pending connect operation slot.
      79                 :     ConnOp conn_;
      80                 : 
      81                 :     /// Pending read operation slot.
      82                 :     ReadOp rd_;
      83                 : 
      84                 :     /// Pending write operation slot.
      85                 :     WriteOp wr_;
      86                 : 
      87           22189 :     ~reactor_stream_socket() override = default;
      88                 : 
      89                 :     /// Return the cached remote endpoint.
      90              44 :     Endpoint remote_endpoint() const noexcept override
      91                 :     {
      92              44 :         return remote_endpoint_;
      93                 :     }
      94                 : 
      95                 :     // --- Virtual method overrides (satisfy ImplBase pure virtuals) ---
      96                 : 
      97            7360 :     std::coroutine_handle<> connect(
      98                 :         std::coroutine_handle<> h,
      99                 :         capy::executor_ref ex,
     100                 :         Endpoint ep,
     101                 :         std::stop_token token,
     102                 :         std::error_code* ec) override
     103                 :     {
     104            7360 :         return do_connect(h, ex, ep, token, ec);
     105                 :     }
     106                 : 
     107          217409 :     std::coroutine_handle<> read_some(
     108                 :         std::coroutine_handle<> h,
     109                 :         capy::executor_ref ex,
     110                 :         buffer_param param,
     111                 :         std::stop_token token,
     112                 :         std::error_code* ec,
     113                 :         std::size_t* bytes_out) override
     114                 :     {
     115          217409 :         return do_read_some(h, ex, param, token, ec, bytes_out);
     116                 :     }
     117                 : 
     118          217112 :     std::coroutine_handle<> write_some(
     119                 :         std::coroutine_handle<> h,
     120                 :         capy::executor_ref ex,
     121                 :         buffer_param param,
     122                 :         std::stop_token token,
     123                 :         std::error_code* ec,
     124                 :         std::size_t* bytes_out) override
     125                 :     {
     126          217112 :         return do_write_some(h, ex, param, token, ec, bytes_out);
     127                 :     }
     128                 : 
     129                 :     std::error_code
     130               6 :     shutdown(corosio::shutdown_type what) noexcept override
     131                 :     {
     132               6 :         return do_shutdown(static_cast<int>(what));
     133                 :     }
     134                 : 
     135             186 :     void cancel() noexcept override
     136                 :     {
     137             186 :         this->do_cancel();
     138             186 :     }
     139                 : 
     140                 :     // --- End virtual overrides ---
     141                 : 
     142                 :     /// Close the socket (non-virtual, called by the service).
     143                 :     void close_socket() noexcept
     144                 :     {
     145                 :         this->do_close_socket();
     146                 :     }
     147                 : 
     148                 :     /** Shut down part or all of the full-duplex connection.
     149                 : 
     150                 :         @param what 0 = receive, 1 = send, 2 = both.
     151                 :     */
     152               6 :     std::error_code do_shutdown(int what) noexcept
     153                 :     {
     154                 :         int how;
     155               6 :         switch (what)
     156                 :         {
     157               2 :         case 0: // shutdown_receive
     158               2 :             how = SHUT_RD;
     159               2 :             break;
     160               2 :         case 1: // shutdown_send
     161               2 :             how = SHUT_WR;
     162               2 :             break;
     163               2 :         case 2: // shutdown_both
     164               2 :             how = SHUT_RDWR;
     165               2 :             break;
     166 MIS           0 :         default:
     167               0 :             return make_err(EINVAL);
     168                 :         }
     169 HIT           6 :         if (::shutdown(this->fd_, how) != 0)
     170 MIS           0 :             return make_err(errno);
     171 HIT           6 :         return {};
     172                 :     }
     173                 : 
     174                 :     /// Cache local and remote endpoints.
     175           14726 :     void set_endpoints(Endpoint local, Endpoint remote) noexcept
     176                 :     {
     177           14726 :         this->local_endpoint_ = std::move(local);
     178           14726 :         remote_endpoint_      = std::move(remote);
     179           14726 :     }
     180                 : 
     181                 :     /** Shared connect dispatch.
     182                 : 
     183                 :         Tries the connect syscall speculatively. On synchronous
     184                 :         completion, returns via inline budget or posts through queue.
     185                 :         On EINPROGRESS, registers with the reactor.
     186                 :     */
     187                 :     std::coroutine_handle<> do_connect(
     188                 :         std::coroutine_handle<>,
     189                 :         capy::executor_ref,
     190                 :         Endpoint const&,
     191                 :         std::stop_token const&,
     192                 :         std::error_code*);
     193                 : 
     194                 :     /** Shared scatter-read dispatch.
     195                 : 
     196                 :         Tries readv() speculatively. On success or hard error,
     197                 :         returns via inline budget or posts through queue.
     198                 :         On EAGAIN, registers with the reactor.
     199                 :     */
     200                 :     std::coroutine_handle<> do_read_some(
     201                 :         std::coroutine_handle<>,
     202                 :         capy::executor_ref,
     203                 :         buffer_param,
     204                 :         std::stop_token const&,
     205                 :         std::error_code*,
     206                 :         std::size_t*);
     207                 : 
     208                 :     /** Shared gather-write dispatch.
     209                 : 
     210                 :         Tries the write via WriteOp::write_policy speculatively.
     211                 :         On success or hard error, returns via inline budget or
     212                 :         posts through queue. On EAGAIN, registers with the reactor.
     213                 :     */
     214                 :     std::coroutine_handle<> do_write_some(
     215                 :         std::coroutine_handle<>,
     216                 :         capy::executor_ref,
     217                 :         buffer_param,
     218                 :         std::stop_token const&,
     219                 :         std::error_code*,
     220                 :         std::size_t*);
     221                 : 
     222                 :     /** Close the socket and cancel pending operations.
     223                 : 
     224                 :         Extends the base do_close_socket() to also reset
     225                 :         the remote endpoint.
     226                 :     */
     227           66571 :     void do_close_socket() noexcept
     228                 :     {
     229           66571 :         base_type::do_close_socket();
     230           66571 :         remote_endpoint_ = Endpoint{};
     231           66571 :     }
     232                 : 
     233                 : private:
     234                 :     // CRTP callbacks for reactor_basic_socket cancel/close
     235                 : 
     236                 :     template<class Op>
     237             192 :     reactor_op_base** op_to_desc_slot(Op& op) noexcept
     238                 :     {
     239             192 :         if (&op == static_cast<void*>(&conn_))
     240 MIS           0 :             return &this->desc_state_.connect_op;
     241 HIT         192 :         if (&op == static_cast<void*>(&rd_))
     242             192 :             return &this->desc_state_.read_op;
     243 MIS           0 :         if (&op == static_cast<void*>(&wr_))
     244               0 :             return &this->desc_state_.write_op;
     245               0 :         return nullptr;
     246                 :     }
     247                 : 
     248                 :     template<class Op>
     249               0 :     bool* op_to_cancel_flag(Op& op) noexcept
     250                 :     {
     251               0 :         if (&op == static_cast<void*>(&conn_))
     252               0 :             return &this->desc_state_.connect_cancel_pending;
     253               0 :         if (&op == static_cast<void*>(&rd_))
     254               0 :             return &this->desc_state_.read_cancel_pending;
     255               0 :         if (&op == static_cast<void*>(&wr_))
     256               0 :             return &this->desc_state_.write_cancel_pending;
     257               0 :         return nullptr;
     258                 :     }
     259                 : 
     260                 :     template<class Fn>
     261 HIT       66759 :     void for_each_op(Fn fn) noexcept
     262                 :     {
     263           66759 :         fn(conn_);
     264           66759 :         fn(rd_);
     265           66759 :         fn(wr_);
     266           66759 :     }
     267                 : 
     268                 :     template<class Fn>
     269           66759 :     void for_each_desc_entry(Fn fn) noexcept
     270                 :     {
     271           66759 :         fn(conn_, this->desc_state_.connect_op);
     272           66759 :         fn(rd_, this->desc_state_.read_op);
     273           66759 :         fn(wr_, this->desc_state_.write_op);
     274           66759 :     }
     275                 : };
     276                 : 
     277                 : template<
     278                 :     class Derived,
     279                 :     class Service,
     280                 :     class ConnOp,
     281                 :     class ReadOp,
     282                 :     class WriteOp,
     283                 :     class DescState,
     284                 :     class ImplBase,
     285                 :     class Endpoint>
     286                 : std::coroutine_handle<>
     287            7360 : reactor_stream_socket<Derived, Service, ConnOp, ReadOp, WriteOp, DescState, ImplBase, Endpoint>::
     288                 :     do_connect(
     289                 :         std::coroutine_handle<> h,
     290                 :         capy::executor_ref ex,
     291                 :         Endpoint const& ep,
     292                 :         std::stop_token const& token,
     293                 :         std::error_code* ec)
     294                 : {
     295            7360 :     auto& op = conn_;
     296                 : 
     297            7360 :     sockaddr_storage storage{};
     298            7360 :     socklen_t addrlen = to_sockaddr(ep, socket_family(this->fd_), storage);
     299                 :     int result =
     300            7360 :         ::connect(this->fd_, reinterpret_cast<sockaddr*>(&storage), addrlen);
     301                 : 
     302            7360 :     if (result == 0)
     303                 :     {
     304               4 :         sockaddr_storage local_storage{};
     305               4 :         socklen_t local_len = sizeof(local_storage);
     306               4 :         if (::getsockname(
     307                 :                 this->fd_, reinterpret_cast<sockaddr*>(&local_storage),
     308               4 :                 &local_len) == 0)
     309 MIS           0 :             this->local_endpoint_ =
     310 HIT           4 :                 from_sockaddr_as(local_storage, local_len, Endpoint{});
     311               4 :         remote_endpoint_ = ep;
     312                 :     }
     313                 : 
     314            7360 :     if (result == 0 || errno != EINPROGRESS)
     315                 :     {
     316               4 :         int err = (result < 0) ? errno : 0;
     317               4 :         if (this->svc_.scheduler().try_consume_inline_budget())
     318                 :         {
     319 MIS           0 :             *ec = err ? make_err(err) : std::error_code{};
     320               0 :             op.cont_op.cont.h = h;
     321               0 :             return dispatch_coro(ex, op.cont_op.cont);
     322                 :         }
     323 HIT           4 :         op.reset();
     324               4 :         op.h               = h;
     325               4 :         op.ex              = ex;
     326               4 :         op.ec_out          = ec;
     327               4 :         op.fd              = this->fd_;
     328               4 :         op.target_endpoint = ep;
     329               4 :         op.start(token, static_cast<Derived*>(this));
     330               4 :         op.impl_ptr = this->shared_from_this();
     331               4 :         op.complete(err, 0);
     332               4 :         this->svc_.post(&op);
     333               4 :         return std::noop_coroutine();
     334                 :     }
     335                 : 
     336                 :     // EINPROGRESS — register with reactor
     337            7356 :     op.reset();
     338            7356 :     op.h               = h;
     339            7356 :     op.ex              = ex;
     340            7356 :     op.ec_out          = ec;
     341            7356 :     op.fd              = this->fd_;
     342            7356 :     op.target_endpoint = ep;
     343            7356 :     op.start(token, static_cast<Derived*>(this));
     344            7356 :     op.impl_ptr = this->shared_from_this();
     345                 : 
     346            7356 :     this->register_op(
     347            7356 :         op, this->desc_state_.connect_op, this->desc_state_.write_ready,
     348            7356 :         this->desc_state_.connect_cancel_pending, true);
     349            7356 :     return std::noop_coroutine();
     350                 : }
     351                 : 
     352                 : template<
     353                 :     class Derived,
     354                 :     class Service,
     355                 :     class ConnOp,
     356                 :     class ReadOp,
     357                 :     class WriteOp,
     358                 :     class DescState,
     359                 :     class ImplBase,
     360                 :     class Endpoint>
     361                 : std::coroutine_handle<>
     362          217409 : reactor_stream_socket<Derived, Service, ConnOp, ReadOp, WriteOp, DescState, ImplBase, Endpoint>::
     363                 :     do_read_some(
     364                 :         std::coroutine_handle<> h,
     365                 :         capy::executor_ref ex,
     366                 :         buffer_param param,
     367                 :         std::stop_token const& token,
     368                 :         std::error_code* ec,
     369                 :         std::size_t* bytes_out)
     370                 : {
     371          217409 :     auto& op = rd_;
     372          217409 :     op.reset();
     373                 : 
     374          217409 :     capy::mutable_buffer bufs[ReadOp::max_buffers];
     375          217409 :     op.iovec_count = static_cast<int>(param.copy_to(bufs, ReadOp::max_buffers));
     376                 : 
     377          217409 :     if (op.iovec_count == 0 || (op.iovec_count == 1 && bufs[0].size() == 0))
     378                 :     {
     379               2 :         op.empty_buffer_read = true;
     380               2 :         op.h                 = h;
     381               2 :         op.ex                = ex;
     382               2 :         op.ec_out            = ec;
     383               2 :         op.bytes_out         = bytes_out;
     384               2 :         op.start(token, static_cast<Derived*>(this));
     385               2 :         op.impl_ptr = this->shared_from_this();
     386               2 :         op.complete(0, 0);
     387               2 :         this->svc_.post(&op);
     388               2 :         return std::noop_coroutine();
     389                 :     }
     390                 : 
     391          434814 :     for (int i = 0; i < op.iovec_count; ++i)
     392                 :     {
     393          217407 :         op.iovecs[i].iov_base = bufs[i].data();
     394          217407 :         op.iovecs[i].iov_len  = bufs[i].size();
     395                 :     }
     396                 : 
     397                 :     // Speculative read
     398                 :     ssize_t n;
     399                 :     do
     400                 :     {
     401          217407 :         n = ::readv(this->fd_, op.iovecs, op.iovec_count);
     402                 :     }
     403          217407 :     while (n < 0 && errno == EINTR);
     404                 : 
     405          217407 :     if (n >= 0 || (errno != EAGAIN && errno != EWOULDBLOCK))
     406                 :     {
     407          217018 :         int err    = (n < 0) ? errno : 0;
     408          217018 :         auto bytes = (n > 0) ? static_cast<std::size_t>(n) : std::size_t(0);
     409                 : 
     410          217018 :         if (this->svc_.scheduler().try_consume_inline_budget())
     411                 :         {
     412          173648 :             if (err)
     413 MIS           0 :                 *ec = make_err(err);
     414 HIT      173648 :             else if (n == 0)
     415              10 :                 *ec = capy::error::eof;
     416                 :             else
     417          173638 :                 *ec = {};
     418          173648 :             *bytes_out = bytes;
     419          173648 :             op.cont_op.cont.h = h;
     420          173648 :             return dispatch_coro(ex, op.cont_op.cont);
     421                 :         }
     422           43370 :         op.h         = h;
     423           43370 :         op.ex        = ex;
     424           43370 :         op.ec_out    = ec;
     425           43370 :         op.bytes_out = bytes_out;
     426           43370 :         op.start(token, static_cast<Derived*>(this));
     427           43370 :         op.impl_ptr = this->shared_from_this();
     428           43370 :         op.complete(err, bytes);
     429           43370 :         this->svc_.post(&op);
     430           43370 :         return std::noop_coroutine();
     431                 :     }
     432                 : 
     433                 :     // EAGAIN — register with reactor
     434             389 :     op.h         = h;
     435             389 :     op.ex        = ex;
     436             389 :     op.ec_out    = ec;
     437             389 :     op.bytes_out = bytes_out;
     438             389 :     op.fd        = this->fd_;
     439             389 :     op.start(token, static_cast<Derived*>(this));
     440             389 :     op.impl_ptr = this->shared_from_this();
     441                 : 
     442             389 :     this->register_op(
     443             389 :         op, this->desc_state_.read_op, this->desc_state_.read_ready,
     444             389 :         this->desc_state_.read_cancel_pending);
     445             389 :     return std::noop_coroutine();
     446                 : }
     447                 : 
     448                 : template<
     449                 :     class Derived,
     450                 :     class Service,
     451                 :     class ConnOp,
     452                 :     class ReadOp,
     453                 :     class WriteOp,
     454                 :     class DescState,
     455                 :     class ImplBase,
     456                 :     class Endpoint>
     457                 : std::coroutine_handle<>
     458          217112 : reactor_stream_socket<Derived, Service, ConnOp, ReadOp, WriteOp, DescState, ImplBase, Endpoint>::
     459                 :     do_write_some(
     460                 :         std::coroutine_handle<> h,
     461                 :         capy::executor_ref ex,
     462                 :         buffer_param param,
     463                 :         std::stop_token const& token,
     464                 :         std::error_code* ec,
     465                 :         std::size_t* bytes_out)
     466                 : {
     467          217112 :     auto& op = wr_;
     468          217112 :     op.reset();
     469                 : 
     470          217112 :     capy::mutable_buffer bufs[WriteOp::max_buffers];
     471          217112 :     op.iovec_count =
     472          217112 :         static_cast<int>(param.copy_to(bufs, WriteOp::max_buffers));
     473                 : 
     474          217112 :     if (op.iovec_count == 0 || (op.iovec_count == 1 && bufs[0].size() == 0))
     475                 :     {
     476               2 :         op.h         = h;
     477               2 :         op.ex        = ex;
     478               2 :         op.ec_out    = ec;
     479               2 :         op.bytes_out = bytes_out;
     480               2 :         op.start(token, static_cast<Derived*>(this));
     481               2 :         op.impl_ptr = this->shared_from_this();
     482               2 :         op.complete(0, 0);
     483               2 :         this->svc_.post(&op);
     484               2 :         return std::noop_coroutine();
     485                 :     }
     486                 : 
     487          434220 :     for (int i = 0; i < op.iovec_count; ++i)
     488                 :     {
     489          217110 :         op.iovecs[i].iov_base = bufs[i].data();
     490          217110 :         op.iovecs[i].iov_len  = bufs[i].size();
     491                 :     }
     492                 : 
     493                 :     // Speculative write via backend-specific write policy
     494                 :     ssize_t n =
     495          217110 :         WriteOp::write_policy::write(this->fd_, op.iovecs, op.iovec_count);
     496                 : 
     497          217110 :     if (n >= 0 || (errno != EAGAIN && errno != EWOULDBLOCK))
     498                 :     {
     499          217110 :         int err    = (n < 0) ? errno : 0;
     500          217110 :         auto bytes = (n > 0) ? static_cast<std::size_t>(n) : std::size_t(0);
     501                 : 
     502          217110 :         if (this->svc_.scheduler().try_consume_inline_budget())
     503                 :         {
     504          173702 :             *ec        = err ? make_err(err) : std::error_code{};
     505          173702 :             *bytes_out = bytes;
     506          173702 :             op.cont_op.cont.h = h;
     507          173702 :             return dispatch_coro(ex, op.cont_op.cont);
     508                 :         }
     509           43408 :         op.h         = h;
     510           43408 :         op.ex        = ex;
     511           43408 :         op.ec_out    = ec;
     512           43408 :         op.bytes_out = bytes_out;
     513           43408 :         op.start(token, static_cast<Derived*>(this));
     514           43408 :         op.impl_ptr = this->shared_from_this();
     515           43408 :         op.complete(err, bytes);
     516           43408 :         this->svc_.post(&op);
     517           43408 :         return std::noop_coroutine();
     518                 :     }
     519                 : 
     520                 :     // EAGAIN — register with reactor
     521 MIS           0 :     op.h         = h;
     522               0 :     op.ex        = ex;
     523               0 :     op.ec_out    = ec;
     524               0 :     op.bytes_out = bytes_out;
     525               0 :     op.fd        = this->fd_;
     526               0 :     op.start(token, static_cast<Derived*>(this));
     527               0 :     op.impl_ptr = this->shared_from_this();
     528                 : 
     529               0 :     this->register_op(
     530               0 :         op, this->desc_state_.write_op, this->desc_state_.write_ready,
     531               0 :         this->desc_state_.write_cancel_pending, true);
     532               0 :     return std::noop_coroutine();
     533                 : }
     534                 : 
     535                 : } // namespace boost::corosio::detail
     536                 : 
     537                 : #endif // BOOST_COROSIO_NATIVE_DETAIL_REACTOR_REACTOR_STREAM_SOCKET_HPP
        

Generated by: LCOV version 2.3