TLA Line data Source code
1 : //
2 : // Copyright (c) 2026 Steve Gerbino
3 : //
4 : // Distributed under the Boost Software License, Version 1.0. (See accompanying
5 : // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6 : //
7 : // Official repository: https://github.com/cppalliance/corosio
8 : //
9 :
10 : #ifndef BOOST_COROSIO_UDP_SOCKET_HPP
11 : #define BOOST_COROSIO_UDP_SOCKET_HPP
12 :
13 : #include <boost/corosio/detail/config.hpp>
14 : #include <boost/corosio/detail/platform.hpp>
15 : #include <boost/corosio/detail/except.hpp>
16 : #include <boost/corosio/detail/native_handle.hpp>
17 : #include <boost/corosio/detail/op_base.hpp>
18 : #include <boost/corosio/io/io_object.hpp>
19 : #include <boost/capy/io_result.hpp>
20 : #include <boost/corosio/detail/buffer_param.hpp>
21 : #include <boost/corosio/endpoint.hpp>
22 : #include <boost/corosio/message_flags.hpp>
23 : #include <boost/corosio/udp.hpp>
24 : #include <boost/capy/ex/executor_ref.hpp>
25 : #include <boost/capy/ex/execution_context.hpp>
26 : #include <boost/capy/ex/io_env.hpp>
27 : #include <boost/capy/concept/executor.hpp>
28 :
29 : #include <system_error>
30 :
31 : #include <concepts>
32 : #include <coroutine>
33 : #include <cstddef>
34 : #include <stop_token>
35 : #include <type_traits>
36 :
37 : namespace boost::corosio {
38 :
39 : /** An asynchronous UDP socket for coroutine I/O.
40 :
41 : This class provides asynchronous UDP datagram operations that
42 : return awaitable types. Each operation participates in the affine
43 : awaitable protocol, ensuring coroutines resume on the correct
44 : executor.
45 :
46 : Supports two modes of operation:
47 :
48 : **Connectionless mode**: each `send_to` specifies a destination
49 : endpoint, and each `recv_from` captures the source endpoint.
50 : The socket must be opened (and optionally bound) before I/O.
51 :
52 : **Connected mode**: call `connect()` to set a default peer,
53 : then use `send()`/`recv()` without endpoint arguments.
54 : The kernel filters incoming datagrams to those from the
55 : connected peer.
56 :
57 : @par Thread Safety
58 : Distinct objects: Safe.@n
59 : Shared objects: Unsafe. A socket must not have concurrent
60 : operations of the same type (e.g., two simultaneous recv_from).
61 : One send_to and one recv_from may be in flight simultaneously.
62 :
63 : @par Example
64 : @code
65 : // Connectionless mode
66 : io_context ioc;
67 : udp_socket sock( ioc );
68 : sock.open( udp::v4() );
69 : sock.bind( endpoint( ipv4_address::any(), 9000 ) );
70 :
71 : char buf[1024];
72 : endpoint sender;
73 : auto [ec, n] = co_await sock.recv_from(
74 : capy::mutable_buffer( buf, sizeof( buf ) ), sender );
75 : if ( !ec )
76 : co_await sock.send_to(
77 : capy::const_buffer( buf, n ), sender );
78 :
79 : // Connected mode
80 : udp_socket csock( ioc );
81 : auto [cec] = co_await csock.connect(
82 : endpoint( ipv4_address::loopback(), 9000 ) );
83 : if ( !cec )
84 : co_await csock.send(
85 : capy::const_buffer( buf, n ) );
86 : @endcode
87 : */
88 : class BOOST_COROSIO_DECL udp_socket : public io_object
89 : {
90 : public:
91 : /** Define backend hooks for UDP socket operations.
92 :
93 : Platform backends (epoll, kqueue, select) derive from
94 : this to implement datagram I/O and option management.
95 : */
96 : struct implementation : io_object::implementation
97 : {
98 : /** Initiate an asynchronous send_to operation.
99 :
100 : @param h Coroutine handle to resume on completion.
101 : @param ex Executor for dispatching the completion.
102 : @param buf The buffer data to send.
103 : @param dest The destination endpoint.
104 : @param flags Platform message flags (e.g. `MSG_DONTWAIT`).
105 : @param token Stop token for cancellation.
106 : @param ec Output error code.
107 : @param bytes_out Output bytes transferred.
108 :
109 : @return Coroutine handle to resume immediately.
110 : */
111 : virtual std::coroutine_handle<> send_to(
112 : std::coroutine_handle<> h,
113 : capy::executor_ref ex,
114 : buffer_param buf,
115 : endpoint dest,
116 : int flags,
117 : std::stop_token token,
118 : std::error_code* ec,
119 : std::size_t* bytes_out) = 0;
120 :
121 : /** Initiate an asynchronous recv_from operation.
122 :
123 : @param h Coroutine handle to resume on completion.
124 : @param ex Executor for dispatching the completion.
125 : @param buf The buffer to receive into.
126 : @param source Output endpoint for the sender's address.
127 : @param flags Platform message flags (e.g. `MSG_PEEK`).
128 : @param token Stop token for cancellation.
129 : @param ec Output error code.
130 : @param bytes_out Output bytes transferred.
131 :
132 : @return Coroutine handle to resume immediately.
133 : */
134 : virtual std::coroutine_handle<> recv_from(
135 : std::coroutine_handle<> h,
136 : capy::executor_ref ex,
137 : buffer_param buf,
138 : endpoint* source,
139 : int flags,
140 : std::stop_token token,
141 : std::error_code* ec,
142 : std::size_t* bytes_out) = 0;
143 :
144 : /// Return the platform socket descriptor.
145 : virtual native_handle_type native_handle() const noexcept = 0;
146 :
147 : /** Request cancellation of pending asynchronous operations.
148 :
149 : All outstanding operations complete with operation_canceled
150 : error. Check `ec == cond::canceled` for portable comparison.
151 : */
152 : virtual void cancel() noexcept = 0;
153 :
154 : /** Set a socket option.
155 :
156 : @param level The protocol level (e.g. `SOL_SOCKET`).
157 : @param optname The option name.
158 : @param data Pointer to the option value.
159 : @param size Size of the option value in bytes.
160 : @return Error code on failure, empty on success.
161 : */
162 : virtual std::error_code set_option(
163 : int level,
164 : int optname,
165 : void const* data,
166 : std::size_t size) noexcept = 0;
167 :
168 : /** Get a socket option.
169 :
170 : @param level The protocol level (e.g. `SOL_SOCKET`).
171 : @param optname The option name.
172 : @param data Pointer to receive the option value.
173 : @param size On entry, the size of the buffer. On exit,
174 : the size of the option value.
175 : @return Error code on failure, empty on success.
176 : */
177 : virtual std::error_code
178 : get_option(int level, int optname, void* data, std::size_t* size)
179 : const noexcept = 0;
180 :
181 : /// Return the cached local endpoint.
182 : virtual endpoint local_endpoint() const noexcept = 0;
183 :
184 : /// Return the cached remote endpoint (connected mode).
185 : virtual endpoint remote_endpoint() const noexcept = 0;
186 :
187 : /** Initiate an asynchronous connect to set the default peer.
188 :
189 : @param h Coroutine handle to resume on completion.
190 : @param ex Executor for dispatching the completion.
191 : @param ep The remote endpoint to connect to.
192 : @param token Stop token for cancellation.
193 : @param ec Output error code.
194 :
195 : @return Coroutine handle to resume immediately.
196 : */
197 : virtual std::coroutine_handle<> connect(
198 : std::coroutine_handle<> h,
199 : capy::executor_ref ex,
200 : endpoint ep,
201 : std::stop_token token,
202 : std::error_code* ec) = 0;
203 :
204 : /** Initiate an asynchronous connected send operation.
205 :
206 : @param h Coroutine handle to resume on completion.
207 : @param ex Executor for dispatching the completion.
208 : @param buf The buffer data to send.
209 : @param flags Platform message flags (e.g. `MSG_DONTWAIT`).
210 : @param token Stop token for cancellation.
211 : @param ec Output error code.
212 : @param bytes_out Output bytes transferred.
213 :
214 : @return Coroutine handle to resume immediately.
215 : */
216 : virtual std::coroutine_handle<> send(
217 : std::coroutine_handle<> h,
218 : capy::executor_ref ex,
219 : buffer_param buf,
220 : int flags,
221 : std::stop_token token,
222 : std::error_code* ec,
223 : std::size_t* bytes_out) = 0;
224 :
225 : /** Initiate an asynchronous connected recv operation.
226 :
227 : @param h Coroutine handle to resume on completion.
228 : @param ex Executor for dispatching the completion.
229 : @param buf The buffer to receive into.
230 : @param flags Platform message flags (e.g. `MSG_PEEK`).
231 : @param token Stop token for cancellation.
232 : @param ec Output error code.
233 : @param bytes_out Output bytes transferred.
234 :
235 : @return Coroutine handle to resume immediately.
236 : */
237 : virtual std::coroutine_handle<> recv(
238 : std::coroutine_handle<> h,
239 : capy::executor_ref ex,
240 : buffer_param buf,
241 : int flags,
242 : std::stop_token token,
243 : std::error_code* ec,
244 : std::size_t* bytes_out) = 0;
245 : };
246 :
247 : /** Represent the awaitable returned by @ref send_to.
248 :
249 : Captures the destination endpoint and buffer, then dispatches
250 : to the backend implementation on suspension.
251 : */
252 : struct send_to_awaitable
253 : : detail::bytes_op_base<send_to_awaitable>
254 : {
255 : udp_socket& s_;
256 : buffer_param buf_;
257 : endpoint dest_;
258 : int flags_;
259 :
260 HIT 22 : send_to_awaitable(
261 : udp_socket& s, buffer_param buf,
262 : endpoint dest, int flags = 0) noexcept
263 22 : : s_(s), buf_(buf), dest_(dest), flags_(flags) {}
264 :
265 22 : std::coroutine_handle<> dispatch(
266 : std::coroutine_handle<> h, capy::executor_ref ex) const
267 : {
268 44 : return s_.get().send_to(
269 44 : h, ex, buf_, dest_, flags_, token_, &ec_, &bytes_);
270 : }
271 : };
272 :
273 : /** Represent the awaitable returned by @ref recv_from.
274 :
275 : Captures the source endpoint reference and buffer, then
276 : dispatches to the backend implementation on suspension.
277 : */
278 : struct recv_from_awaitable
279 : : detail::bytes_op_base<recv_from_awaitable>
280 : {
281 : udp_socket& s_;
282 : buffer_param buf_;
283 : endpoint& source_;
284 : int flags_;
285 :
286 32 : recv_from_awaitable(
287 : udp_socket& s, buffer_param buf,
288 : endpoint& source, int flags = 0) noexcept
289 32 : : s_(s), buf_(buf), source_(source), flags_(flags) {}
290 :
291 32 : std::coroutine_handle<> dispatch(
292 : std::coroutine_handle<> h, capy::executor_ref ex) const
293 : {
294 64 : return s_.get().recv_from(
295 64 : h, ex, buf_, &source_, flags_, token_, &ec_, &bytes_);
296 : }
297 : };
298 :
299 : /// Represent the awaitable returned by @ref connect.
300 : struct connect_awaitable
301 : : detail::void_op_base<connect_awaitable>
302 : {
303 : udp_socket& s_;
304 : endpoint endpoint_;
305 :
306 12 : connect_awaitable(udp_socket& s, endpoint ep) noexcept
307 12 : : s_(s), endpoint_(ep) {}
308 :
309 12 : std::coroutine_handle<> dispatch(
310 : std::coroutine_handle<> h, capy::executor_ref ex) const
311 : {
312 12 : return s_.get().connect(h, ex, endpoint_, token_, &ec_);
313 : }
314 : };
315 :
316 : /// Represent the awaitable returned by @ref send.
317 : struct send_awaitable
318 : : detail::bytes_op_base<send_awaitable>
319 : {
320 : udp_socket& s_;
321 : buffer_param buf_;
322 : int flags_;
323 :
324 6 : send_awaitable(
325 : udp_socket& s, buffer_param buf,
326 : int flags = 0) noexcept
327 6 : : s_(s), buf_(buf), flags_(flags) {}
328 :
329 6 : std::coroutine_handle<> dispatch(
330 : std::coroutine_handle<> h, capy::executor_ref ex) const
331 : {
332 12 : return s_.get().send(
333 12 : h, ex, buf_, flags_, token_, &ec_, &bytes_);
334 : }
335 : };
336 :
337 : /// Represent the awaitable returned by @ref recv.
338 : struct recv_awaitable
339 : : detail::bytes_op_base<recv_awaitable>
340 : {
341 : udp_socket& s_;
342 : buffer_param buf_;
343 : int flags_;
344 :
345 4 : recv_awaitable(
346 : udp_socket& s, buffer_param buf,
347 : int flags = 0) noexcept
348 4 : : s_(s), buf_(buf), flags_(flags) {}
349 :
350 4 : std::coroutine_handle<> dispatch(
351 : std::coroutine_handle<> h, capy::executor_ref ex) const
352 : {
353 8 : return s_.get().recv(
354 8 : h, ex, buf_, flags_, token_, &ec_, &bytes_);
355 : }
356 : };
357 :
358 : public:
359 : /** Destructor.
360 :
361 : Closes the socket if open, cancelling any pending operations.
362 : */
363 : ~udp_socket() override;
364 :
365 : /** Construct a socket from an execution context.
366 :
367 : @param ctx The execution context that will own this socket.
368 : */
369 : explicit udp_socket(capy::execution_context& ctx);
370 :
371 : /** Construct a socket from an executor.
372 :
373 : The socket is associated with the executor's context.
374 :
375 : @param ex The executor whose context will own the socket.
376 : */
377 : template<class Ex>
378 : requires(!std::same_as<std::remove_cvref_t<Ex>, udp_socket>) &&
379 : capy::Executor<Ex>
380 : explicit udp_socket(Ex const& ex) : udp_socket(ex.context())
381 : {
382 : }
383 :
384 : /** Move constructor.
385 :
386 : Transfers ownership of the socket resources.
387 :
388 : @param other The socket to move from.
389 : */
390 2 : udp_socket(udp_socket&& other) noexcept : io_object(std::move(other)) {}
391 :
392 : /** Move assignment operator.
393 :
394 : Closes any existing socket and transfers ownership.
395 :
396 : @param other The socket to move from.
397 : @return Reference to this socket.
398 : */
399 2 : udp_socket& operator=(udp_socket&& other) noexcept
400 : {
401 2 : if (this != &other)
402 : {
403 2 : close();
404 2 : h_ = std::move(other.h_);
405 : }
406 2 : return *this;
407 : }
408 :
409 : udp_socket(udp_socket const&) = delete;
410 : udp_socket& operator=(udp_socket const&) = delete;
411 :
412 : /** Open the socket.
413 :
414 : Creates a UDP socket and associates it with the platform
415 : reactor.
416 :
417 : @param proto The protocol (IPv4 or IPv6). Defaults to
418 : `udp::v4()`.
419 :
420 : @throws std::system_error on failure.
421 : */
422 : void open(udp proto = udp::v4());
423 :
424 : /** Close the socket.
425 :
426 : Releases socket resources. Any pending operations complete
427 : with `errc::operation_canceled`.
428 : */
429 : void close();
430 :
431 : /** Check if the socket is open.
432 :
433 : @return `true` if the socket is open and ready for operations.
434 : */
435 444 : bool is_open() const noexcept
436 : {
437 : #if BOOST_COROSIO_HAS_IOCP && !defined(BOOST_COROSIO_MRDOCS)
438 : return h_ && get().native_handle() != ~native_handle_type(0);
439 : #else
440 444 : return h_ && get().native_handle() >= 0;
441 : #endif
442 : }
443 :
444 : /** Bind the socket to a local endpoint.
445 :
446 : Associates the socket with a local address and port.
447 : Required before calling `recv_from`.
448 :
449 : @param ep The local endpoint to bind to.
450 :
451 : @return Error code on failure, empty on success.
452 :
453 : @throws std::logic_error if the socket is not open.
454 : */
455 : [[nodiscard]] std::error_code bind(endpoint ep);
456 :
457 : /** Cancel any pending asynchronous operations.
458 :
459 : All outstanding operations complete with
460 : `errc::operation_canceled`. Check `ec == cond::canceled`
461 : for portable comparison.
462 : */
463 : void cancel();
464 :
465 : /** Get the native socket handle.
466 :
467 : @return The native socket handle, or -1 if not open.
468 : */
469 : native_handle_type native_handle() const noexcept;
470 :
471 : /** Set a socket option.
472 :
473 : @param opt The option to set.
474 :
475 : @throws std::logic_error if the socket is not open.
476 : @throws std::system_error on failure.
477 : */
478 : template<class Option>
479 20 : void set_option(Option const& opt)
480 : {
481 20 : if (!is_open())
482 MIS 0 : detail::throw_logic_error("set_option: socket not open");
483 HIT 20 : std::error_code ec = get().set_option(
484 : Option::level(), Option::name(), opt.data(), opt.size());
485 20 : if (ec)
486 MIS 0 : detail::throw_system_error(ec, "udp_socket::set_option");
487 HIT 20 : }
488 :
489 : /** Get a socket option.
490 :
491 : @return The current option value.
492 :
493 : @throws std::logic_error if the socket is not open.
494 : @throws std::system_error on failure.
495 : */
496 : template<class Option>
497 16 : Option get_option() const
498 : {
499 16 : if (!is_open())
500 MIS 0 : detail::throw_logic_error("get_option: socket not open");
501 HIT 16 : Option opt{};
502 16 : std::size_t sz = opt.size();
503 : std::error_code ec =
504 16 : get().get_option(Option::level(), Option::name(), opt.data(), &sz);
505 16 : if (ec)
506 MIS 0 : detail::throw_system_error(ec, "udp_socket::get_option");
507 HIT 16 : opt.resize(sz);
508 16 : return opt;
509 : }
510 :
511 : /** Get the local endpoint of the socket.
512 :
513 : @return The local endpoint, or a default endpoint if not bound.
514 : */
515 : endpoint local_endpoint() const noexcept;
516 :
517 : /** Send a datagram to the specified destination.
518 :
519 : @param buf The buffer containing data to send.
520 : @param dest The destination endpoint.
521 : @param flags Message flags (e.g. message_flags::dont_route).
522 :
523 : @return An awaitable that completes with
524 : `io_result<std::size_t>`.
525 :
526 : @throws std::logic_error if the socket is not open.
527 : */
528 : template<capy::ConstBufferSequence Buffers>
529 22 : auto send_to(
530 : Buffers const& buf,
531 : endpoint dest,
532 : corosio::message_flags flags)
533 : {
534 22 : if (!is_open())
535 MIS 0 : detail::throw_logic_error("send_to: socket not open");
536 : return send_to_awaitable(
537 HIT 22 : *this, buf, dest, static_cast<int>(flags));
538 : }
539 :
540 : /// @overload
541 : template<capy::ConstBufferSequence Buffers>
542 22 : auto send_to(Buffers const& buf, endpoint dest)
543 : {
544 22 : return send_to(buf, dest, corosio::message_flags::none);
545 : }
546 :
547 : /** Receive a datagram and capture the sender's endpoint.
548 :
549 : @param buf The buffer to receive data into.
550 : @param source Reference to an endpoint that will be set to
551 : the sender's address on successful completion.
552 : @param flags Message flags (e.g. message_flags::peek).
553 :
554 : @return An awaitable that completes with
555 : `io_result<std::size_t>`.
556 :
557 : @throws std::logic_error if the socket is not open.
558 : */
559 : template<capy::MutableBufferSequence Buffers>
560 32 : auto recv_from(
561 : Buffers const& buf,
562 : endpoint& source,
563 : corosio::message_flags flags)
564 : {
565 32 : if (!is_open())
566 MIS 0 : detail::throw_logic_error("recv_from: socket not open");
567 : return recv_from_awaitable(
568 HIT 32 : *this, buf, source, static_cast<int>(flags));
569 : }
570 :
571 : /// @overload
572 : template<capy::MutableBufferSequence Buffers>
573 32 : auto recv_from(Buffers const& buf, endpoint& source)
574 : {
575 32 : return recv_from(buf, source, corosio::message_flags::none);
576 : }
577 :
578 : /** Initiate an asynchronous connect to set the default peer.
579 :
580 : If the socket is not already open, it is opened automatically
581 : using the address family of @p ep.
582 :
583 : @param ep The remote endpoint to connect to.
584 :
585 : @return An awaitable that completes with `io_result<>`.
586 :
587 : @throws std::system_error if the socket needs to be opened
588 : and the open fails.
589 : */
590 12 : auto connect(endpoint ep)
591 : {
592 12 : if (!is_open())
593 8 : open(ep.is_v6() ? udp::v6() : udp::v4());
594 12 : return connect_awaitable(*this, ep);
595 : }
596 :
597 : /** Send a datagram to the connected peer.
598 :
599 : @param buf The buffer containing data to send.
600 : @param flags Message flags.
601 :
602 : @return An awaitable that completes with
603 : `io_result<std::size_t>`.
604 :
605 : @throws std::logic_error if the socket is not open.
606 : */
607 : template<capy::ConstBufferSequence Buffers>
608 6 : auto send(Buffers const& buf, corosio::message_flags flags)
609 : {
610 6 : if (!is_open())
611 MIS 0 : detail::throw_logic_error("send: socket not open");
612 : return send_awaitable(
613 HIT 6 : *this, buf, static_cast<int>(flags));
614 : }
615 :
616 : /// @overload
617 : template<capy::ConstBufferSequence Buffers>
618 6 : auto send(Buffers const& buf)
619 : {
620 6 : return send(buf, corosio::message_flags::none);
621 : }
622 :
623 : /** Receive a datagram from the connected peer.
624 :
625 : @param buf The buffer to receive data into.
626 : @param flags Message flags (e.g. message_flags::peek).
627 :
628 : @return An awaitable that completes with
629 : `io_result<std::size_t>`.
630 :
631 : @throws std::logic_error if the socket is not open.
632 : */
633 : template<capy::MutableBufferSequence Buffers>
634 4 : auto recv(Buffers const& buf, corosio::message_flags flags)
635 : {
636 4 : if (!is_open())
637 MIS 0 : detail::throw_logic_error("recv: socket not open");
638 : return recv_awaitable(
639 HIT 4 : *this, buf, static_cast<int>(flags));
640 : }
641 :
642 : /// @overload
643 : template<capy::MutableBufferSequence Buffers>
644 4 : auto recv(Buffers const& buf)
645 : {
646 4 : return recv(buf, corosio::message_flags::none);
647 : }
648 :
649 : /** Get the remote endpoint of the socket.
650 :
651 : Returns the address and port of the connected peer.
652 :
653 : @return The remote endpoint, or a default endpoint if
654 : not connected.
655 : */
656 : endpoint remote_endpoint() const noexcept;
657 :
658 : protected:
659 : /// Construct from a pre-built handle (for native_udp_socket).
660 : explicit udp_socket(io_object::handle h) noexcept : io_object(std::move(h))
661 : {
662 : }
663 :
664 : private:
665 : /// Open the socket for the given protocol triple.
666 : void open_for_family(int family, int type, int protocol);
667 :
668 588 : inline implementation& get() const noexcept
669 : {
670 588 : return *static_cast<implementation*>(h_.get());
671 : }
672 : };
673 :
674 : } // namespace boost::corosio
675 :
676 : #endif // BOOST_COROSIO_UDP_SOCKET_HPP
|