include/boost/corosio/local_stream_socket.hpp

87.5% Lines (14/16) 85.7% List of functions (6/7)
local_stream_socket.hpp
f(x) Functions (7)
Line TLA Hits Source Code
1 //
2 // Copyright (c) 2026 Michael Vandeberg
3 //
4 // Distributed under the Boost Software License, Version 1.0. (See accompanying
5 // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6 //
7 // Official repository: https://github.com/cppalliance/corosio
8 //
9
10 #ifndef BOOST_COROSIO_LOCAL_STREAM_SOCKET_HPP
11 #define BOOST_COROSIO_LOCAL_STREAM_SOCKET_HPP
12
13 #include <boost/corosio/detail/config.hpp>
14 #include <boost/corosio/detail/platform.hpp>
15 #include <boost/corosio/detail/except.hpp>
16 #include <boost/corosio/detail/native_handle.hpp>
17 #include <boost/corosio/detail/op_base.hpp>
18 #include <boost/corosio/io/io_stream.hpp>
19 #include <boost/capy/io_result.hpp>
20 #include <boost/corosio/detail/buffer_param.hpp>
21 #include <boost/corosio/local_endpoint.hpp>
22 #include <boost/corosio/local_stream.hpp>
23 #include <boost/corosio/shutdown_type.hpp>
24 #include <boost/capy/ex/executor_ref.hpp>
25 #include <boost/capy/ex/execution_context.hpp>
26 #include <boost/capy/ex/io_env.hpp>
27 #include <boost/capy/concept/executor.hpp>
28
29 #include <system_error>
30
31 #include <concepts>
32 #include <coroutine>
33 #include <cstddef>
34 #include <stop_token>
35 #include <type_traits>
36
37 namespace boost::corosio {
38
39 /** An asynchronous Unix stream socket for coroutine I/O.
40
41 This class provides asynchronous Unix domain stream socket
42 operations that return awaitable types. Each operation
43 participates in the affine awaitable protocol, ensuring
44 coroutines resume on the correct executor.
45
46 The socket must be opened before performing I/O operations.
47 Operations support cancellation through `std::stop_token` via
48 the affine protocol, or explicitly through the `cancel()`
49 member function.
50
51 @par Thread Safety
52 Distinct objects: Safe.@n
53 Shared objects: Unsafe. A socket must not have concurrent
54 operations of the same type (e.g., two simultaneous reads).
55 One read and one write may be in flight simultaneously.
56
57 @par Semantics
58 Wraps the platform Unix domain socket stack. Operations
59 dispatch to OS socket APIs via the io_context backend
60 (epoll, kqueue, select, or IOCP). Satisfies @ref capy::Stream.
61
62 @par Example
63 @code
64 io_context ioc;
65 local_stream_socket s(ioc);
66 s.open();
67
68 auto [ec] = co_await s.connect(local_endpoint("/tmp/my.sock"));
69 if (ec)
70 co_return;
71
72 char buf[1024];
73 auto [read_ec, n] = co_await s.read_some(
74 capy::mutable_buffer(buf, sizeof(buf)));
75 @endcode
76 */
77 class BOOST_COROSIO_DECL local_stream_socket : public io_stream
78 {
79 public:
80 using shutdown_type = corosio::shutdown_type;
81 using enum corosio::shutdown_type;
82
83 /** Define backend hooks for local stream socket operations.
84
85 Platform backends (epoll, kqueue, select) derive from this
86 to implement socket I/O, connection, and option management.
87 */
88 struct implementation : io_stream::implementation
89 {
90 /** Initiate an asynchronous connect to the given endpoint.
91
92 @param h Coroutine handle to resume on completion.
93 @param ex Executor for dispatching the completion.
94 @param ep The local endpoint (path) to connect to.
95 @param token Stop token for cancellation.
96 @param ec Output error code.
97
98 @return Coroutine handle to resume immediately.
99 */
100 virtual std::coroutine_handle<> connect(
101 std::coroutine_handle<> h,
102 capy::executor_ref ex,
103 corosio::local_endpoint ep,
104 std::stop_token token,
105 std::error_code* ec) = 0;
106
107 /** Shut down the socket for the given direction(s).
108
109 @param what The shutdown direction.
110
111 @return Error code on failure, empty on success.
112 */
113 virtual std::error_code shutdown(shutdown_type what) noexcept = 0;
114
115 /// Return the platform socket descriptor.
116 virtual native_handle_type native_handle() const noexcept = 0;
117
118 /** Release ownership of the native socket handle.
119
120 Deregisters the socket from the reactor without closing
121 the descriptor. The caller takes ownership.
122
123 @return The native handle.
124 */
125 virtual native_handle_type release_socket() noexcept = 0;
126
127 /** Request cancellation of pending asynchronous operations.
128
129 All outstanding operations complete with operation_canceled error.
130 Check `ec == cond::canceled` for portable comparison.
131 */
132 virtual void cancel() noexcept = 0;
133
134 /** Set a socket option.
135
136 @param level The protocol level (e.g. `SOL_SOCKET`).
137 @param optname The option name (e.g. `SO_KEEPALIVE`).
138 @param data Pointer to the option value.
139 @param size Size of the option value in bytes.
140 @return Error code on failure, empty on success.
141 */
142 virtual std::error_code set_option(
143 int level,
144 int optname,
145 void const* data,
146 std::size_t size) noexcept = 0;
147
148 /** Get a socket option.
149
150 @param level The protocol level (e.g. `SOL_SOCKET`).
151 @param optname The option name (e.g. `SO_KEEPALIVE`).
152 @param data Pointer to receive the option value.
153 @param size On entry, the size of the buffer. On exit,
154 the size of the option value.
155 @return Error code on failure, empty on success.
156 */
157 virtual std::error_code
158 get_option(int level, int optname, void* data, std::size_t* size)
159 const noexcept = 0;
160
161 /// Return the cached local endpoint.
162 virtual corosio::local_endpoint local_endpoint() const noexcept = 0;
163
164 /// Return the cached remote endpoint.
165 virtual corosio::local_endpoint remote_endpoint() const noexcept = 0;
166 };
167
168 /// Represent the awaitable returned by @ref connect.
169 struct connect_awaitable
170 : detail::void_op_base<connect_awaitable>
171 {
172 local_stream_socket& s_;
173 corosio::local_endpoint endpoint_;
174
175 4x connect_awaitable(
176 local_stream_socket& s, corosio::local_endpoint ep) noexcept
177 4x : s_(s), endpoint_(ep) {}
178
179 4x std::coroutine_handle<> dispatch(
180 std::coroutine_handle<> h, capy::executor_ref ex) const
181 {
182 4x return s_.get().connect(h, ex, endpoint_, token_, &ec_);
183 }
184 };
185
186 public:
187 /** Destructor.
188
189 Closes the socket if open, cancelling any pending operations.
190 */
191 ~local_stream_socket() override;
192
193 /** Construct a socket from an execution context.
194
195 @param ctx The execution context that will own this socket.
196 */
197 explicit local_stream_socket(capy::execution_context& ctx);
198
199 /** Construct a socket from an executor.
200
201 The socket is associated with the executor's context.
202
203 @param ex The executor whose context will own the socket.
204 */
205 template<class Ex>
206 requires(!std::same_as<std::remove_cvref_t<Ex>, local_stream_socket>) &&
207 capy::Executor<Ex>
208 explicit local_stream_socket(Ex const& ex) : local_stream_socket(ex.context())
209 {
210 }
211
212 /** Move constructor.
213
214 Transfers ownership of the socket resources.
215
216 @param other The socket to move from.
217
218 @pre No awaitables returned by @p other's methods exist.
219 @pre The execution context associated with @p other must
220 outlive this socket.
221 */
222 22x local_stream_socket(local_stream_socket&& other) noexcept
223 22x : io_object(std::move(other))
224 {
225 22x }
226
227 /** Move assignment operator.
228
229 Closes any existing socket and transfers ownership.
230
231 @param other The socket to move from.
232
233 @pre No awaitables returned by either `*this` or @p other's
234 methods exist.
235 @pre The execution context associated with @p other must
236 outlive this socket.
237
238 @return Reference to this socket.
239 */
240 local_stream_socket& operator=(local_stream_socket&& other) noexcept
241 {
242 if (this != &other)
243 {
244 close();
245 io_object::operator=(std::move(other));
246 }
247 return *this;
248 }
249
250 local_stream_socket(local_stream_socket const&) = delete;
251 local_stream_socket& operator=(local_stream_socket const&) = delete;
252
253 /** Open the socket.
254
255 Creates a Unix stream socket and associates it with
256 the platform reactor.
257
258 @param proto The protocol. Defaults to local_stream{}.
259
260 @throws std::system_error on failure.
261 */
262 void open(local_stream proto = {});
263
264 /** Close the socket.
265
266 Releases socket resources. Any pending operations complete
267 with `errc::operation_canceled`.
268 */
269 void close();
270
271 /** Check if the socket is open.
272
273 @return `true` if the socket is open and ready for operations.
274 */
275 118x bool is_open() const noexcept
276 {
277 #if BOOST_COROSIO_HAS_IOCP && !defined(BOOST_COROSIO_MRDOCS)
278 return h_ && get().native_handle() != ~native_handle_type(0);
279 #else
280 118x return h_ && get().native_handle() >= 0;
281 #endif
282 }
283
284 /** Initiate an asynchronous connect operation.
285
286 If the socket is not already open, it is opened automatically.
287
288 @param ep The local endpoint (path) to connect to.
289
290 @return An awaitable that completes with io_result<>.
291
292 @throws std::system_error if the socket needs to be opened
293 and the open fails.
294 */
295 4x auto connect(corosio::local_endpoint ep)
296 {
297 4x if (!is_open())
298 open();
299 4x return connect_awaitable(*this, ep);
300 }
301
302 /** Cancel any pending asynchronous operations.
303
304 All outstanding operations complete with `errc::operation_canceled`.
305 Check `ec == cond::canceled` for portable comparison.
306 */
307 void cancel();
308
309 /** Get the native socket handle.
310
311 Returns the underlying platform-specific socket descriptor.
312 On POSIX systems this is an `int` file descriptor.
313
314 @return The native socket handle, or an invalid sentinel
315 if not open.
316 */
317 native_handle_type native_handle() const noexcept;
318
319 /** Query the number of bytes available for reading.
320
321 @return The number of bytes that can be read without blocking.
322
323 @throws std::logic_error if the socket is not open.
324 @throws std::system_error on ioctl failure.
325 */
326 std::size_t available() const;
327
328 /** Release ownership of the native socket handle.
329
330 Deregisters the socket from the backend and cancels pending
331 operations without closing the descriptor. The caller takes
332 ownership of the returned handle.
333
334 @return The native handle.
335
336 @throws std::logic_error if the socket is not open.
337
338 @post is_open() == false
339 */
340 native_handle_type release();
341
342 /** Disable sends or receives on the socket.
343
344 Unix stream connections are full-duplex: each direction
345 (send and receive) operates independently. This function
346 allows you to close one or both directions without
347 destroying the socket.
348
349 @param what Determines what operations will no longer
350 be allowed.
351
352 @throws std::system_error on failure.
353 */
354 void shutdown(shutdown_type what);
355
356 /** Shut down part or all of the socket (non-throwing).
357
358 @param what Which direction to shut down.
359 @param ec Set to the error code on failure.
360 */
361 void shutdown(shutdown_type what, std::error_code& ec) noexcept;
362
363 /** Set a socket option.
364
365 Applies a type-safe socket option to the underlying socket.
366 The option type encodes the protocol level and option name.
367
368 @param opt The option to set.
369
370 @throws std::logic_error if the socket is not open.
371 @throws std::system_error on failure.
372 */
373 template<class Option>
374 void set_option(Option const& opt)
375 {
376 if (!is_open())
377 detail::throw_logic_error("set_option: socket not open");
378 std::error_code ec = get().set_option(
379 Option::level(), Option::name(), opt.data(), opt.size());
380 if (ec)
381 detail::throw_system_error(ec, "local_stream_socket::set_option");
382 }
383
384 /** Get a socket option.
385
386 Retrieves the current value of a type-safe socket option.
387
388 @return The current option value.
389
390 @throws std::logic_error if the socket is not open.
391 @throws std::system_error on failure.
392 */
393 template<class Option>
394 Option get_option() const
395 {
396 if (!is_open())
397 detail::throw_logic_error("get_option: socket not open");
398 Option opt{};
399 std::size_t sz = opt.size();
400 std::error_code ec =
401 get().get_option(Option::level(), Option::name(), opt.data(), &sz);
402 if (ec)
403 detail::throw_system_error(ec, "local_stream_socket::get_option");
404 opt.resize(sz);
405 return opt;
406 }
407
408 /** Assign an existing file descriptor to this socket.
409
410 The socket must not already be open. The fd is adopted
411 and registered with the platform reactor. Used by
412 make_local_stream_pair() to wrap socketpair() fds.
413
414 @param fd The file descriptor to adopt. Must be a valid,
415 open, non-blocking Unix stream socket.
416
417 @throws std::system_error on failure.
418 */
419 void assign(native_handle_type fd);
420
421 /** Get the local endpoint of the socket.
422
423 Returns the local address (path) to which the socket is bound.
424 The endpoint is cached when the connection is established.
425
426 @return The local endpoint, or a default endpoint if the socket
427 is not connected.
428 */
429 corosio::local_endpoint local_endpoint() const noexcept;
430
431 /** Get the remote endpoint of the socket.
432
433 Returns the remote address (path) to which the socket is connected.
434 The endpoint is cached when the connection is established.
435
436 @return The remote endpoint, or a default endpoint if the socket
437 is not connected.
438 */
439 corosio::local_endpoint remote_endpoint() const noexcept;
440
441 protected:
442 local_stream_socket() noexcept = default;
443
444 explicit local_stream_socket(handle h) noexcept : io_object(std::move(h)) {}
445
446 private:
447 friend class local_stream_acceptor;
448
449 void open_for_family(int family, int type, int protocol);
450
451 104x inline implementation& get() const noexcept
452 {
453 104x return *static_cast<implementation*>(h_.get());
454 }
455 };
456
457 } // namespace boost::corosio
458
459 #endif // BOOST_COROSIO_LOCAL_STREAM_SOCKET_HPP
460