TLA Line data Source code
1 : //
2 : // Copyright (c) 2026 Steve Gerbino
3 : //
4 : // Distributed under the Boost Software License, Version 1.0. (See accompanying
5 : // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6 : //
7 : // Official repository: https://github.com/cppalliance/corosio
8 : //
9 :
10 : #ifndef BOOST_COROSIO_NATIVE_DETAIL_SELECT_SELECT_SCHEDULER_HPP
11 : #define BOOST_COROSIO_NATIVE_DETAIL_SELECT_SELECT_SCHEDULER_HPP
12 :
13 : #include <boost/corosio/detail/platform.hpp>
14 :
15 : #if BOOST_COROSIO_HAS_SELECT
16 :
17 : #include <boost/corosio/detail/config.hpp>
18 : #include <boost/capy/ex/execution_context.hpp>
19 :
20 : #include <boost/corosio/native/detail/reactor/reactor_scheduler.hpp>
21 :
22 : #include <boost/corosio/native/detail/select/select_traits.hpp>
23 : #include <boost/corosio/detail/timer_service.hpp>
24 : #include <boost/corosio/native/detail/make_err.hpp>
25 : #include <boost/corosio/native/detail/posix/posix_resolver_service.hpp>
26 : #include <boost/corosio/native/detail/posix/posix_signal_service.hpp>
27 : #include <boost/corosio/native/detail/posix/posix_stream_file_service.hpp>
28 : #include <boost/corosio/native/detail/posix/posix_random_access_file_service.hpp>
29 :
30 : #include <boost/corosio/detail/except.hpp>
31 :
32 : #include <sys/select.h>
33 : #include <unistd.h>
34 : #include <errno.h>
35 : #include <fcntl.h>
36 :
37 : #include <atomic>
38 : #include <chrono>
39 : #include <cstdint>
40 : #include <limits>
41 : #include <mutex>
42 : #include <unordered_map>
43 :
44 : namespace boost::corosio::detail {
45 :
46 : struct select_op;
47 :
48 : /** POSIX scheduler using select() for I/O multiplexing.
49 :
50 : This scheduler implements the scheduler interface using the POSIX select()
51 : call for I/O event notification. It inherits the shared reactor threading
52 : model from reactor_scheduler: signal state machine, inline completion
53 : budget, work counting, and the do_one event loop.
54 :
55 : The design mirrors epoll_scheduler for behavioral consistency:
56 : - Same single-reactor thread coordination model
57 : - Same deferred I/O pattern (reactor marks ready; workers do I/O)
58 : - Same timer integration pattern
59 :
60 : Known Limitations:
61 : - FD_SETSIZE (~1024) limits maximum concurrent connections
62 : - O(n) scanning: rebuilds fd_sets each iteration
63 : - Level-triggered only (no edge-triggered mode)
64 :
65 : @par Thread Safety
66 : All public member functions are thread-safe.
67 : */
class BOOST_COROSIO_DECL select_scheduler final : public reactor_scheduler
{
public:
    /** Construct the scheduler.

        Creates a self-pipe for reactor interruption.

        @param ctx Reference to the owning execution_context.
        @param concurrency_hint Hint for expected thread count (unused).
    */
    select_scheduler(capy::execution_context& ctx, int concurrency_hint = -1);

    /// Destroy the scheduler.
    ~select_scheduler() override;

    select_scheduler(select_scheduler const&) = delete;
    select_scheduler& operator=(select_scheduler const&) = delete;

    /// Shut down the scheduler, draining pending operations.
    void shutdown() override;

    /** Return the maximum file descriptor value supported.

        Returns FD_SETSIZE - 1, the maximum fd value that can be
        monitored by select(). Operations with fd >= FD_SETSIZE
        will fail with EINVAL.

        @return The maximum supported file descriptor value.
    */
    static constexpr int max_fd() noexcept
    {
        return FD_SETSIZE - 1;
    }

    /** Register a descriptor for persistent monitoring.

        The fd is added to the registered_descs_ map and will be
        included in subsequent select() calls. The reactor is
        interrupted so a blocked select() rebuilds its fd_sets.

        @param fd The file descriptor to register.
        @param desc Pointer to descriptor state for this fd.
    */
    void register_descriptor(int fd, reactor_descriptor_state* desc) const;

    /** Deregister a persistently registered descriptor.

        @param fd The file descriptor to deregister.
    */
    void deregister_descriptor(int fd) const;

    /** Interrupt the reactor so it rebuilds its fd_sets.

        Called when a write or connect op is registered after
        the reactor's snapshot was taken. Without this, select()
        may block not watching for writability on the fd.
    */
    void notify_reactor() const;

private:
    // Run one reactor iteration: build fd_sets from registered_descs_,
    // block in select() for at most timeout_us, then hand ready
    // descriptors back to the scheduler's completion queue.
    void
    run_task(lock_type& lock, context_type* ctx,
        long timeout_us) override;
    // Wake a reactor thread blocked in select() via the self-pipe.
    void interrupt_reactor() const;
    // Combine the requested timeout with the nearest timer expiry.
    long calculate_timeout(long requested_timeout_us) const;

    // Self-pipe for interrupting select()
    int pipe_fds_[2]; // [0]=read, [1]=write

    // Per-fd tracking for fd_set building.
    // mutable: both maps are updated from const member functions;
    // guarded by the base class mutex_ (see register/deregister).
    mutable std::unordered_map<int, reactor_descriptor_state*> registered_descs_;
    // Highest registered fd, maintained so run_task can size nfds.
    mutable int max_fd_ = -1;
};
141 :
142 HIT 229 : inline select_scheduler::select_scheduler(capy::execution_context& ctx, int)
143 229 : : pipe_fds_{-1, -1}
144 229 : , max_fd_(-1)
145 : {
146 229 : if (::pipe(pipe_fds_) < 0)
147 MIS 0 : detail::throw_system_error(make_err(errno), "pipe");
148 :
149 HIT 687 : for (int i = 0; i < 2; ++i)
150 : {
151 458 : int flags = ::fcntl(pipe_fds_[i], F_GETFL, 0);
152 458 : if (flags == -1)
153 : {
154 MIS 0 : int errn = errno;
155 0 : ::close(pipe_fds_[0]);
156 0 : ::close(pipe_fds_[1]);
157 0 : detail::throw_system_error(make_err(errn), "fcntl F_GETFL");
158 : }
159 HIT 458 : if (::fcntl(pipe_fds_[i], F_SETFL, flags | O_NONBLOCK) == -1)
160 : {
161 MIS 0 : int errn = errno;
162 0 : ::close(pipe_fds_[0]);
163 0 : ::close(pipe_fds_[1]);
164 0 : detail::throw_system_error(make_err(errn), "fcntl F_SETFL");
165 : }
166 HIT 458 : if (::fcntl(pipe_fds_[i], F_SETFD, FD_CLOEXEC) == -1)
167 : {
168 MIS 0 : int errn = errno;
169 0 : ::close(pipe_fds_[0]);
170 0 : ::close(pipe_fds_[1]);
171 0 : detail::throw_system_error(make_err(errn), "fcntl F_SETFD");
172 : }
173 : }
174 :
175 HIT 229 : timer_svc_ = &get_timer_service(ctx, *this);
176 229 : timer_svc_->set_on_earliest_changed(
177 3538 : timer_service::callback(this, [](void* p) {
178 3309 : static_cast<select_scheduler*>(p)->interrupt_reactor();
179 3309 : }));
180 :
181 229 : get_resolver_service(ctx, *this);
182 229 : get_signal_service(ctx, *this);
183 229 : get_stream_file_service(ctx, *this);
184 229 : get_random_access_file_service(ctx, *this);
185 :
186 229 : completed_ops_.push(&task_op_);
187 229 : }
188 :
189 458 : inline select_scheduler::~select_scheduler()
190 : {
191 229 : if (pipe_fds_[0] >= 0)
192 229 : ::close(pipe_fds_[0]);
193 229 : if (pipe_fds_[1] >= 0)
194 229 : ::close(pipe_fds_[1]);
195 458 : }
196 :
197 : inline void
198 229 : select_scheduler::shutdown()
199 : {
200 229 : shutdown_drain();
201 :
202 229 : if (pipe_fds_[1] >= 0)
203 229 : interrupt_reactor();
204 229 : }
205 :
206 : inline void
207 6318 : select_scheduler::register_descriptor(
208 : int fd, reactor_descriptor_state* desc) const
209 : {
210 6318 : if (fd < 0 || fd >= FD_SETSIZE)
211 MIS 0 : detail::throw_system_error(make_err(EINVAL), "select: fd out of range");
212 :
213 HIT 6318 : desc->registered_events = reactor_event_read | reactor_event_write;
214 6318 : desc->fd = fd;
215 6318 : desc->scheduler_ = this;
216 6318 : desc->mutex.set_enabled(!single_threaded_);
217 6318 : desc->ready_events_.store(0, std::memory_order_relaxed);
218 :
219 : {
220 6318 : conditionally_enabled_mutex::scoped_lock lock(desc->mutex);
221 6318 : desc->impl_ref_.reset();
222 6318 : desc->read_ready = false;
223 6318 : desc->write_ready = false;
224 6318 : }
225 :
226 : {
227 6318 : mutex_type::scoped_lock lock(mutex_);
228 6318 : registered_descs_[fd] = desc;
229 6318 : if (fd > max_fd_)
230 6314 : max_fd_ = fd;
231 6318 : }
232 :
233 6318 : interrupt_reactor();
234 6318 : }
235 :
236 : inline void
237 6318 : select_scheduler::deregister_descriptor(int fd) const
238 : {
239 6318 : mutex_type::scoped_lock lock(mutex_);
240 :
241 6318 : auto it = registered_descs_.find(fd);
242 6318 : if (it == registered_descs_.end())
243 MIS 0 : return;
244 :
245 HIT 6318 : registered_descs_.erase(it);
246 :
247 6318 : if (fd == max_fd_)
248 : {
249 6256 : max_fd_ = pipe_fds_[0];
250 12403 : for (auto& [registered_fd, state] : registered_descs_)
251 : {
252 6147 : if (registered_fd > max_fd_)
253 6137 : max_fd_ = registered_fd;
254 : }
255 : }
256 6318 : }
257 :
258 : inline void
259 3085 : select_scheduler::notify_reactor() const
260 : {
261 3085 : interrupt_reactor();
262 3085 : }
263 :
264 : inline void
265 13096 : select_scheduler::interrupt_reactor() const
266 : {
267 13096 : char byte = 1;
268 13096 : [[maybe_unused]] auto r = ::write(pipe_fds_[1], &byte, 1);
269 13096 : }
270 :
271 : inline long
272 126026 : select_scheduler::calculate_timeout(long requested_timeout_us) const
273 : {
274 126026 : if (requested_timeout_us == 0)
275 MIS 0 : return 0;
276 :
277 HIT 126026 : auto nearest = timer_svc_->nearest_expiry();
278 126026 : if (nearest == timer_service::time_point::max())
279 46 : return requested_timeout_us;
280 :
281 125980 : auto now = std::chrono::steady_clock::now();
282 125980 : if (nearest <= now)
283 659 : return 0;
284 :
285 : auto timer_timeout_us =
286 125321 : std::chrono::duration_cast<std::chrono::microseconds>(nearest - now)
287 125321 : .count();
288 :
289 125321 : constexpr auto long_max =
290 : static_cast<long long>((std::numeric_limits<long>::max)());
291 : auto capped_timer_us =
292 125321 : (std::min)((std::max)(static_cast<long long>(timer_timeout_us),
293 125321 : static_cast<long long>(0)),
294 125321 : long_max);
295 :
296 125321 : if (requested_timeout_us < 0)
297 125315 : return static_cast<long>(capped_timer_us);
298 :
299 : return static_cast<long>(
300 6 : (std::min)(static_cast<long long>(requested_timeout_us),
301 6 : capped_timer_us));
302 : }
303 :
// One reactor iteration. Called by the base class do_one loop with
// `lock` held on the scheduler mutex; this function releases it around
// the blocking select() call and re-acquires it before returning.
inline void
select_scheduler::run_task(
    lock_type& lock, context_type* ctx, long timeout_us)
{
    // If an interrupt is already pending, poll (timeout 0) instead of
    // blocking; otherwise fold the nearest timer expiry into the wait.
    long effective_timeout_us =
        task_interrupted_ ? 0 : calculate_timeout(timeout_us);

    // Snapshot registered descriptors while holding lock.
    // Record which fds need write monitoring to avoid a hot loop:
    // select is level-triggered so writable sockets (nearly always
    // writable) would cause select() to return immediately every
    // iteration if unconditionally added to write_fds.
    struct fd_entry
    {
        int fd;
        reactor_descriptor_state* desc;
        bool needs_write;
    };
    // NOTE(review): this stack array is sizeof(fd_entry) * FD_SETSIZE
    // (~16 KiB with FD_SETSIZE 1024) — acceptable on typical thread
    // stacks, but worth confirming for constrained targets.
    fd_entry snapshot[FD_SETSIZE];
    int snapshot_count = 0;

    for (auto& [fd, desc] : registered_descs_)
    {
        if (snapshot_count < FD_SETSIZE)
        {
            // Per-descriptor lock: write_op/connect_op are mutated by
            // worker threads under this mutex.
            conditionally_enabled_mutex::scoped_lock desc_lock(desc->mutex);
            snapshot[snapshot_count].fd = fd;
            snapshot[snapshot_count].desc = desc;
            snapshot[snapshot_count].needs_write =
                (desc->write_op || desc->connect_op);
            ++snapshot_count;
        }
    }

    // Drop the scheduler lock before blocking in select().
    if (lock.owns_lock())
        lock.unlock();

    // RAII guard: restores scheduler bookkeeping (and the task op)
    // on every exit path, including the early returns and throw below.
    task_cleanup on_exit{this, &lock, ctx};

    // Rebuild the fd_sets from scratch each iteration (select() is
    // destructive: it overwrites the sets with the ready subset).
    fd_set read_fds, write_fds, except_fds;
    FD_ZERO(&read_fds);
    FD_ZERO(&write_fds);
    FD_ZERO(&except_fds);

    // Always watch the self-pipe read end so interrupt_reactor() works.
    FD_SET(pipe_fds_[0], &read_fds);
    int nfds = pipe_fds_[0];

    for (int i = 0; i < snapshot_count; ++i)
    {
        int fd = snapshot[i].fd;
        FD_SET(fd, &read_fds);
        if (snapshot[i].needs_write)
            FD_SET(fd, &write_fds);
        FD_SET(fd, &except_fds);
        if (fd > nfds)
            nfds = fd;
    }

    // Negative effective timeout means wait indefinitely (null timeval).
    struct timeval tv;
    struct timeval* tv_ptr = nullptr;
    if (effective_timeout_us >= 0)
    {
        tv.tv_sec = effective_timeout_us / 1000000;
        tv.tv_usec = effective_timeout_us % 1000000;
        tv_ptr = &tv;
    }

    int ready = ::select(nfds + 1, &read_fds, &write_fds, &except_fds, tv_ptr);

    // EINTR: signal interrupted select(), just retry.
    // EBADF: an fd was closed between snapshot and select(); retry
    // with a fresh snapshot from registered_descs_.
    if (ready < 0)
    {
        if (errno == EINTR || errno == EBADF)
            return;
        detail::throw_system_error(make_err(errno), "select");
    }

    // Process timers outside the lock
    timer_svc_->process_expired();

    op_queue local_ops;

    if (ready > 0)
    {
        // Drain the self-pipe: consume all accumulated wakeup bytes so
        // the next select() does not spin on a stale readable pipe.
        if (FD_ISSET(pipe_fds_[0], &read_fds))
        {
            char buf[256];
            while (::read(pipe_fds_[0], buf, sizeof(buf)) > 0)
            {
            }
        }

        // Translate per-fd readiness into reactor event flags and queue
        // each newly-ready descriptor exactly once.
        for (int i = 0; i < snapshot_count; ++i)
        {
            int fd = snapshot[i].fd;
            reactor_descriptor_state* desc = snapshot[i].desc;

            std::uint32_t flags = 0;
            if (FD_ISSET(fd, &read_fds))
                flags |= reactor_event_read;
            if (FD_ISSET(fd, &write_fds))
                flags |= reactor_event_write;
            if (FD_ISSET(fd, &except_fds))
                flags |= reactor_event_error;

            if (flags == 0)
                continue;

            desc->add_ready_events(flags);

            // compare_exchange guards against double-enqueue when the
            // descriptor is already sitting in the completion queue.
            bool expected = false;
            if (desc->is_enqueued_.compare_exchange_strong(
                    expected, true, std::memory_order_release,
                    std::memory_order_relaxed))
            {
                local_ops.push(desc);
            }
        }
    }

    // Re-acquire the scheduler lock to splice ready descriptors into
    // the shared completion queue.
    lock.lock();

    if (!local_ops.empty())
        completed_ops_.splice(local_ops);
}
431 :
432 : } // namespace boost::corosio::detail
433 :
434 : #endif // BOOST_COROSIO_HAS_SELECT
435 :
436 : #endif // BOOST_COROSIO_NATIVE_DETAIL_SELECT_SELECT_SCHEDULER_HPP
|