LCOV - code coverage report
Current view: top level - corosio/native/detail/reactor - reactor_basic_socket.hpp (source / functions) Coverage Total Hit Missed
Test: coverage_remapped.info Lines: 90.2 % 174 157 17
Test Date: 2026-04-16 21:41:12 Functions: 63.3 % 294 186 108

           TLA  Line data    Source code
       1                 : //
       2                 : // Copyright (c) 2026 Steve Gerbino
       3                 : //
       4                 : // Distributed under the Boost Software License, Version 1.0. (See accompanying
       5                 : // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
       6                 : //
       7                 : // Official repository: https://github.com/cppalliance/corosio
       8                 : //
       9                 : 
      10                 : #ifndef BOOST_COROSIO_NATIVE_DETAIL_REACTOR_REACTOR_BASIC_SOCKET_HPP
      11                 : #define BOOST_COROSIO_NATIVE_DETAIL_REACTOR_REACTOR_BASIC_SOCKET_HPP
      12                 : 
      13                 : #include <boost/corosio/detail/intrusive.hpp>
      14                 : #include <boost/corosio/detail/native_handle.hpp>
      15                 : #include <boost/corosio/endpoint.hpp>
      16                 : #include <boost/corosio/native/detail/reactor/reactor_op_base.hpp>
      17                 : #include <boost/corosio/native/detail/make_err.hpp>
      18                 : #include <boost/corosio/native/detail/endpoint_convert.hpp>
      19                 : 
      20                 : #include <memory>
      21                 : #include <mutex>
      22                 : #include <utility>
      23                 : 
      24                 : #include <errno.h>
      25                 : #include <netinet/in.h>
      26                 : #include <sys/socket.h>
      27                 : #include <unistd.h>
      28                 : 
      29                 : namespace boost::corosio::detail {
      30                 : 
      31                 : /** CRTP base for reactor-backed socket implementations.
      32                 : 
      33                 :     Extracts the shared data members, virtual overrides, and
      34                 :     cancel/close/register logic that is identical across TCP
      35                 :     (reactor_stream_socket) and UDP (reactor_datagram_socket).
      36                 : 
      37                 :     Derived classes provide CRTP callbacks that enumerate their
      38                 :     specific op slots so cancel/close can iterate them generically.
      39                 : 
      40                 :     @tparam Derived   The concrete socket type (CRTP).
      41                 :     @tparam ImplBase  The public vtable base (tcp_socket::implementation
      42                 :                       or udp_socket::implementation).
      43                 :     @tparam Service   The backend's service type.
      44                 :     @tparam DescState The backend's descriptor_state type.
      45                 :     @tparam Endpoint  The endpoint type (endpoint or local_endpoint).
      46                 : */
      47                 : template<
      48                 :     class Derived,
      49                 :     class ImplBase,
      50                 :     class Service,
      51                 :     class DescState,
      52                 :     class Endpoint = endpoint>
      53                 : class reactor_basic_socket
      54                 :     : public ImplBase
      55                 :     , public std::enable_shared_from_this<Derived>
      56                 :     , public intrusive_list<Derived>::node
      57                 : {
      58                 :     friend Derived;
      59                 : 
      60                 :     template<class, class, class, class, class, class, class, class>
      61                 :     friend class reactor_stream_socket;
      62                 : 
      63                 :     template<class, class, class, class, class, class, class, class, class, class>
      64                 :     friend class reactor_datagram_socket;
      65                 : 
      66 HIT       22309 :     explicit reactor_basic_socket(Service& svc) noexcept : svc_(svc) {}
      67                 : 
      68                 : protected:
      69                 :     Service& svc_;
      70                 :     int fd_ = -1;
      71                 :     Endpoint local_endpoint_;
      72                 : 
      73                 : public:
      74                 :     /// Per-descriptor state for persistent reactor registration.
      75                 :     DescState desc_state_;
      76                 : 
      77           22309 :     ~reactor_basic_socket() override = default;
      78                 : 
      79                 :     /// Return the underlying file descriptor.
      80           67602 :     native_handle_type native_handle() const noexcept override
      81                 :     {
      82           67602 :         return fd_;
      83                 :     }
      84                 : 
      85                 :     /// Return the cached local endpoint.
      86              80 :     Endpoint local_endpoint() const noexcept override
      87                 :     {
      88              80 :         return local_endpoint_;
      89                 :     }
      90                 : 
      91                 :     /// Return true if the socket has an open file descriptor.
      92                 :     bool is_open() const noexcept
      93                 :     {
      94                 :         return fd_ >= 0;
      95                 :     }
      96                 : 
      97                 :     /// Set a socket option.
      98              20 :     std::error_code set_option(
      99                 :         int level,
     100                 :         int optname,
     101                 :         void const* data,
     102                 :         std::size_t size) noexcept override
     103                 :     {
     104              20 :         if (::setsockopt(
     105              20 :                 fd_, level, optname, data, static_cast<socklen_t>(size)) != 0)
     106 MIS           0 :             return make_err(errno);
     107 HIT          20 :         return {};
     108                 :     }
     109                 : 
     110                 :     /// Get a socket option.
     111                 :     std::error_code
     112              78 :     get_option(int level, int optname, void* data, std::size_t* size)
     113                 :         const noexcept override
     114                 :     {
     115              78 :         socklen_t len = static_cast<socklen_t>(*size);
     116              78 :         if (::getsockopt(fd_, level, optname, data, &len) != 0)
     117 MIS           0 :             return make_err(errno);
     118 HIT          78 :         *size = static_cast<std::size_t>(len);
     119              78 :         return {};
     120                 :     }
     121                 : 
     122                 :     /// Assign the file descriptor.
     123            7355 :     void set_socket(int fd) noexcept
     124                 :     {
     125            7355 :         fd_ = fd;
     126            7355 :     }
     127                 : 
     128                 :     /// Cache the local endpoint.
     129                 :     void set_local_endpoint(Endpoint ep) noexcept
     130                 :     {
     131                 :         local_endpoint_ = ep;
     132                 :     }
     133                 : 
     134                 :     /** Bind the socket to a local endpoint.
     135                 : 
     136                 :         Calls ::bind() and caches the resulting local endpoint
     137                 :         via getsockname().
     138                 : 
     139                 :         @param ep The endpoint to bind to.
     140                 :         @return Error code on failure, empty on success.
     141                 :     */
     142              76 :     std::error_code do_bind(Endpoint const& ep) noexcept
     143                 :     {
     144              76 :         sockaddr_storage storage{};
     145              76 :         socklen_t addrlen = to_sockaddr(ep, socket_family(fd_), storage);
     146              76 :         if (::bind(fd_, reinterpret_cast<sockaddr*>(&storage), addrlen) != 0)
     147              10 :             return make_err(errno);
     148                 : 
     149              66 :         sockaddr_storage local_storage{};
     150              66 :         socklen_t local_len = sizeof(local_storage);
     151              66 :         if (::getsockname(
     152              66 :                 fd_, reinterpret_cast<sockaddr*>(&local_storage), &local_len) ==
     153                 :             0)
     154              52 :             local_endpoint_ =
     155              66 :                 from_sockaddr_as(local_storage, local_len, Endpoint{});
     156                 : 
     157              66 :         return {};
     158                 :     }
     159                 : 
     160                 :     /// Assign the fd, initialize descriptor state, and register with the reactor.
     161            7532 :     void init_and_register(int fd) noexcept
     162                 :     {
     163            7532 :         fd_ = fd;
     164            7532 :         desc_state_.fd = fd;
     165                 :         {
     166            7532 :             std::lock_guard lock(desc_state_.mutex);
     167            7532 :             desc_state_.read_op    = nullptr;
     168            7532 :             desc_state_.write_op   = nullptr;
     169            7532 :             desc_state_.connect_op = nullptr;
     170            7532 :         }
     171            7532 :         svc_.scheduler().register_descriptor(fd, &desc_state_);
     172            7532 :     }
     173                 : 
     174                 :     /** Register an op with the reactor.
     175                 : 
     176                 :         Handles cached edge events and deferred cancellation.
     177                 :         Called on the EAGAIN/EINPROGRESS path when speculative
     178                 :         I/O failed.
     179                 :     */
     180                 :     template<class Op>
     181                 :     void register_op(
     182                 :         Op& op,
     183                 :         reactor_op_base*& desc_slot,
     184                 :         bool& ready_flag,
     185                 :         bool& cancel_flag,
     186                 :         bool is_write_direction = false) noexcept;
     187                 : 
     188                 :     /** Cancel a single pending operation.
     189                 : 
     190                 :         Claims the operation from its descriptor_state slot under
     191                 :         the mutex and posts it to the scheduler as cancelled.
     192                 :         Derived must implement:
     193                 :           op_to_desc_slot(Op&) -> reactor_op_base**
     194                 :           op_to_cancel_flag(Op&) -> bool*
     195                 :     */
     196                 :     template<class Op>
     197                 :     void cancel_single_op(Op& op) noexcept;
     198                 : 
     199                 :     /** Cancel all pending operations.
     200                 : 
     201                 :         Invoked by the derived class's cancel() override.
     202                 :         Derived must implement:
     203                 :           for_each_op(auto fn)
     204                 :           for_each_desc_entry(auto fn)
     205                 :     */
     206                 :     void do_cancel() noexcept;
     207                 : 
     208                 :     /** Close the socket and cancel pending operations.
     209                 : 
     210                 :         Invoked by the derived class's close_socket(). The
     211                 :         derived class may add backend-specific cleanup after
     212                 :         calling this method.
     213                 :         Derived must implement:
     214                 :           for_each_op(auto fn)
     215                 :           for_each_desc_entry(auto fn)
     216                 :     */
     217                 :     void do_close_socket() noexcept;
     218                 : 
     219                 :     /** Release the socket without closing the fd.
     220                 : 
     221                 :         Like do_close_socket() but does not call ::close().
     222                 :         Returns the fd so the caller can take ownership.
     223                 :     */
     224                 :     native_handle_type do_release_socket() noexcept;
     225                 : };
     226                 : 
     227                 : template<class Derived, class ImplBase, class Service, class DescState, class Endpoint>
     228                 : template<class Op>
     229                 : void
     230            7755 : reactor_basic_socket<Derived, ImplBase, Service, DescState, Endpoint>::register_op(
     231                 :     Op& op,
     232                 :     reactor_op_base*& desc_slot,
     233                 :     bool& ready_flag,
     234                 :     bool& cancel_flag,
     235                 :     bool is_write_direction) noexcept
     236                 : {
     237            7755 :     svc_.work_started();
     238                 : 
     239            7755 :     std::lock_guard lock(desc_state_.mutex);
     240            7755 :     bool io_done = false;
     241            7755 :     if (ready_flag)
     242                 :     {
     243             187 :         ready_flag = false;
     244             187 :         op.perform_io();
     245             187 :         io_done = (op.errn != EAGAIN && op.errn != EWOULDBLOCK);
     246             187 :         if (!io_done)
     247             187 :             op.errn = 0;
     248                 :     }
     249                 : 
     250            7755 :     if (cancel_flag)
     251                 :     {
     252 MIS           0 :         cancel_flag = false;
     253               0 :         op.cancelled.store(true, std::memory_order_relaxed);
     254                 :     }
     255                 : 
     256 HIT        7755 :     if (io_done || op.cancelled.load(std::memory_order_acquire))
     257                 :     {
     258 MIS           0 :         svc_.post(&op);
     259               0 :         svc_.work_finished();
     260                 :     }
     261                 :     else
     262                 :     {
     263 HIT        7755 :         desc_slot = &op;
     264                 : 
     265                 :         // Select must rebuild its fd_sets when a write-direction op
     266                 :         // is parked, so select() watches for writability. Compiled
     267                 :         // away to nothing for epoll and kqueue.
     268                 :         if constexpr (requires { Service::needs_write_notification; })
     269                 :         {
     270                 :             if constexpr (Service::needs_write_notification)
     271                 :             {
     272            3278 :                 if (is_write_direction)
     273            3085 :                     svc_.scheduler().notify_reactor();
     274                 :             }
     275                 :         }
     276                 :     }
     277            7755 : }
     278                 : 
     279                 : template<class Derived, class ImplBase, class Service, class DescState, class Endpoint>
     280                 : template<class Op>
     281                 : void
     282             194 : reactor_basic_socket<Derived, ImplBase, Service, DescState, Endpoint>::cancel_single_op(
     283                 :     Op& op) noexcept
     284                 : {
     285             194 :     auto self = this->weak_from_this().lock();
     286             194 :     if (!self)
     287 MIS           0 :         return;
     288                 : 
     289 HIT         194 :     op.request_cancel();
     290                 : 
     291             194 :     auto* d                       = static_cast<Derived*>(this);
     292             194 :     reactor_op_base** desc_op_ptr = d->op_to_desc_slot(op);
     293                 : 
     294             194 :     if (desc_op_ptr)
     295                 :     {
     296             194 :         reactor_op_base* claimed = nullptr;
     297                 :         {
     298             194 :             std::lock_guard lock(desc_state_.mutex);
     299             194 :             if (*desc_op_ptr == &op)
     300             194 :                 claimed = std::exchange(*desc_op_ptr, nullptr);
     301                 :             else
     302                 :             {
     303 MIS           0 :                 bool* cflag = d->op_to_cancel_flag(op);
     304               0 :                 if (cflag)
     305               0 :                     *cflag = true;
     306                 :             }
     307 HIT         194 :         }
     308             194 :         if (claimed)
     309                 :         {
     310             194 :             op.impl_ptr = self;
     311             194 :             svc_.post(&op);
     312             194 :             svc_.work_finished();
     313                 :         }
     314                 :     }
     315             194 : }
     316                 : 
     317                 : template<class Derived, class ImplBase, class Service, class DescState, class Endpoint>
     318                 : void
     319             190 : reactor_basic_socket<Derived, ImplBase, Service, DescState, Endpoint>::
     320                 :     do_cancel() noexcept
     321                 : {
     322             190 :     auto self = this->weak_from_this().lock();
     323             190 :     if (!self)
     324 MIS           0 :         return;
     325                 : 
     326 HIT         190 :     auto* d = static_cast<Derived*>(this);
     327                 : 
     328             768 :     d->for_each_op([](auto& op) { op.request_cancel(); });
     329                 : 
     330                 :     // Claim ops under a single lock acquisition
     331                 :     struct claimed_entry
     332                 :     {
     333                 :         reactor_op_base* op   = nullptr;
     334                 :         reactor_op_base* base = nullptr;
     335                 :     };
     336                 :     // Max 3 ops (conn, rd, wr)
     337             190 :     claimed_entry claimed[3];
     338             190 :     int count = 0;
     339                 : 
     340                 :     {
     341             190 :         std::lock_guard lock(desc_state_.mutex);
     342            1346 :         d->for_each_desc_entry([&](auto& op, reactor_op_base*& desc_slot) {
     343             578 :             if (desc_slot == &op)
     344                 :             {
     345             101 :                 claimed[count].op   = std::exchange(desc_slot, nullptr);
     346             101 :                 claimed[count].base = &op;
     347             101 :                 ++count;
     348                 :             }
     349                 :         });
     350             190 :     }
     351                 : 
     352             291 :     for (int i = 0; i < count; ++i)
     353                 :     {
     354             101 :         claimed[i].base->impl_ptr = self;
     355             101 :         svc_.post(claimed[i].base);
     356             101 :         svc_.work_finished();
     357                 :     }
     358             190 : }
     359                 : 
     360                 : template<class Derived, class ImplBase, class Service, class DescState, class Endpoint>
     361                 : void
     362           67035 : reactor_basic_socket<Derived, ImplBase, Service, DescState, Endpoint>::
     363                 :     do_close_socket() noexcept
     364                 : {
     365           67035 :     auto self = this->weak_from_this().lock();
     366           67035 :     if (self)
     367                 :     {
     368           67035 :         auto* d = static_cast<Derived*>(this);
     369                 : 
     370          269068 :         d->for_each_op([](auto& op) { op.request_cancel(); });
     371                 : 
     372                 :         struct claimed_entry
     373                 :         {
     374                 :             reactor_op_base* base = nullptr;
     375                 :         };
     376           67035 :         claimed_entry claimed[3];
     377           67035 :         int count = 0;
     378                 : 
     379                 :         {
     380           67035 :             std::lock_guard lock(desc_state_.mutex);
     381           67035 :             d->for_each_desc_entry(
     382          404066 :                 [&](auto& /*op*/, reactor_op_base*& desc_slot) {
     383          202033 :                     auto* c = std::exchange(desc_slot, nullptr);
     384          202033 :                     if (c)
     385                 :                     {
     386               4 :                         claimed[count].base = c;
     387               4 :                         ++count;
     388                 :                     }
     389                 :                 });
     390           67035 :             desc_state_.read_ready             = false;
     391           67035 :             desc_state_.write_ready            = false;
     392           67035 :             desc_state_.read_cancel_pending    = false;
     393           67035 :             desc_state_.write_cancel_pending   = false;
     394           67035 :             desc_state_.connect_cancel_pending = false;
     395                 : 
     396           67035 :             if (desc_state_.is_enqueued_.load(std::memory_order_acquire))
     397             268 :                 desc_state_.impl_ref_ = self;
     398           67035 :         }
     399                 : 
     400           67039 :         for (int i = 0; i < count; ++i)
     401                 :         {
     402               4 :             claimed[i].base->impl_ptr = self;
     403               4 :             svc_.post(claimed[i].base);
     404               4 :             svc_.work_finished();
     405                 :         }
     406                 :     }
     407                 : 
     408           67035 :     if (fd_ >= 0)
     409                 :     {
     410           14885 :         if (desc_state_.registered_events != 0)
     411           14885 :             svc_.scheduler().deregister_descriptor(fd_);
     412           14885 :         ::close(fd_);
     413           14885 :         fd_ = -1;
     414                 :     }
     415                 : 
     416           67035 :     desc_state_.fd                = -1;
     417           67035 :     desc_state_.registered_events = 0;
     418                 : 
     419           67035 :     local_endpoint_ = Endpoint{};
     420           67035 : }
     421                 : 
     422                 : template<class Derived, class ImplBase, class Service, class DescState, class Endpoint>
     423                 : native_handle_type
     424               2 : reactor_basic_socket<Derived, ImplBase, Service, DescState, Endpoint>::
     425                 :     do_release_socket() noexcept
     426                 : {
     427                 :     // Cancel pending ops (same as do_close_socket)
     428               2 :     auto self = this->weak_from_this().lock();
     429               2 :     if (self)
     430                 :     {
     431               2 :         auto* d = static_cast<Derived*>(this);
     432                 : 
     433               8 :         d->for_each_op([](auto& op) { op.request_cancel(); });
     434                 : 
     435                 :         struct claimed_entry
     436                 :         {
     437                 :             reactor_op_base* base = nullptr;
     438                 :         };
     439               2 :         claimed_entry claimed[3];
     440               2 :         int count = 0;
     441                 : 
     442                 :         {
     443               2 :             std::lock_guard lock(desc_state_.mutex);
     444               2 :             d->for_each_desc_entry(
     445              12 :                 [&](auto& /*op*/, reactor_op_base*& desc_slot) {
     446               6 :                     auto* c = std::exchange(desc_slot, nullptr);
     447               6 :                     if (c)
     448                 :                     {
     449 MIS           0 :                         claimed[count].base = c;
     450               0 :                         ++count;
     451                 :                     }
     452                 :                 });
     453 HIT           2 :             desc_state_.read_ready             = false;
     454               2 :             desc_state_.write_ready            = false;
     455               2 :             desc_state_.read_cancel_pending    = false;
     456               2 :             desc_state_.write_cancel_pending   = false;
     457               2 :             desc_state_.connect_cancel_pending = false;
     458                 : 
     459               2 :             if (desc_state_.is_enqueued_.load(std::memory_order_acquire))
     460 MIS           0 :                 desc_state_.impl_ref_ = self;
     461 HIT           2 :         }
     462                 : 
     463               2 :         for (int i = 0; i < count; ++i)
     464                 :         {
     465 MIS           0 :             claimed[i].base->impl_ptr = self;
     466               0 :             svc_.post(claimed[i].base);
     467               0 :             svc_.work_finished();
     468                 :         }
     469                 :     }
     470                 : 
     471 HIT           2 :     native_handle_type released = fd_;
     472                 : 
     473               2 :     if (fd_ >= 0)
     474                 :     {
     475               2 :         if (desc_state_.registered_events != 0)
     476               2 :             svc_.scheduler().deregister_descriptor(fd_);
     477                 :         // Do NOT close -- caller takes ownership
     478               2 :         fd_ = -1;
     479                 :     }
     480                 : 
     481               2 :     desc_state_.fd                = -1;
     482               2 :     desc_state_.registered_events = 0;
     483                 : 
     484               2 :     local_endpoint_ = Endpoint{};
     485                 : 
     486               4 :     return released;
     487               2 : }
     488                 : 
     489                 : } // namespace boost::corosio::detail
     490                 : 
     491                 : #endif // BOOST_COROSIO_NATIVE_DETAIL_REACTOR_REACTOR_BASIC_SOCKET_HPP
        

Generated by: LCOV version 2.3