17 explicit EventLoop(std::unique_ptr<Thread> thd) : thd_(std::move(thd)) {
18 worker_running_ =
true;
20 thd_->StartFunc([
this]() { this->worker_(); });
24 if (thd_->IsRunning()) {
32 void PushCyclicTask(
const std::function<
void()>& cb, std::chrono::nanoseconds period,
33 std::chrono::nanoseconds offset = std::chrono::nanoseconds{0}) {
34 long period_ns = period.count();
35 long offset_ns = offset.count();
38 const auto& get_next_wakeup_time = [period_ns](
struct timespec ts) {
39 ts.tv_nsec += period_ns;
40 while (ts.tv_nsec >= kNanosecondsInSecond) {
42 ts.tv_nsec -= kNanosecondsInSecond;
44 while (ts.tv_nsec < 0) {
46 ts.tv_nsec += kNanosecondsInSecond;
51 const auto& cyclic_task = [=](
struct timespec wakeup_time, const auto& self) ->
void {
52 struct timespec current_ts{};
54 clock_gettime(CLOCK_MONOTONIC, ¤t_ts);
57 if (current_ts.tv_sec * kNanosecondsInSecond + current_ts.tv_nsec <
58 wakeup_time.tv_sec * kNanosecondsInSecond + wakeup_time.tv_nsec) {
61 wakeup_time = get_next_wakeup_time(wakeup_time);
65 int rv = clock_nanosleep(CLOCK_MONOTONIC, TIMER_ABSTIME, &wakeup_time,
nullptr);
75 PushTask([=] { self(wakeup_time, self); });
80 clock_gettime(CLOCK_MONOTONIC, &ts);
81 uint64_t c = (ts.tv_sec * kNanosecondsInSecond) + ts.tv_nsec;
82 c = (uint64_t)(c / period_ns + 1) * period_ns + offset_ns;
83 ts.tv_sec = (long)(c / kNanosecondsInSecond);
84 ts.tv_nsec = (long)(c % kNanosecondsInSecond);
85 PushTask([=] { cyclic_task(ts, cyclic_task); });
87 const auto& cyclic_task = [=](std::chrono::steady_clock::time_point next_wakeup_time,
const auto& self) ->
void {
88 auto current_ts = std::chrono::steady_clock::now();
90 if (current_ts < next_wakeup_time) {
93 next_wakeup_time = next_wakeup_time + period;
95 std::this_thread::sleep_until(next_wakeup_time);
100 PushTask([=] { self(next_wakeup_time, self); });
104 auto ts = std::chrono::steady_clock::now();
105 c = std::chrono::duration_cast<std::chrono::nanoseconds>(ts.time_since_epoch()).count();
106 c = (uint64_t)(c / period_ns + 1) * period_ns + offset_ns;
108 std::chrono::time_point<std::chrono::steady_clock, std::chrono::nanoseconds> ts((std::chrono::nanoseconds(c)));
109 PushTask([=] { cyclic_task(ts, cyclic_task); });
113 template <
typename F,
typename... A>
114 void PushLoopTask(F&& task, A&&... args) {
115 const auto& loop = [task_function = std::bind(std::forward<F>(task), std::forward<A>(args)...),
116 this](
const auto& self) ->
void {
117 std::invoke(task_function);
118 this->PushTask(self, self);
120 PushTask(loop, loop);
125 template <
typename F,
typename... A>
126 void PushTask(F&& task, A&&... args) {
128 const std::scoped_lock tasks_lock(tasks_mutex_);
129 tasks_.push(std::bind(std::forward<F>(task), std::forward<A>(args)...));
131 task_available_cv_.notify_one();
134 template <
typename F,
typename... A,
typename R = std::invoke_result_t<std::decay_t<F>, std::decay_t<A>...>>
135 [[nodiscard]] std::future<R> Submit(F&& task, A&&... args) {
136 std::shared_ptr<std::promise<R>> task_promise = std::make_shared<std::promise<R>>();
137 PushTask([task_function = std::bind(std::forward<F>(task), std::forward<A>(args)...), task_promise] {
139 if constexpr (std::is_void_v<R>) {
140 std::invoke(task_function);
141 task_promise->set_value();
143 task_promise->set_value(std::invoke(task_function));
147 task_promise->set_exception(std::current_exception());
151 return task_promise->get_future();
154 template <
typename F,
typename... A,
typename R = std::invoke_result_t<std::decay_t<F>, std::decay_t<A>...>>
155 [[nodiscard]] R DoTask(F&& task, A&&... args) {
156 return Submit(std::forward<F>(task), std::forward<A>(args)...).get();
161 const std::scoped_lock tasks_lock(tasks_mutex_);
162 worker_running_ =
false;
164 task_available_cv_.notify_all();
169 const std::scoped_lock tasks_lock(tasks_mutex_);
174 const std::scoped_lock tasks_lock(tasks_mutex_);
179 const std::scoped_lock tasks_lock(tasks_mutex_);
180 while (!tasks_.empty()) {
185 void WaitForTasks()
noexcept {
186 std::unique_lock tasks_lock(tasks_mutex_);
188 tasks_done_cv_.wait(tasks_lock,
189 [
this] {
return !worker_running_ || (!tasks_running_ && (paused_ || tasks_.empty())); });
195 std::function<void()> task;
197 std::unique_lock tasks_lock(tasks_mutex_);
198 task_available_cv_.wait(tasks_lock, [
this] {
return !tasks_.empty() || !worker_running_; });
199 if (!worker_running_) {
205 task = std::move(tasks_.front());
212 if (waiting_ && !tasks_running_ && (paused_ || tasks_.empty())) {
213 tasks_done_cv_.notify_all();
219 std::unique_ptr<Thread> thd_;
221 bool paused_ =
false;
223 std::condition_variable task_available_cv_ = {};
225 std::condition_variable tasks_done_cv_ = {};
227 std::queue<std::function<void()>> tasks_ = {};
229 size_t tasks_running_ = 0;
231 mutable std::mutex tasks_mutex_ = {};
233 bool waiting_ =
false;
235 bool worker_running_ =
false;