TLA Line data Source code
1 : //
2 : // Copyright (c) 2026 Steve Gerbino
3 : //
4 : // Distributed under the Boost Software License, Version 1.0. (See accompanying
5 : // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6 : //
7 : // Official repository: https://github.com/cppalliance/corosio
8 : //
9 :
10 : #ifndef BOOST_COROSIO_NATIVE_DETAIL_REACTOR_REACTOR_STREAM_SOCKET_HPP
11 : #define BOOST_COROSIO_NATIVE_DETAIL_REACTOR_REACTOR_STREAM_SOCKET_HPP
12 :
13 : #include <boost/corosio/tcp_socket.hpp>
14 : #include <boost/corosio/shutdown_type.hpp>
15 : #include <boost/corosio/native/detail/reactor/reactor_basic_socket.hpp>
16 : #include <boost/corosio/detail/dispatch_coro.hpp>
17 : #include <boost/capy/buffers.hpp>
18 :
19 : #include <coroutine>
20 :
21 : #include <errno.h>
22 : #include <sys/socket.h>
23 : #include <sys/uio.h>
24 :
25 : namespace boost::corosio::detail {
26 :
27 : /** CRTP base for reactor-backed stream socket implementations.
28 :
29 : Inherits shared data members and cancel/close/register logic
30 : from reactor_basic_socket. Adds the stream-specific remote
31 : endpoint, shutdown, and I/O dispatch (connect, read, write).
32 :
33 : @tparam Derived The concrete socket type (CRTP).
34 : @tparam Service The backend's socket service type.
35 : @tparam ConnOp The backend's connect op type.
36 : @tparam ReadOp The backend's read op type.
37 : @tparam WriteOp The backend's write op type.
38 : @tparam DescState The backend's descriptor_state type.
39 : @tparam ImplBase The public vtable base
40 : (tcp_socket::implementation or
41 : local_stream_socket::implementation).
42 : @tparam Endpoint The endpoint type (endpoint or local_endpoint).
43 : */
44 : template<
45 : class Derived,
46 : class Service,
47 : class ConnOp,
48 : class ReadOp,
49 : class WriteOp,
50 : class DescState,
51 : class ImplBase = tcp_socket::implementation,
52 : class Endpoint = endpoint>
53 : class reactor_stream_socket
54 : : public reactor_basic_socket<
55 : Derived,
56 : ImplBase,
57 : Service,
58 : DescState,
59 : Endpoint>
60 : {
61 : using base_type = reactor_basic_socket<
62 : Derived,
63 : ImplBase,
64 : Service,
65 : DescState,
66 : Endpoint>;
67 : friend base_type;
68 : friend Derived;
69 :
70 : protected:
71 : // NOLINTNEXTLINE(bugprone-crtp-constructor-accessibility)
72 HIT 22189 : explicit reactor_stream_socket(Service& svc) noexcept : base_type(svc) {}
73 :
74 : protected:
75 : Endpoint remote_endpoint_;
76 :
77 : public:
78 : /// Pending connect operation slot.
79 : ConnOp conn_;
80 :
81 : /// Pending read operation slot.
82 : ReadOp rd_;
83 :
84 : /// Pending write operation slot.
85 : WriteOp wr_;
86 :
87 22189 : ~reactor_stream_socket() override = default;
88 :
89 : /// Return the cached remote endpoint.
90 44 : Endpoint remote_endpoint() const noexcept override
91 : {
92 44 : return remote_endpoint_;
93 : }
94 :
95 : // --- Virtual method overrides (satisfy ImplBase pure virtuals) ---
96 :
97 7360 : std::coroutine_handle<> connect(
98 : std::coroutine_handle<> h,
99 : capy::executor_ref ex,
100 : Endpoint ep,
101 : std::stop_token token,
102 : std::error_code* ec) override
103 : {
104 7360 : return do_connect(h, ex, ep, token, ec);
105 : }
106 :
107 217409 : std::coroutine_handle<> read_some(
108 : std::coroutine_handle<> h,
109 : capy::executor_ref ex,
110 : buffer_param param,
111 : std::stop_token token,
112 : std::error_code* ec,
113 : std::size_t* bytes_out) override
114 : {
115 217409 : return do_read_some(h, ex, param, token, ec, bytes_out);
116 : }
117 :
118 217112 : std::coroutine_handle<> write_some(
119 : std::coroutine_handle<> h,
120 : capy::executor_ref ex,
121 : buffer_param param,
122 : std::stop_token token,
123 : std::error_code* ec,
124 : std::size_t* bytes_out) override
125 : {
126 217112 : return do_write_some(h, ex, param, token, ec, bytes_out);
127 : }
128 :
129 : std::error_code
130 6 : shutdown(corosio::shutdown_type what) noexcept override
131 : {
132 6 : return do_shutdown(static_cast<int>(what));
133 : }
134 :
135 186 : void cancel() noexcept override
136 : {
137 186 : this->do_cancel();
138 186 : }
139 :
140 : // --- End virtual overrides ---
141 :
142 : /// Close the socket (non-virtual, called by the service).
143 : void close_socket() noexcept
144 : {
145 : this->do_close_socket();
146 : }
147 :
148 : /** Shut down part or all of the full-duplex connection.
149 :
150 : @param what 0 = receive, 1 = send, 2 = both.
151 : */
152 6 : std::error_code do_shutdown(int what) noexcept
153 : {
154 : int how;
155 6 : switch (what)
156 : {
157 2 : case 0: // shutdown_receive
158 2 : how = SHUT_RD;
159 2 : break;
160 2 : case 1: // shutdown_send
161 2 : how = SHUT_WR;
162 2 : break;
163 2 : case 2: // shutdown_both
164 2 : how = SHUT_RDWR;
165 2 : break;
166 MIS 0 : default:
167 0 : return make_err(EINVAL);
168 : }
169 HIT 6 : if (::shutdown(this->fd_, how) != 0)
170 MIS 0 : return make_err(errno);
171 HIT 6 : return {};
172 : }
173 :
174 : /// Cache local and remote endpoints.
175 14726 : void set_endpoints(Endpoint local, Endpoint remote) noexcept
176 : {
177 14726 : this->local_endpoint_ = std::move(local);
178 14726 : remote_endpoint_ = std::move(remote);
179 14726 : }
180 :
181 : /** Shared connect dispatch.
182 :
183 : Tries the connect syscall speculatively. On synchronous
184 : completion, returns via inline budget or posts through queue.
185 : On EINPROGRESS, registers with the reactor.
186 : */
187 : std::coroutine_handle<> do_connect(
188 : std::coroutine_handle<>,
189 : capy::executor_ref,
190 : Endpoint const&,
191 : std::stop_token const&,
192 : std::error_code*);
193 :
194 : /** Shared scatter-read dispatch.
195 :
196 : Tries readv() speculatively. On success or hard error,
197 : returns via inline budget or posts through queue.
198 : On EAGAIN, registers with the reactor.
199 : */
200 : std::coroutine_handle<> do_read_some(
201 : std::coroutine_handle<>,
202 : capy::executor_ref,
203 : buffer_param,
204 : std::stop_token const&,
205 : std::error_code*,
206 : std::size_t*);
207 :
208 : /** Shared gather-write dispatch.
209 :
210 : Tries the write via WriteOp::write_policy speculatively.
211 : On success or hard error, returns via inline budget or
212 : posts through queue. On EAGAIN, registers with the reactor.
213 : */
214 : std::coroutine_handle<> do_write_some(
215 : std::coroutine_handle<>,
216 : capy::executor_ref,
217 : buffer_param,
218 : std::stop_token const&,
219 : std::error_code*,
220 : std::size_t*);
221 :
222 : /** Close the socket and cancel pending operations.
223 :
224 : Extends the base do_close_socket() to also reset
225 : the remote endpoint.
226 : */
227 66571 : void do_close_socket() noexcept
228 : {
229 66571 : base_type::do_close_socket();
230 66571 : remote_endpoint_ = Endpoint{};
231 66571 : }
232 :
233 : private:
234 : // CRTP callbacks for reactor_basic_socket cancel/close
235 :
236 : template<class Op>
237 192 : reactor_op_base** op_to_desc_slot(Op& op) noexcept
238 : {
239 192 : if (&op == static_cast<void*>(&conn_))
240 MIS 0 : return &this->desc_state_.connect_op;
241 HIT 192 : if (&op == static_cast<void*>(&rd_))
242 192 : return &this->desc_state_.read_op;
243 MIS 0 : if (&op == static_cast<void*>(&wr_))
244 0 : return &this->desc_state_.write_op;
245 0 : return nullptr;
246 : }
247 :
248 : template<class Op>
249 0 : bool* op_to_cancel_flag(Op& op) noexcept
250 : {
251 0 : if (&op == static_cast<void*>(&conn_))
252 0 : return &this->desc_state_.connect_cancel_pending;
253 0 : if (&op == static_cast<void*>(&rd_))
254 0 : return &this->desc_state_.read_cancel_pending;
255 0 : if (&op == static_cast<void*>(&wr_))
256 0 : return &this->desc_state_.write_cancel_pending;
257 0 : return nullptr;
258 : }
259 :
260 : template<class Fn>
261 HIT 66759 : void for_each_op(Fn fn) noexcept
262 : {
263 66759 : fn(conn_);
264 66759 : fn(rd_);
265 66759 : fn(wr_);
266 66759 : }
267 :
268 : template<class Fn>
269 66759 : void for_each_desc_entry(Fn fn) noexcept
270 : {
271 66759 : fn(conn_, this->desc_state_.connect_op);
272 66759 : fn(rd_, this->desc_state_.read_op);
273 66759 : fn(wr_, this->desc_state_.write_op);
274 66759 : }
275 : };
276 :
277 : template<
278 : class Derived,
279 : class Service,
280 : class ConnOp,
281 : class ReadOp,
282 : class WriteOp,
283 : class DescState,
284 : class ImplBase,
285 : class Endpoint>
286 : std::coroutine_handle<>
287 7360 : reactor_stream_socket<Derived, Service, ConnOp, ReadOp, WriteOp, DescState, ImplBase, Endpoint>::
288 : do_connect(
289 : std::coroutine_handle<> h,
290 : capy::executor_ref ex,
291 : Endpoint const& ep,
292 : std::stop_token const& token,
293 : std::error_code* ec)
294 : {
295 7360 : auto& op = conn_;
296 :
297 7360 : sockaddr_storage storage{};
298 7360 : socklen_t addrlen = to_sockaddr(ep, socket_family(this->fd_), storage);
299 : int result =
300 7360 : ::connect(this->fd_, reinterpret_cast<sockaddr*>(&storage), addrlen);
301 :
302 7360 : if (result == 0)
303 : {
304 4 : sockaddr_storage local_storage{};
305 4 : socklen_t local_len = sizeof(local_storage);
306 4 : if (::getsockname(
307 : this->fd_, reinterpret_cast<sockaddr*>(&local_storage),
308 4 : &local_len) == 0)
309 MIS 0 : this->local_endpoint_ =
310 HIT 4 : from_sockaddr_as(local_storage, local_len, Endpoint{});
311 4 : remote_endpoint_ = ep;
312 : }
313 :
314 7360 : if (result == 0 || errno != EINPROGRESS)
315 : {
316 4 : int err = (result < 0) ? errno : 0;
317 4 : if (this->svc_.scheduler().try_consume_inline_budget())
318 : {
319 MIS 0 : *ec = err ? make_err(err) : std::error_code{};
320 0 : op.cont_op.cont.h = h;
321 0 : return dispatch_coro(ex, op.cont_op.cont);
322 : }
323 HIT 4 : op.reset();
324 4 : op.h = h;
325 4 : op.ex = ex;
326 4 : op.ec_out = ec;
327 4 : op.fd = this->fd_;
328 4 : op.target_endpoint = ep;
329 4 : op.start(token, static_cast<Derived*>(this));
330 4 : op.impl_ptr = this->shared_from_this();
331 4 : op.complete(err, 0);
332 4 : this->svc_.post(&op);
333 4 : return std::noop_coroutine();
334 : }
335 :
336 : // EINPROGRESS — register with reactor
337 7356 : op.reset();
338 7356 : op.h = h;
339 7356 : op.ex = ex;
340 7356 : op.ec_out = ec;
341 7356 : op.fd = this->fd_;
342 7356 : op.target_endpoint = ep;
343 7356 : op.start(token, static_cast<Derived*>(this));
344 7356 : op.impl_ptr = this->shared_from_this();
345 :
346 7356 : this->register_op(
347 7356 : op, this->desc_state_.connect_op, this->desc_state_.write_ready,
348 7356 : this->desc_state_.connect_cancel_pending, true);
349 7356 : return std::noop_coroutine();
350 : }
351 :
352 : template<
353 : class Derived,
354 : class Service,
355 : class ConnOp,
356 : class ReadOp,
357 : class WriteOp,
358 : class DescState,
359 : class ImplBase,
360 : class Endpoint>
361 : std::coroutine_handle<>
362 217409 : reactor_stream_socket<Derived, Service, ConnOp, ReadOp, WriteOp, DescState, ImplBase, Endpoint>::
363 : do_read_some(
364 : std::coroutine_handle<> h,
365 : capy::executor_ref ex,
366 : buffer_param param,
367 : std::stop_token const& token,
368 : std::error_code* ec,
369 : std::size_t* bytes_out)
370 : {
371 217409 : auto& op = rd_;
372 217409 : op.reset();
373 :
374 217409 : capy::mutable_buffer bufs[ReadOp::max_buffers];
375 217409 : op.iovec_count = static_cast<int>(param.copy_to(bufs, ReadOp::max_buffers));
376 :
377 217409 : if (op.iovec_count == 0 || (op.iovec_count == 1 && bufs[0].size() == 0))
378 : {
379 2 : op.empty_buffer_read = true;
380 2 : op.h = h;
381 2 : op.ex = ex;
382 2 : op.ec_out = ec;
383 2 : op.bytes_out = bytes_out;
384 2 : op.start(token, static_cast<Derived*>(this));
385 2 : op.impl_ptr = this->shared_from_this();
386 2 : op.complete(0, 0);
387 2 : this->svc_.post(&op);
388 2 : return std::noop_coroutine();
389 : }
390 :
391 434814 : for (int i = 0; i < op.iovec_count; ++i)
392 : {
393 217407 : op.iovecs[i].iov_base = bufs[i].data();
394 217407 : op.iovecs[i].iov_len = bufs[i].size();
395 : }
396 :
397 : // Speculative read
398 : ssize_t n;
399 : do
400 : {
401 217407 : n = ::readv(this->fd_, op.iovecs, op.iovec_count);
402 : }
403 217407 : while (n < 0 && errno == EINTR);
404 :
405 217407 : if (n >= 0 || (errno != EAGAIN && errno != EWOULDBLOCK))
406 : {
407 217018 : int err = (n < 0) ? errno : 0;
408 217018 : auto bytes = (n > 0) ? static_cast<std::size_t>(n) : std::size_t(0);
409 :
410 217018 : if (this->svc_.scheduler().try_consume_inline_budget())
411 : {
412 173648 : if (err)
413 MIS 0 : *ec = make_err(err);
414 HIT 173648 : else if (n == 0)
415 10 : *ec = capy::error::eof;
416 : else
417 173638 : *ec = {};
418 173648 : *bytes_out = bytes;
419 173648 : op.cont_op.cont.h = h;
420 173648 : return dispatch_coro(ex, op.cont_op.cont);
421 : }
422 43370 : op.h = h;
423 43370 : op.ex = ex;
424 43370 : op.ec_out = ec;
425 43370 : op.bytes_out = bytes_out;
426 43370 : op.start(token, static_cast<Derived*>(this));
427 43370 : op.impl_ptr = this->shared_from_this();
428 43370 : op.complete(err, bytes);
429 43370 : this->svc_.post(&op);
430 43370 : return std::noop_coroutine();
431 : }
432 :
433 : // EAGAIN — register with reactor
434 389 : op.h = h;
435 389 : op.ex = ex;
436 389 : op.ec_out = ec;
437 389 : op.bytes_out = bytes_out;
438 389 : op.fd = this->fd_;
439 389 : op.start(token, static_cast<Derived*>(this));
440 389 : op.impl_ptr = this->shared_from_this();
441 :
442 389 : this->register_op(
443 389 : op, this->desc_state_.read_op, this->desc_state_.read_ready,
444 389 : this->desc_state_.read_cancel_pending);
445 389 : return std::noop_coroutine();
446 : }
447 :
448 : template<
449 : class Derived,
450 : class Service,
451 : class ConnOp,
452 : class ReadOp,
453 : class WriteOp,
454 : class DescState,
455 : class ImplBase,
456 : class Endpoint>
457 : std::coroutine_handle<>
458 217112 : reactor_stream_socket<Derived, Service, ConnOp, ReadOp, WriteOp, DescState, ImplBase, Endpoint>::
459 : do_write_some(
460 : std::coroutine_handle<> h,
461 : capy::executor_ref ex,
462 : buffer_param param,
463 : std::stop_token const& token,
464 : std::error_code* ec,
465 : std::size_t* bytes_out)
466 : {
467 217112 : auto& op = wr_;
468 217112 : op.reset();
469 :
470 217112 : capy::mutable_buffer bufs[WriteOp::max_buffers];
471 217112 : op.iovec_count =
472 217112 : static_cast<int>(param.copy_to(bufs, WriteOp::max_buffers));
473 :
474 217112 : if (op.iovec_count == 0 || (op.iovec_count == 1 && bufs[0].size() == 0))
475 : {
476 2 : op.h = h;
477 2 : op.ex = ex;
478 2 : op.ec_out = ec;
479 2 : op.bytes_out = bytes_out;
480 2 : op.start(token, static_cast<Derived*>(this));
481 2 : op.impl_ptr = this->shared_from_this();
482 2 : op.complete(0, 0);
483 2 : this->svc_.post(&op);
484 2 : return std::noop_coroutine();
485 : }
486 :
487 434220 : for (int i = 0; i < op.iovec_count; ++i)
488 : {
489 217110 : op.iovecs[i].iov_base = bufs[i].data();
490 217110 : op.iovecs[i].iov_len = bufs[i].size();
491 : }
492 :
493 : // Speculative write via backend-specific write policy
494 : ssize_t n =
495 217110 : WriteOp::write_policy::write(this->fd_, op.iovecs, op.iovec_count);
496 :
497 217110 : if (n >= 0 || (errno != EAGAIN && errno != EWOULDBLOCK))
498 : {
499 217110 : int err = (n < 0) ? errno : 0;
500 217110 : auto bytes = (n > 0) ? static_cast<std::size_t>(n) : std::size_t(0);
501 :
502 217110 : if (this->svc_.scheduler().try_consume_inline_budget())
503 : {
504 173702 : *ec = err ? make_err(err) : std::error_code{};
505 173702 : *bytes_out = bytes;
506 173702 : op.cont_op.cont.h = h;
507 173702 : return dispatch_coro(ex, op.cont_op.cont);
508 : }
509 43408 : op.h = h;
510 43408 : op.ex = ex;
511 43408 : op.ec_out = ec;
512 43408 : op.bytes_out = bytes_out;
513 43408 : op.start(token, static_cast<Derived*>(this));
514 43408 : op.impl_ptr = this->shared_from_this();
515 43408 : op.complete(err, bytes);
516 43408 : this->svc_.post(&op);
517 43408 : return std::noop_coroutine();
518 : }
519 :
520 : // EAGAIN — register with reactor
521 MIS 0 : op.h = h;
522 0 : op.ex = ex;
523 0 : op.ec_out = ec;
524 0 : op.bytes_out = bytes_out;
525 0 : op.fd = this->fd_;
526 0 : op.start(token, static_cast<Derived*>(this));
527 0 : op.impl_ptr = this->shared_from_this();
528 :
529 0 : this->register_op(
530 0 : op, this->desc_state_.write_op, this->desc_state_.write_ready,
531 0 : this->desc_state_.write_cancel_pending, true);
532 0 : return std::noop_coroutine();
533 : }
534 :
535 : } // namespace boost::corosio::detail
536 :
537 : #endif // BOOST_COROSIO_NATIVE_DETAIL_REACTOR_REACTOR_STREAM_SOCKET_HPP
|