TLA Line data Source code
1 : //
2 : // Copyright (c) 2026 Steve Gerbino
3 : //
4 : // Distributed under the Boost Software License, Version 1.0. (See accompanying
5 : // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6 : //
7 : // Official repository: https://github.com/cppalliance/corosio
8 : //
9 :
10 : #ifndef BOOST_COROSIO_NATIVE_DETAIL_REACTOR_REACTOR_OP_COMPLETE_HPP
11 : #define BOOST_COROSIO_NATIVE_DETAIL_REACTOR_REACTOR_OP_COMPLETE_HPP
12 :
13 : #include <boost/corosio/detail/dispatch_coro.hpp>
14 : #include <boost/corosio/native/detail/endpoint_convert.hpp>
15 : #include <boost/corosio/native/detail/make_err.hpp>
16 : #include <boost/corosio/io/io_object.hpp>
17 :
18 : #include <coroutine>
19 : #include <mutex>
20 : #include <utility>
21 :
22 : #include <netinet/in.h>
23 : #include <sys/socket.h>
24 : #include <unistd.h>
25 :
26 : namespace boost::corosio::detail {
27 :
28 : /** Complete a base read/write operation.
29 :
30 : Translates the recorded errno and cancellation state into
31 : an error_code, stores the byte count, then resumes the
32 : caller via symmetric transfer.
33 :
34 : @tparam Op The concrete operation type.
35 : @param op The operation to complete.
36 : */
37 : template<typename Op>
38 : void
39 HIT 87199 : complete_io_op(Op& op)
40 : {
41 87199 : op.stop_cb.reset();
42 87199 : op.socket_impl_->desc_state_.scheduler_->reset_inline_budget();
43 :
44 87199 : if (op.cancelled.load(std::memory_order_acquire))
45 307 : *op.ec_out = capy::error::canceled;
46 86892 : else if (op.errn != 0)
47 MIS 0 : *op.ec_out = make_err(op.errn);
48 HIT 86892 : else if (op.is_read_operation() && op.bytes_transferred == 0)
49 MIS 0 : *op.ec_out = capy::error::eof;
50 : else
51 HIT 86892 : *op.ec_out = {};
52 :
53 87199 : *op.bytes_out = op.bytes_transferred;
54 :
55 87199 : op.cont_op.cont.h = op.h;
56 87199 : capy::executor_ref saved_ex(op.ex);
57 87199 : auto prevent = std::move(op.impl_ptr);
58 87199 : dispatch_coro(saved_ex, op.cont_op.cont).resume();
59 87199 : }
60 :
61 : /** Complete a datagram recv operation (connected mode).
62 :
63 : Like complete_io_op but does not translate zero bytes into
64 : EOF. Zero-length datagrams are valid and should be reported
65 : as success with 0 bytes transferred.
66 :
67 : @param op The operation to complete.
68 : */
69 : template<typename Op>
70 : void
71 : complete_dgram_recv_op(Op& op)
72 : {
73 : op.stop_cb.reset();
74 : op.socket_impl_->desc_state_.scheduler_->reset_inline_budget();
75 :
76 : if (op.cancelled.load(std::memory_order_acquire))
77 : *op.ec_out = capy::error::canceled;
78 : else if (op.errn != 0)
79 : *op.ec_out = make_err(op.errn);
80 : else
81 : *op.ec_out = {};
82 :
83 : *op.bytes_out = op.bytes_transferred;
84 :
85 : op.cont_op.cont.h = op.h;
86 : capy::executor_ref saved_ex(op.ex);
87 : auto prevent = std::move(op.impl_ptr);
88 : dispatch_coro(saved_ex, op.cont_op.cont).resume();
89 : }
90 :
91 : /** Complete a connect operation with endpoint caching.
92 :
93 : On success, queries the local endpoint via getsockname and
94 : caches both endpoints in the socket impl. Then resumes the
95 : caller via symmetric transfer.
96 :
97 : @tparam Op The concrete connect operation type.
98 : @param op The operation to complete.
99 : */
100 : template<typename Op>
101 : void
102 7370 : complete_connect_op(Op& op)
103 : {
104 7370 : op.stop_cb.reset();
105 7370 : op.socket_impl_->desc_state_.scheduler_->reset_inline_budget();
106 :
107 7370 : bool success =
108 7370 : (op.errn == 0 && !op.cancelled.load(std::memory_order_acquire));
109 :
110 7370 : if (success && op.socket_impl_)
111 : {
112 : using ep_type = decltype(op.target_endpoint);
113 7365 : ep_type local_ep;
114 7365 : sockaddr_storage local_storage{};
115 7365 : socklen_t local_len = sizeof(local_storage);
116 7365 : if (::getsockname(
117 : op.fd, reinterpret_cast<sockaddr*>(&local_storage),
118 7365 : &local_len) == 0)
119 7361 : local_ep =
120 7365 : from_sockaddr_as(local_storage, local_len, ep_type{});
121 7365 : op.socket_impl_->set_endpoints(local_ep, op.target_endpoint);
122 : }
123 :
124 7370 : if (op.cancelled.load(std::memory_order_acquire))
125 MIS 0 : *op.ec_out = capy::error::canceled;
126 HIT 7370 : else if (op.errn != 0)
127 5 : *op.ec_out = make_err(op.errn);
128 : else
129 7365 : *op.ec_out = {};
130 :
131 7370 : op.cont_op.cont.h = op.h;
132 7370 : capy::executor_ref saved_ex(op.ex);
133 7370 : auto prevent = std::move(op.impl_ptr);
134 7370 : dispatch_coro(saved_ex, op.cont_op.cont).resume();
135 7370 : }
136 :
137 : /** Construct and register a peer socket from an accepted fd.
138 :
139 : Creates a new socket impl via the acceptor's associated
140 : socket service, registers it with the scheduler, and caches
141 : the local and remote endpoints.
142 :
143 : @tparam SocketImpl The concrete socket implementation type.
144 : @tparam AcceptorImpl The concrete acceptor implementation type.
145 : @param acceptor_impl The acceptor that accepted the connection.
146 : @param accepted_fd The accepted file descriptor (set to -1 on success).
147 : @param peer_storage The peer address from accept().
148 : @param impl_out Output pointer for the new socket impl.
149 : @param ec_out Output pointer for any error.
150 : @return True on success, false on failure.
151 : */
152 : template<typename SocketImpl, typename AcceptorImpl>
153 : bool
154 7355 : setup_accepted_socket(
155 : AcceptorImpl* acceptor_impl,
156 : int& accepted_fd,
157 : sockaddr_storage const& peer_storage,
158 : socklen_t peer_addrlen,
159 : io_object::implementation** impl_out,
160 : std::error_code* ec_out)
161 : {
162 7355 : auto* socket_svc = acceptor_impl->service().stream_service();
163 7355 : if (!socket_svc)
164 : {
165 MIS 0 : *ec_out = make_err(ENOENT);
166 0 : return false;
167 : }
168 :
169 HIT 7355 : auto& impl = static_cast<SocketImpl&>(*socket_svc->construct());
170 7355 : impl.set_socket(accepted_fd);
171 :
172 7355 : impl.desc_state_.fd = accepted_fd;
173 : {
174 7355 : std::lock_guard lock(impl.desc_state_.mutex);
175 7355 : impl.desc_state_.read_op = nullptr;
176 7355 : impl.desc_state_.write_op = nullptr;
177 7355 : impl.desc_state_.connect_op = nullptr;
178 7355 : }
179 7355 : socket_svc->scheduler().register_descriptor(accepted_fd, &impl.desc_state_);
180 :
181 : using ep_type = decltype(acceptor_impl->local_endpoint());
182 7355 : impl.set_endpoints(
183 : acceptor_impl->local_endpoint(),
184 7355 : from_sockaddr_as(
185 : peer_storage,
186 : peer_addrlen,
187 : ep_type{}));
188 :
189 7355 : if (impl_out)
190 7355 : *impl_out = &impl;
191 7355 : accepted_fd = -1;
192 7355 : return true;
193 : }
194 :
195 : /** Complete an accept operation.
196 :
197 : Sets up the peer socket on success, or closes the accepted
198 : fd on failure. Then resumes the caller via symmetric transfer.
199 :
200 : @tparam SocketImpl The concrete socket implementation type.
201 : @tparam Op The concrete accept operation type.
202 : @param op The operation to complete.
203 : */
204 : template<typename SocketImpl, typename Op>
205 : void
206 7367 : complete_accept_op(Op& op)
207 : {
208 7367 : op.stop_cb.reset();
209 7367 : op.acceptor_impl_->desc_state_.scheduler_->reset_inline_budget();
210 :
211 7367 : bool success =
212 7367 : (op.errn == 0 && !op.cancelled.load(std::memory_order_acquire));
213 :
214 7367 : if (op.cancelled.load(std::memory_order_acquire))
215 12 : *op.ec_out = capy::error::canceled;
216 7355 : else if (op.errn != 0)
217 MIS 0 : *op.ec_out = make_err(op.errn);
218 : else
219 HIT 7355 : *op.ec_out = {};
220 :
221 7367 : if (success && op.accepted_fd >= 0 && op.acceptor_impl_)
222 : {
223 7355 : if (!setup_accepted_socket<SocketImpl>(
224 7355 : op.acceptor_impl_, op.accepted_fd, op.peer_storage,
225 : op.peer_addrlen, op.impl_out, op.ec_out))
226 MIS 0 : success = false;
227 : }
228 :
229 HIT 7367 : if (!success || !op.acceptor_impl_)
230 : {
231 12 : if (op.accepted_fd >= 0)
232 : {
233 MIS 0 : ::close(op.accepted_fd);
234 0 : op.accepted_fd = -1;
235 : }
236 HIT 12 : if (op.impl_out)
237 12 : *op.impl_out = nullptr;
238 : }
239 :
240 7367 : op.cont_op.cont.h = op.h;
241 7367 : capy::executor_ref saved_ex(op.ex);
242 7367 : auto prevent = std::move(op.impl_ptr);
243 7367 : dispatch_coro(saved_ex, op.cont_op.cont).resume();
244 7367 : }
245 :
246 : /** Complete a datagram operation (send_to or recv_from).
247 :
248 : For recv_from operations, writes the source endpoint from the
249 : recorded sockaddr_storage into the caller's endpoint pointer.
250 : Then resumes the caller via symmetric transfer.
251 :
252 : @tparam Op The concrete datagram operation type.
253 : @param op The operation to complete.
254 : */
255 : template<typename Op>
256 : void
257 6 : complete_datagram_op(Op& op)
258 : {
259 6 : op.stop_cb.reset();
260 6 : op.socket_impl_->desc_state_.scheduler_->reset_inline_budget();
261 :
262 6 : if (op.cancelled.load(std::memory_order_acquire))
263 2 : *op.ec_out = capy::error::canceled;
264 4 : else if (op.errn != 0)
265 MIS 0 : *op.ec_out = make_err(op.errn);
266 : else
267 HIT 4 : *op.ec_out = {};
268 :
269 6 : *op.bytes_out = op.bytes_transferred;
270 :
271 6 : op.cont_op.cont.h = op.h;
272 6 : capy::executor_ref saved_ex(op.ex);
273 6 : auto prevent = std::move(op.impl_ptr);
274 6 : dispatch_coro(saved_ex, op.cont_op.cont).resume();
275 6 : }
276 :
277 : /** Complete a datagram operation with source endpoint capture.
278 :
279 : For recv_from operations, writes the source endpoint from the
280 : recorded sockaddr_storage into the caller's endpoint pointer.
281 : Then resumes the caller via symmetric transfer.
282 :
283 : @tparam Op The concrete datagram operation type.
284 : @param op The operation to complete.
285 : @param source_out Optional pointer to store source endpoint
286 : (non-null for recv_from, null for send_to).
287 : */
288 : template<typename Op, typename Endpoint>
289 : void
290 18 : complete_datagram_op(Op& op, Endpoint* source_out)
291 : {
292 18 : op.stop_cb.reset();
293 18 : op.socket_impl_->desc_state_.scheduler_->reset_inline_budget();
294 :
295 18 : if (op.cancelled.load(std::memory_order_acquire))
296 6 : *op.ec_out = capy::error::canceled;
297 12 : else if (op.errn != 0)
298 MIS 0 : *op.ec_out = make_err(op.errn);
299 : else
300 HIT 12 : *op.ec_out = {};
301 :
302 18 : *op.bytes_out = op.bytes_transferred;
303 :
304 28 : if (source_out && !op.cancelled.load(std::memory_order_acquire) &&
305 10 : op.errn == 0)
306 20 : *source_out = from_sockaddr_as(
307 10 : op.source_storage,
308 : op.source_addrlen,
309 : Endpoint{});
310 :
311 18 : op.cont_op.cont.h = op.h;
312 18 : capy::executor_ref saved_ex(op.ex);
313 18 : auto prevent = std::move(op.impl_ptr);
314 18 : dispatch_coro(saved_ex, op.cont_op.cont).resume();
315 18 : }
316 :
317 : } // namespace boost::corosio::detail
318 :
319 : #endif // BOOST_COROSIO_NATIVE_DETAIL_REACTOR_REACTOR_OP_COMPLETE_HPP
|