rby1-sdk
Loading...
Searching...
No Matches
event_loop.h
1#pragma once
2
3#include <condition_variable>
4#include <functional>
5#include <future>
6#include <queue>
7
8#include "rby1-sdk/base/thread.h"
9#include "time_util.h"
10
11namespace rb {
12
13class RBY1_SDK_API EventLoop {
14 public:
15 EventLoop() : EventLoop(std::make_unique<Thread>()) {}
16
17 explicit EventLoop(std::unique_ptr<Thread> thd) : thd_(std::move(thd)) {
18 worker_running_ = true;
19
20 thd_->StartFunc([this]() { this->worker_(); });
21 }
22
23 ~EventLoop() noexcept {
24 if (thd_->IsRunning()) {
25 Pause();
26 WaitForTasks();
27 PurgeTasks();
28 Stop();
29 }
30 }
31
32 void PushCyclicTask(const std::function<void()>& cb, std::chrono::nanoseconds period,
33 std::chrono::nanoseconds offset = std::chrono::nanoseconds{0}) {
34 long period_ns = period.count();
35 long offset_ns = offset.count();
36
37#ifdef __linux__
38 const auto& get_next_wakeup_time = [period_ns](struct timespec ts) {
39 ts.tv_nsec += period_ns;
40 while (ts.tv_nsec >= kNanosecondsInSecond) {
41 ++ts.tv_sec;
42 ts.tv_nsec -= kNanosecondsInSecond;
43 }
44 while (ts.tv_nsec < 0) {
45 --ts.tv_sec;
46 ts.tv_nsec += kNanosecondsInSecond;
47 }
48 return ts;
49 };
50
51 const auto& cyclic_task = [=](struct timespec wakeup_time, const auto& self) -> void {
52 struct timespec current_ts{};
53
54 clock_gettime(CLOCK_MONOTONIC, &current_ts);
55
56 while (true) {
57 if (current_ts.tv_sec * kNanosecondsInSecond + current_ts.tv_nsec <
58 wakeup_time.tv_sec * kNanosecondsInSecond + wakeup_time.tv_nsec) {
59 break;
60 }
61 wakeup_time = get_next_wakeup_time(wakeup_time);
62 }
63
64 while (true) {
65 int rv = clock_nanosleep(CLOCK_MONOTONIC, TIMER_ABSTIME, &wakeup_time, nullptr);
66 if (rv == EINTR) {
67 continue;
68 }
69 break;
70 }
71 if (cb) {
72 cb();
73 }
74
75 PushTask([=] { self(wakeup_time, self); });
76 };
77
78 struct timespec ts{};
79
80 clock_gettime(CLOCK_MONOTONIC, &ts);
81 uint64_t c = (ts.tv_sec * kNanosecondsInSecond) + ts.tv_nsec;
82 c = (uint64_t)(c / period_ns + 1) * period_ns + offset_ns;
83 ts.tv_sec = (long)(c / kNanosecondsInSecond);
84 ts.tv_nsec = (long)(c % kNanosecondsInSecond);
85 PushTask([=] { cyclic_task(ts, cyclic_task); });
86#else
87 const auto& cyclic_task = [=](std::chrono::steady_clock::time_point next_wakeup_time, const auto& self) -> void {
88 auto current_ts = std::chrono::steady_clock::now();
89 while (true) {
90 if (current_ts < next_wakeup_time) {
91 break;
92 }
93 next_wakeup_time = next_wakeup_time + period;
94 }
95 std::this_thread::sleep_until(next_wakeup_time);
96 if (cb) {
97 cb();
98 }
99
100 PushTask([=] { self(next_wakeup_time, self); });
101 };
102 uint64_t c = 0;
103 {
104 auto ts = std::chrono::steady_clock::now();
105 c = std::chrono::duration_cast<std::chrono::nanoseconds>(ts.time_since_epoch()).count();
106 c = (uint64_t)(c / period_ns + 1) * period_ns + offset_ns;
107 }
108 std::chrono::time_point<std::chrono::steady_clock, std::chrono::nanoseconds> ts((std::chrono::nanoseconds(c)));
109 PushTask([=] { cyclic_task(ts, cyclic_task); });
110#endif
111 }
112
113 template <typename F, typename... A>
114 void PushLoopTask(F&& task, A&&... args) {
115 const auto& loop = [task_function = std::bind(std::forward<F>(task), std::forward<A>(args)...),
116 this](const auto& self) -> void {
117 std::invoke(task_function);
118 this->PushTask(self, self);
119 };
120 PushTask(loop, loop);
121 }
122
123 // EventLoop(const EventLoop&) = delete;
124
125 template <typename F, typename... A>
126 void PushTask(F&& task, A&&... args) {
127 {
128 const std::scoped_lock tasks_lock(tasks_mutex_);
129 tasks_.push(std::bind(std::forward<F>(task), std::forward<A>(args)...));
130 }
131 task_available_cv_.notify_one();
132 }
133
134 template <typename F, typename... A, typename R = std::invoke_result_t<std::decay_t<F>, std::decay_t<A>...>>
135 [[nodiscard]] std::future<R> Submit(F&& task, A&&... args) {
136 std::shared_ptr<std::promise<R>> task_promise = std::make_shared<std::promise<R>>();
137 PushTask([task_function = std::bind(std::forward<F>(task), std::forward<A>(args)...), task_promise] {
138 try {
139 if constexpr (std::is_void_v<R>) {
140 std::invoke(task_function);
141 task_promise->set_value();
142 } else {
143 task_promise->set_value(std::invoke(task_function));
144 }
145 } catch (...) {
146 try {
147 task_promise->set_exception(std::current_exception());
148 } catch (...) {}
149 }
150 });
151 return task_promise->get_future();
152 }
153
154 template <typename F, typename... A, typename R = std::invoke_result_t<std::decay_t<F>, std::decay_t<A>...>>
155 [[nodiscard]] R DoTask(F&& task, A&&... args) {
156 return Submit(std::forward<F>(task), std::forward<A>(args)...).get();
157 }
158
159 void Stop() {
160 {
161 const std::scoped_lock tasks_lock(tasks_mutex_);
162 worker_running_ = false;
163 }
164 task_available_cv_.notify_all();
165 thd_->Join();
166 }
167
168 void Pause() {
169 const std::scoped_lock tasks_lock(tasks_mutex_);
170 paused_ = true;
171 }
172
173 void Unpause() {
174 const std::scoped_lock tasks_lock(tasks_mutex_);
175 paused_ = false;
176 }
177
178 void PurgeTasks() {
179 const std::scoped_lock tasks_lock(tasks_mutex_);
180 while (!tasks_.empty()) {
181 tasks_.pop();
182 }
183 }
184
185 void WaitForTasks() noexcept {
186 std::unique_lock tasks_lock(tasks_mutex_);
187 waiting_ = true;
188 tasks_done_cv_.wait(tasks_lock,
189 [this] { return !worker_running_ || (!tasks_running_ && (paused_ || tasks_.empty())); });
190 waiting_ = false;
191 }
192
193 private:
194 void worker_() {
195 std::function<void()> task;
196 while (true) {
197 std::unique_lock tasks_lock(tasks_mutex_);
198 task_available_cv_.wait(tasks_lock, [this] { return !tasks_.empty() || !worker_running_; });
199 if (!worker_running_) {
200 break;
201 }
202 if (paused_) {
203 continue;
204 }
205 task = std::move(tasks_.front());
206 tasks_.pop();
207 ++tasks_running_;
208 tasks_lock.unlock();
209 task();
210 tasks_lock.lock();
211 --tasks_running_;
212 if (waiting_ && !tasks_running_ && (paused_ || tasks_.empty())) {
213 tasks_done_cv_.notify_all();
214 }
215 }
216 }
217
218 private:
219 std::unique_ptr<Thread> thd_;
220
221 bool paused_ = false;
222
223 std::condition_variable task_available_cv_ = {};
224
225 std::condition_variable tasks_done_cv_ = {};
226
227 std::queue<std::function<void()>> tasks_ = {};
228
229 size_t tasks_running_ = 0;
230
231 mutable std::mutex tasks_mutex_ = {};
232
233 bool waiting_ = false;
234
235 bool worker_running_ = false;
236};
237
238} // namespace rb
Definition event_loop.h:13