1  
//
1  
//
2  
// Copyright (c) 2026 Steve Gerbino
2  
// Copyright (c) 2026 Steve Gerbino
3  
//
3  
//
4  
// Distributed under the Boost Software License, Version 1.0. (See accompanying
4  
// Distributed under the Boost Software License, Version 1.0. (See accompanying
5  
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
5  
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6  
//
6  
//
7  
// Official repository: https://github.com/cppalliance/corosio
7  
// Official repository: https://github.com/cppalliance/corosio
8  
//
8  
//
9  

9  

10  
#ifndef BOOST_COROSIO_NATIVE_DETAIL_EPOLL_EPOLL_SCHEDULER_HPP
10  
#ifndef BOOST_COROSIO_NATIVE_DETAIL_EPOLL_EPOLL_SCHEDULER_HPP
11  
#define BOOST_COROSIO_NATIVE_DETAIL_EPOLL_EPOLL_SCHEDULER_HPP
11  
#define BOOST_COROSIO_NATIVE_DETAIL_EPOLL_EPOLL_SCHEDULER_HPP
12  

12  

13  
#include <boost/corosio/detail/platform.hpp>
13  
#include <boost/corosio/detail/platform.hpp>
14  

14  

15  
#if BOOST_COROSIO_HAS_EPOLL
15  
#if BOOST_COROSIO_HAS_EPOLL
16  

16  

17  
#include <boost/corosio/detail/config.hpp>
17  
#include <boost/corosio/detail/config.hpp>
18  
#include <boost/capy/ex/execution_context.hpp>
18  
#include <boost/capy/ex/execution_context.hpp>
19  

19  

20  
#include <boost/corosio/native/detail/reactor/reactor_scheduler.hpp>
20  
#include <boost/corosio/native/detail/reactor/reactor_scheduler.hpp>
21  

21  

22 -
#include <boost/corosio/native/detail/epoll/epoll_op.hpp>
22 +
#include <boost/corosio/native/detail/epoll/epoll_traits.hpp>
23  
#include <boost/corosio/detail/timer_service.hpp>
23  
#include <boost/corosio/detail/timer_service.hpp>
24  
#include <boost/corosio/native/detail/make_err.hpp>
24  
#include <boost/corosio/native/detail/make_err.hpp>
25  
#include <boost/corosio/native/detail/posix/posix_resolver_service.hpp>
25  
#include <boost/corosio/native/detail/posix/posix_resolver_service.hpp>
26  
#include <boost/corosio/native/detail/posix/posix_signal_service.hpp>
26  
#include <boost/corosio/native/detail/posix/posix_signal_service.hpp>
27  
#include <boost/corosio/native/detail/posix/posix_stream_file_service.hpp>
27  
#include <boost/corosio/native/detail/posix/posix_stream_file_service.hpp>
28  
#include <boost/corosio/native/detail/posix/posix_random_access_file_service.hpp>
28  
#include <boost/corosio/native/detail/posix/posix_random_access_file_service.hpp>
29  

29  

30  
#include <boost/corosio/detail/except.hpp>
30  
#include <boost/corosio/detail/except.hpp>
31  

31  

32  
#include <atomic>
#include <chrono>
#include <cstdint>
#include <limits>
#include <mutex>
#include <vector>
37  

37  

38  
#include <errno.h>
38  
#include <errno.h>
39  
#include <sys/epoll.h>
39  
#include <sys/epoll.h>
40  
#include <sys/eventfd.h>
40  
#include <sys/eventfd.h>
41  
#include <sys/timerfd.h>
41  
#include <sys/timerfd.h>
42  
#include <unistd.h>
42  
#include <unistd.h>
43  

43  

44  
namespace boost::corosio::detail {
44  
namespace boost::corosio::detail {
45 -
struct epoll_op;
 
46 -
struct descriptor_state;
 
47 -

 
48  

45  

49  
/** Linux scheduler using epoll for I/O multiplexing.
46  
/** Linux scheduler using epoll for I/O multiplexing.
50  

47  

51  
    This scheduler implements the scheduler interface using Linux epoll
48  
    This scheduler implements the scheduler interface using Linux epoll
52  
    for efficient I/O event notification. It uses a single reactor model
49  
    for efficient I/O event notification. It uses a single reactor model
53  
    where one thread runs epoll_wait while other threads
50  
    where one thread runs epoll_wait while other threads
54  
    wait on a condition variable for handler work. This design provides:
51  
    wait on a condition variable for handler work. This design provides:
55  

52  

56  
    - Handler parallelism: N posted handlers can execute on N threads
53  
    - Handler parallelism: N posted handlers can execute on N threads
57  
    - No thundering herd: condition_variable wakes exactly one thread
54  
    - No thundering herd: condition_variable wakes exactly one thread
58  
    - IOCP parity: Behavior matches Windows I/O completion port semantics
55  
    - IOCP parity: Behavior matches Windows I/O completion port semantics
59  

56  

60  
    When threads call run(), they first try to execute queued handlers.
57  
    When threads call run(), they first try to execute queued handlers.
61  
    If the queue is empty and no reactor is running, one thread becomes
58  
    If the queue is empty and no reactor is running, one thread becomes
62  
    the reactor and runs epoll_wait. Other threads wait on a condition
59  
    the reactor and runs epoll_wait. Other threads wait on a condition
63  
    variable until handlers are available.
60  
    variable until handlers are available.
64  

61  

65  
    @par Thread Safety
62  
    @par Thread Safety
66  
    All public member functions are thread-safe.
63  
    All public member functions are thread-safe.
67  
*/
64  
*/
68  
class BOOST_COROSIO_DECL epoll_scheduler final : public reactor_scheduler
65  
class BOOST_COROSIO_DECL epoll_scheduler final : public reactor_scheduler
69  
{
66  
{
70  
public:
67  
public:
71  
    /** Construct the scheduler.
68  
    /** Construct the scheduler.
72  

69  

73  
        Creates an epoll instance, eventfd for reactor interruption,
70  
        Creates an epoll instance, eventfd for reactor interruption,
74  
        and timerfd for kernel-managed timer expiry.
71  
        and timerfd for kernel-managed timer expiry.
75  

72  

76  
        @param ctx Reference to the owning execution_context.
73  
        @param ctx Reference to the owning execution_context.
77  
        @param concurrency_hint Hint for expected thread count (unused).
74  
        @param concurrency_hint Hint for expected thread count (unused).
78  
    */
75  
    */
79  
    epoll_scheduler(capy::execution_context& ctx, int concurrency_hint = -1);
76  
    epoll_scheduler(capy::execution_context& ctx, int concurrency_hint = -1);
80  

77  

81  
    /// Destroy the scheduler.
78  
    /// Destroy the scheduler.
82  
    ~epoll_scheduler() override;
79  
    ~epoll_scheduler() override;
83  

80  

84  
    epoll_scheduler(epoll_scheduler const&)            = delete;
81  
    epoll_scheduler(epoll_scheduler const&)            = delete;
85  
    epoll_scheduler& operator=(epoll_scheduler const&) = delete;
82  
    epoll_scheduler& operator=(epoll_scheduler const&) = delete;
86  

83  

87  
    /// Shut down the scheduler, draining pending operations.
84  
    /// Shut down the scheduler, draining pending operations.
88  
    void shutdown() override;
85  
    void shutdown() override;
89  

86  

90  
    /// Apply runtime configuration, resizing the event buffer.
87  
    /// Apply runtime configuration, resizing the event buffer.
91  
    void configure_reactor(
88  
    void configure_reactor(
92  
        unsigned max_events,
89  
        unsigned max_events,
93  
        unsigned budget_init,
90  
        unsigned budget_init,
94  
        unsigned budget_max,
91  
        unsigned budget_max,
95  
        unsigned unassisted) override;
92  
        unsigned unassisted) override;
96  

93  

97  
    /** Return the epoll file descriptor.
94  
    /** Return the epoll file descriptor.
98  

95  

99  
        Used by socket services to register file descriptors
96  
        Used by socket services to register file descriptors
100  
        for I/O event notification.
97  
        for I/O event notification.
101  

98  

102  
        @return The epoll file descriptor.
99  
        @return The epoll file descriptor.
103  
    */
100  
    */
104  
    int epoll_fd() const noexcept
101  
    int epoll_fd() const noexcept
105  
    {
102  
    {
106  
        return epoll_fd_;
103  
        return epoll_fd_;
107  
    }
104  
    }
108  

105  

109  
    /** Register a descriptor for persistent monitoring.
106  
    /** Register a descriptor for persistent monitoring.
110  

107  

111  
        The fd is registered once and stays registered until explicitly
108  
        The fd is registered once and stays registered until explicitly
112 -
        deregistered. Events are dispatched via descriptor_state which
109 +
        deregistered. Events are dispatched via reactor_descriptor_state which
113  
        tracks pending read/write/connect operations.
110  
        tracks pending read/write/connect operations.
114  

111  

115  
        @param fd The file descriptor to register.
112  
        @param fd The file descriptor to register.
116  
        @param desc Pointer to descriptor data (stored in epoll_event.data.ptr).
113  
        @param desc Pointer to descriptor data (stored in epoll_event.data.ptr).
117  
    */
114  
    */
118 -
    void register_descriptor(int fd, descriptor_state* desc) const;
115 +
    void register_descriptor(int fd, reactor_descriptor_state* desc) const;
119  

116  

120  
    /** Deregister a persistently registered descriptor.
117  
    /** Deregister a persistently registered descriptor.
121  

118  

122  
        @param fd The file descriptor to deregister.
119  
        @param fd The file descriptor to deregister.
123  
    */
120  
    */
124  
    void deregister_descriptor(int fd) const;
121  
    void deregister_descriptor(int fd) const;
125  

122  

126  
private:
123  
private:
127  
    void
124  
    void
128  
    run_task(lock_type& lock, context_type* ctx,
125  
    run_task(lock_type& lock, context_type* ctx,
129  
        long timeout_us) override;
126  
        long timeout_us) override;
130  
    void interrupt_reactor() const override;
127  
    void interrupt_reactor() const override;
131  
    void update_timerfd() const;
128  
    void update_timerfd() const;
132  

129  

133  
    int epoll_fd_;
130  
    int epoll_fd_;
134  
    int event_fd_;
131  
    int event_fd_;
135  
    int timer_fd_;
132  
    int timer_fd_;
136  

133  

137  
    // Edge-triggered eventfd state
134  
    // Edge-triggered eventfd state
138  
    mutable std::atomic<bool> eventfd_armed_{false};
135  
    mutable std::atomic<bool> eventfd_armed_{false};
139  

136  

140  
    // Set when the earliest timer changes; flushed before epoll_wait
137  
    // Set when the earliest timer changes; flushed before epoll_wait
141  
    mutable std::atomic<bool> timerfd_stale_{false};
138  
    mutable std::atomic<bool> timerfd_stale_{false};
142  

139  

143  
    // Event buffer sized from max_events_per_poll_ (set at construction,
140  
    // Event buffer sized from max_events_per_poll_ (set at construction,
144  
    // resized by configure_reactor via io_context_options).
141  
    // resized by configure_reactor via io_context_options).
145  
    std::vector<epoll_event> event_buffer_;
142  
    std::vector<epoll_event> event_buffer_;
146  
};
143  
};
147  

144  

148  
inline epoll_scheduler::epoll_scheduler(capy::execution_context& ctx, int)
145  
inline epoll_scheduler::epoll_scheduler(capy::execution_context& ctx, int)
149  
    : epoll_fd_(-1)
146  
    : epoll_fd_(-1)
150  
    , event_fd_(-1)
147  
    , event_fd_(-1)
151  
    , timer_fd_(-1)
148  
    , timer_fd_(-1)
152  
    , event_buffer_(max_events_per_poll_)
149  
    , event_buffer_(max_events_per_poll_)
153  
{
150  
{
154  
    epoll_fd_ = ::epoll_create1(EPOLL_CLOEXEC);
151  
    epoll_fd_ = ::epoll_create1(EPOLL_CLOEXEC);
155  
    if (epoll_fd_ < 0)
152  
    if (epoll_fd_ < 0)
156  
        detail::throw_system_error(make_err(errno), "epoll_create1");
153  
        detail::throw_system_error(make_err(errno), "epoll_create1");
157  

154  

158  
    event_fd_ = ::eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC);
155  
    event_fd_ = ::eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC);
159  
    if (event_fd_ < 0)
156  
    if (event_fd_ < 0)
160  
    {
157  
    {
161  
        int errn = errno;
158  
        int errn = errno;
162  
        ::close(epoll_fd_);
159  
        ::close(epoll_fd_);
163  
        detail::throw_system_error(make_err(errn), "eventfd");
160  
        detail::throw_system_error(make_err(errn), "eventfd");
164  
    }
161  
    }
165  

162  

166  
    timer_fd_ = ::timerfd_create(CLOCK_MONOTONIC, TFD_NONBLOCK | TFD_CLOEXEC);
163  
    timer_fd_ = ::timerfd_create(CLOCK_MONOTONIC, TFD_NONBLOCK | TFD_CLOEXEC);
167  
    if (timer_fd_ < 0)
164  
    if (timer_fd_ < 0)
168  
    {
165  
    {
169  
        int errn = errno;
166  
        int errn = errno;
170  
        ::close(event_fd_);
167  
        ::close(event_fd_);
171  
        ::close(epoll_fd_);
168  
        ::close(epoll_fd_);
172  
        detail::throw_system_error(make_err(errn), "timerfd_create");
169  
        detail::throw_system_error(make_err(errn), "timerfd_create");
173  
    }
170  
    }
174  

171  

175  
    epoll_event ev{};
172  
    epoll_event ev{};
176  
    ev.events   = EPOLLIN | EPOLLET;
173  
    ev.events   = EPOLLIN | EPOLLET;
177  
    ev.data.ptr = nullptr;
174  
    ev.data.ptr = nullptr;
178  
    if (::epoll_ctl(epoll_fd_, EPOLL_CTL_ADD, event_fd_, &ev) < 0)
175  
    if (::epoll_ctl(epoll_fd_, EPOLL_CTL_ADD, event_fd_, &ev) < 0)
179  
    {
176  
    {
180  
        int errn = errno;
177  
        int errn = errno;
181  
        ::close(timer_fd_);
178  
        ::close(timer_fd_);
182  
        ::close(event_fd_);
179  
        ::close(event_fd_);
183  
        ::close(epoll_fd_);
180  
        ::close(epoll_fd_);
184  
        detail::throw_system_error(make_err(errn), "epoll_ctl");
181  
        detail::throw_system_error(make_err(errn), "epoll_ctl");
185  
    }
182  
    }
186  

183  

187  
    epoll_event timer_ev{};
184  
    epoll_event timer_ev{};
188  
    timer_ev.events   = EPOLLIN | EPOLLERR;
185  
    timer_ev.events   = EPOLLIN | EPOLLERR;
189  
    timer_ev.data.ptr = &timer_fd_;
186  
    timer_ev.data.ptr = &timer_fd_;
190  
    if (::epoll_ctl(epoll_fd_, EPOLL_CTL_ADD, timer_fd_, &timer_ev) < 0)
187  
    if (::epoll_ctl(epoll_fd_, EPOLL_CTL_ADD, timer_fd_, &timer_ev) < 0)
191  
    {
188  
    {
192  
        int errn = errno;
189  
        int errn = errno;
193  
        ::close(timer_fd_);
190  
        ::close(timer_fd_);
194  
        ::close(event_fd_);
191  
        ::close(event_fd_);
195  
        ::close(epoll_fd_);
192  
        ::close(epoll_fd_);
196  
        detail::throw_system_error(make_err(errn), "epoll_ctl (timerfd)");
193  
        detail::throw_system_error(make_err(errn), "epoll_ctl (timerfd)");
197  
    }
194  
    }
198  

195  

199  
    timer_svc_ = &get_timer_service(ctx, *this);
196  
    timer_svc_ = &get_timer_service(ctx, *this);
200  
    timer_svc_->set_on_earliest_changed(
197  
    timer_svc_->set_on_earliest_changed(
201  
        timer_service::callback(this, [](void* p) {
198  
        timer_service::callback(this, [](void* p) {
202  
            auto* self = static_cast<epoll_scheduler*>(p);
199  
            auto* self = static_cast<epoll_scheduler*>(p);
203  
            self->timerfd_stale_.store(true, std::memory_order_release);
200  
            self->timerfd_stale_.store(true, std::memory_order_release);
204  
            self->interrupt_reactor();
201  
            self->interrupt_reactor();
205  
        }));
202  
        }));
206  

203  

207  
    get_resolver_service(ctx, *this);
204  
    get_resolver_service(ctx, *this);
208  
    get_signal_service(ctx, *this);
205  
    get_signal_service(ctx, *this);
209  
    get_stream_file_service(ctx, *this);
206  
    get_stream_file_service(ctx, *this);
210  
    get_random_access_file_service(ctx, *this);
207  
    get_random_access_file_service(ctx, *this);
211  

208  

212  
    completed_ops_.push(&task_op_);
209  
    completed_ops_.push(&task_op_);
213  
}
210  
}
214  

211  

215  
inline epoll_scheduler::~epoll_scheduler()
{
    // Close the kernel objects in reverse order of creation. A value
    // below zero means the descriptor was never opened and is skipped.
    auto const close_if_open = [](int descriptor) {
        if (descriptor >= 0)
            ::close(descriptor);
    };

    close_if_open(timer_fd_);
    close_if_open(event_fd_);
    close_if_open(epoll_fd_);
}
224  

221  

225  
inline void
222  
inline void
226  
epoll_scheduler::shutdown()
223  
epoll_scheduler::shutdown()
227  
{
224  
{
228  
    shutdown_drain();
225  
    shutdown_drain();
229  

226  

230  
    if (event_fd_ >= 0)
227  
    if (event_fd_ >= 0)
231  
        interrupt_reactor();
228  
        interrupt_reactor();
232  
}
229  
}
233  

230  

234  
inline void
231  
inline void
235  
epoll_scheduler::configure_reactor(
232  
epoll_scheduler::configure_reactor(
236  
    unsigned max_events,
233  
    unsigned max_events,
237  
    unsigned budget_init,
234  
    unsigned budget_init,
238  
    unsigned budget_max,
235  
    unsigned budget_max,
239  
    unsigned unassisted)
236  
    unsigned unassisted)
240  
{
237  
{
241  
    reactor_scheduler::configure_reactor(
238  
    reactor_scheduler::configure_reactor(
242  
        max_events, budget_init, budget_max, unassisted);
239  
        max_events, budget_init, budget_max, unassisted);
243  
    event_buffer_.resize(max_events_per_poll_);
240  
    event_buffer_.resize(max_events_per_poll_);
244  
}
241  
}
245  

242  

246  
inline void
243  
inline void
247 -
epoll_scheduler::register_descriptor(int fd, descriptor_state* desc) const
244 +
epoll_scheduler::register_descriptor(int fd, reactor_descriptor_state* desc) const
248  
{
245  
{
249  
    epoll_event ev{};
246  
    epoll_event ev{};
250  
    ev.events   = EPOLLIN | EPOLLOUT | EPOLLET | EPOLLERR | EPOLLHUP;
247  
    ev.events   = EPOLLIN | EPOLLOUT | EPOLLET | EPOLLERR | EPOLLHUP;
251  
    ev.data.ptr = desc;
248  
    ev.data.ptr = desc;
252  

249  

253  
    if (::epoll_ctl(epoll_fd_, EPOLL_CTL_ADD, fd, &ev) < 0)
250  
    if (::epoll_ctl(epoll_fd_, EPOLL_CTL_ADD, fd, &ev) < 0)
254  
        detail::throw_system_error(make_err(errno), "epoll_ctl (register)");
251  
        detail::throw_system_error(make_err(errno), "epoll_ctl (register)");
255  

252  

256  
    desc->registered_events = ev.events;
253  
    desc->registered_events = ev.events;
257  
    desc->fd                = fd;
254  
    desc->fd                = fd;
258  
    desc->scheduler_        = this;
255  
    desc->scheduler_        = this;
259  
    desc->mutex.set_enabled(!single_threaded_);
256  
    desc->mutex.set_enabled(!single_threaded_);
260  
    desc->ready_events_.store(0, std::memory_order_relaxed);
257  
    desc->ready_events_.store(0, std::memory_order_relaxed);
261  

258  

262  
    conditionally_enabled_mutex::scoped_lock lock(desc->mutex);
259  
    conditionally_enabled_mutex::scoped_lock lock(desc->mutex);
263  
    desc->impl_ref_.reset();
260  
    desc->impl_ref_.reset();
264  
    desc->read_ready  = false;
261  
    desc->read_ready  = false;
265  
    desc->write_ready = false;
262  
    desc->write_ready = false;
266  
}
263  
}
267  

264  

268  
inline void
265  
inline void
269  
epoll_scheduler::deregister_descriptor(int fd) const
266  
epoll_scheduler::deregister_descriptor(int fd) const
270  
{
267  
{
271  
    ::epoll_ctl(epoll_fd_, EPOLL_CTL_DEL, fd, nullptr);
268  
    ::epoll_ctl(epoll_fd_, EPOLL_CTL_DEL, fd, nullptr);
272  
}
269  
}
273  

270  

274  
inline void
271  
inline void
275  
epoll_scheduler::interrupt_reactor() const
272  
epoll_scheduler::interrupt_reactor() const
276  
{
273  
{
277  
    bool expected = false;
274  
    bool expected = false;
278  
    if (eventfd_armed_.compare_exchange_strong(
275  
    if (eventfd_armed_.compare_exchange_strong(
279  
            expected, true, std::memory_order_release,
276  
            expected, true, std::memory_order_release,
280  
            std::memory_order_relaxed))
277  
            std::memory_order_relaxed))
281  
    {
278  
    {
282  
        std::uint64_t val       = 1;
279  
        std::uint64_t val       = 1;
283  
        [[maybe_unused]] auto r = ::write(event_fd_, &val, sizeof(val));
280  
        [[maybe_unused]] auto r = ::write(event_fd_, &val, sizeof(val));
284  
    }
281  
    }
285  
}
282  
}
286  

283  

287  
inline void
284  
inline void
288  
epoll_scheduler::update_timerfd() const
285  
epoll_scheduler::update_timerfd() const
289  
{
286  
{
290  
    auto nearest = timer_svc_->nearest_expiry();
287  
    auto nearest = timer_svc_->nearest_expiry();
291  

288  

292  
    itimerspec ts{};
289  
    itimerspec ts{};
293  
    int flags = 0;
290  
    int flags = 0;
294  

291  

295  
    if (nearest == timer_service::time_point::max())
292  
    if (nearest == timer_service::time_point::max())
296  
    {
293  
    {
297  
        // No timers — disarm by setting to 0 (relative)
294  
        // No timers — disarm by setting to 0 (relative)
298  
    }
295  
    }
299  
    else
296  
    else
300  
    {
297  
    {
301  
        auto now = std::chrono::steady_clock::now();
298  
        auto now = std::chrono::steady_clock::now();
302  
        if (nearest <= now)
299  
        if (nearest <= now)
303  
        {
300  
        {
304  
            // Use 1ns instead of 0 — zero disarms the timerfd
301  
            // Use 1ns instead of 0 — zero disarms the timerfd
305  
            ts.it_value.tv_nsec = 1;
302  
            ts.it_value.tv_nsec = 1;
306  
        }
303  
        }
307  
        else
304  
        else
308  
        {
305  
        {
309  
            auto nsec = std::chrono::duration_cast<std::chrono::nanoseconds>(
306  
            auto nsec = std::chrono::duration_cast<std::chrono::nanoseconds>(
310  
                            nearest - now)
307  
                            nearest - now)
311  
                            .count();
308  
                            .count();
312  
            ts.it_value.tv_sec  = nsec / 1000000000;
309  
            ts.it_value.tv_sec  = nsec / 1000000000;
313  
            ts.it_value.tv_nsec = nsec % 1000000000;
310  
            ts.it_value.tv_nsec = nsec % 1000000000;
314  
            if (ts.it_value.tv_sec == 0 && ts.it_value.tv_nsec == 0)
311  
            if (ts.it_value.tv_sec == 0 && ts.it_value.tv_nsec == 0)
315  
                ts.it_value.tv_nsec = 1;
312  
                ts.it_value.tv_nsec = 1;
316  
        }
313  
        }
317  
    }
314  
    }
318  

315  

319  
    if (::timerfd_settime(timer_fd_, flags, &ts, nullptr) < 0)
316  
    if (::timerfd_settime(timer_fd_, flags, &ts, nullptr) < 0)
320  
        detail::throw_system_error(make_err(errno), "timerfd_settime");
317  
        detail::throw_system_error(make_err(errno), "timerfd_settime");
321  
}
318  
}
322  

319  

323  
inline void
320  
inline void
324  
epoll_scheduler::run_task(
321  
epoll_scheduler::run_task(
325  
    lock_type& lock, context_type* ctx, long timeout_us)
322  
    lock_type& lock, context_type* ctx, long timeout_us)
326  
{
323  
{
327  
    int timeout_ms;
324  
    int timeout_ms;
328  
    if (task_interrupted_)
325  
    if (task_interrupted_)
329  
        timeout_ms = 0;
326  
        timeout_ms = 0;
330  
    else if (timeout_us < 0)
327  
    else if (timeout_us < 0)
331  
        timeout_ms = -1;
328  
        timeout_ms = -1;
332  
    else
329  
    else
333  
        timeout_ms = static_cast<int>((timeout_us + 999) / 1000);
330  
        timeout_ms = static_cast<int>((timeout_us + 999) / 1000);
334  

331  

335  
    if (lock.owns_lock())
332  
    if (lock.owns_lock())
336  
        lock.unlock();
333  
        lock.unlock();
337  

334  

338  
    task_cleanup on_exit{this, &lock, ctx};
335  
    task_cleanup on_exit{this, &lock, ctx};
339  

336  

340  
    // Flush deferred timerfd programming before blocking
337  
    // Flush deferred timerfd programming before blocking
341  
    if (timerfd_stale_.exchange(false, std::memory_order_acquire))
338  
    if (timerfd_stale_.exchange(false, std::memory_order_acquire))
342  
        update_timerfd();
339  
        update_timerfd();
343  

340  

344  
    int nfds = ::epoll_wait(
341  
    int nfds = ::epoll_wait(
345  
        epoll_fd_, event_buffer_.data(),
342  
        epoll_fd_, event_buffer_.data(),
346  
        static_cast<int>(event_buffer_.size()), timeout_ms);
343  
        static_cast<int>(event_buffer_.size()), timeout_ms);
347  

344  

348  
    if (nfds < 0 && errno != EINTR)
345  
    if (nfds < 0 && errno != EINTR)
349  
        detail::throw_system_error(make_err(errno), "epoll_wait");
346  
        detail::throw_system_error(make_err(errno), "epoll_wait");
350  

347  

351  
    bool check_timers = false;
348  
    bool check_timers = false;
352  
    op_queue local_ops;
349  
    op_queue local_ops;
353  

350  

354  
    for (int i = 0; i < nfds; ++i)
351  
    for (int i = 0; i < nfds; ++i)
355  
    {
352  
    {
356  
        if (event_buffer_[i].data.ptr == nullptr)
353  
        if (event_buffer_[i].data.ptr == nullptr)
357  
        {
354  
        {
358  
            std::uint64_t val;
355  
            std::uint64_t val;
359  
            // NOLINTNEXTLINE(clang-analyzer-unix.BlockInCriticalSection)
356  
            // NOLINTNEXTLINE(clang-analyzer-unix.BlockInCriticalSection)
360  
            [[maybe_unused]] auto r = ::read(event_fd_, &val, sizeof(val));
357  
            [[maybe_unused]] auto r = ::read(event_fd_, &val, sizeof(val));
361  
            eventfd_armed_.store(false, std::memory_order_relaxed);
358  
            eventfd_armed_.store(false, std::memory_order_relaxed);
362  
            continue;
359  
            continue;
363  
        }
360  
        }
364  

361  

365  
        if (event_buffer_[i].data.ptr == &timer_fd_)
362  
        if (event_buffer_[i].data.ptr == &timer_fd_)
366  
        {
363  
        {
367  
            std::uint64_t expirations;
364  
            std::uint64_t expirations;
368  
            // NOLINTNEXTLINE(clang-analyzer-unix.BlockInCriticalSection)
365  
            // NOLINTNEXTLINE(clang-analyzer-unix.BlockInCriticalSection)
369  
            [[maybe_unused]] auto r =
366  
            [[maybe_unused]] auto r =
370  
                ::read(timer_fd_, &expirations, sizeof(expirations));
367  
                ::read(timer_fd_, &expirations, sizeof(expirations));
371  
            check_timers = true;
368  
            check_timers = true;
372  
            continue;
369  
            continue;
373  
        }
370  
        }
374  

371  

375  
        auto* desc =
372  
        auto* desc =
376 -
            static_cast<descriptor_state*>(event_buffer_[i].data.ptr);
373 +
            static_cast<reactor_descriptor_state*>(event_buffer_[i].data.ptr);
377  
        desc->add_ready_events(event_buffer_[i].events);
374  
        desc->add_ready_events(event_buffer_[i].events);
378  

375  

379  
        bool expected = false;
376  
        bool expected = false;
380  
        if (desc->is_enqueued_.compare_exchange_strong(
377  
        if (desc->is_enqueued_.compare_exchange_strong(
381  
                expected, true, std::memory_order_release,
378  
                expected, true, std::memory_order_release,
382  
                std::memory_order_relaxed))
379  
                std::memory_order_relaxed))
383  
        {
380  
        {
384  
            local_ops.push(desc);
381  
            local_ops.push(desc);
385  
        }
382  
        }
386  
    }
383  
    }
387  

384  

388  
    if (check_timers)
385  
    if (check_timers)
389  
    {
386  
    {
390  
        timer_svc_->process_expired();
387  
        timer_svc_->process_expired();
391  
        update_timerfd();
388  
        update_timerfd();
392  
    }
389  
    }
393  

390  

394  
    lock.lock();
391  
    lock.lock();
395  

392  

396  
    if (!local_ops.empty())
393  
    if (!local_ops.empty())
397  
        completed_ops_.splice(local_ops);
394  
        completed_ops_.splice(local_ops);
398  
}
395  
}
399  

396  

400  
} // namespace boost::corosio::detail
397  
} // namespace boost::corosio::detail
401  

398  

402  
#endif // BOOST_COROSIO_HAS_EPOLL
399  
#endif // BOOST_COROSIO_HAS_EPOLL
403  

400  

404  
#endif // BOOST_COROSIO_NATIVE_DETAIL_EPOLL_EPOLL_SCHEDULER_HPP
401  
#endif // BOOST_COROSIO_NATIVE_DETAIL_EPOLL_EPOLL_SCHEDULER_HPP