17 void (*
cb)(std::unique_ptr<ThreadMsg>);
18 std::atomic<ThreadMsg*>
next =
nullptr;
20 ThreadMsg(
void (*_cb)(std::unique_ptr<ThreadMsg>)) :
cb(_cb) {}
25 template <
typename Payload>
30 template <
typename... Args>
33 data(
std::forward<Args>(args)...)
38 cb =
reinterpret_cast<void (*)(std::unique_ptr<ThreadMsg>)
>(_cb);
48 std::atomic<ThreadMsg*> item_head =
nullptr;
56 if (local_msg ==
nullptr && item_head !=
nullptr)
58 local_msg = item_head.exchange(
nullptr);
59 reverse_local_messages();
62 if (local_msg ==
nullptr)
68 local_msg = local_msg->
next;
70 current->
cb(std::unique_ptr<ThreadMsg>(current));
79 tmp_head = item_head.load();
80 item->
next = tmp_head;
81 }
while (!item_head.compare_exchange_strong(tmp_head, item));
87 TimerEntry(std::chrono::milliseconds time_offset_, uint64_t counter_) :
110 std::unique_ptr<ThreadMsg> item, std::chrono::milliseconds ms)
112 TimerEntry entry = {time_offset + ms, time_entry_counter++};
113 if (timer_map.empty() || entry.
time_offset <= next_time_offset)
118 timer_map.emplace(entry, std::move(item));
124 auto num_erased = timer_map.erase(timer_entry);
125 CCF_ASSERT(num_erased <= 1,
"Too many items erased");
126 if (!timer_map.empty() && timer_entry.
time_offset <= next_time_offset)
128 next_time_offset = timer_map.begin()->first.time_offset;
130 return num_erased != 0;
133 void tick(std::chrono::milliseconds elapsed)
135 time_offset += elapsed;
137 bool updated =
false;
139 while (!timer_map.empty() && next_time_offset <= time_offset &&
140 timer_map.begin()->first.time_offset <= time_offset)
143 auto it = timer_map.begin();
145 auto& cb = it->second->cb;
146 auto msg = std::move(it->second);
151 if (updated && !timer_map.empty())
153 next_time_offset = timer_map.begin()->first.time_offset;
163 std::chrono::milliseconds time_offset = std::chrono::milliseconds(0);
164 uint64_t time_entry_counter = 0;
165 std::map<TimerEntry, std::unique_ptr<ThreadMsg>, TimerEntryCompare>
167 std::chrono::milliseconds next_time_offset;
169 void reverse_local_messages()
171 if (local_msg ==
nullptr)
174 ThreadMsg *prev =
nullptr, *current =
nullptr, *next =
nullptr;
176 while (current !=
nullptr)
178 next = current->
next;
179 current->next = prev;
191 if (local_msg ==
nullptr && item_head !=
nullptr)
193 local_msg = item_head.exchange(
nullptr);
194 reverse_local_messages();
197 if (local_msg ==
nullptr)
202 ThreadMsg* current = local_msg;
203 local_msg = local_msg->
next;
208 friend ThreadMessaging;
213 std::atomic<bool> finished;
214 std::vector<TaskQueue> tasks;
221 for (
auto& t : tasks)
227 inline TaskQueue& get_tasks(uint16_t task_id)
229 if (task_id >= tasks.size())
231 throw std::runtime_error(fmt::format(
232 "Attempting to access task_id >= task_count, task_id:{}, "
237 return tasks[task_id];
240 static std::unique_ptr<ThreadMessaging>& get_singleton()
242 static std::unique_ptr<ThreadMessaging> singleton =
nullptr;
251 tasks(num_task_queues)
255 throw std::logic_error(fmt::format(
256 "ThreadMessaging constructed with too many tasks: {} > {}",
267 static void init(uint16_t num_task_queues)
269 auto& singleton = get_singleton();
270 if (singleton !=
nullptr)
272 throw std::logic_error(
"Called init() multiple times");
275 singleton = std::make_unique<ThreadMessaging>(num_task_queues);
280 get_singleton().reset();
285 auto& singleton = get_singleton();
286 if (singleton ==
nullptr)
288 throw std::logic_error(
289 "Attempted to access global ThreadMessaging instance without first "
305 while (!is_finished())
317 template <
typename Payload>
325 template <
typename Payload>
327 std::unique_ptr<
Tmsg<Payload>> msg, std::chrono::milliseconds ms)
358 msg->data.task.tick(msg->data.elapsed);
361 void tick(std::chrono::milliseconds elapsed)
363 for (
auto i = 0ul; i < tasks.size(); ++i)
365 auto& task = get_tasks(i);
366 auto msg = std::make_unique<Tmsg<TickMsg>>(&
tick_cb, elapsed, task);
367 task.add_task(msg.release());
373 uint16_t tid = ccf::threading::MAIN_THREAD_ID;
374 if (tasks.size() > 1)
379 tid = (i % (tasks.size() - 1));
394 return finished.load();
#define CCF_ASSERT(expr, msg)
Definition ccf_assert.h:14
Definition thread_messaging.h:47
void tick(std::chrono::milliseconds elapsed)
Definition thread_messaging.h:133
bool run_next_task()
Definition thread_messaging.h:54
void add_task(ThreadMsg *item)
Definition thread_messaging.h:74
std::chrono::milliseconds get_current_time_offset()
Definition thread_messaging.h:157
TimerEntry add_task_after(std::unique_ptr< ThreadMsg > item, std::chrono::milliseconds ms)
Definition thread_messaging.h:109
bool cancel_timer_task(TimerEntry timer_entry)
Definition thread_messaging.h:122
Definition thread_messaging.h:212
ThreadMessaging(uint16_t num_task_queues)
Definition thread_messaging.h:249
void tick(std::chrono::milliseconds elapsed)
Definition thread_messaging.h:361
void run()
Definition thread_messaging.h:301
std::chrono::milliseconds get_current_time_offset()
Definition thread_messaging.h:339
static void shutdown()
Definition thread_messaging.h:278
uint16_t thread_count() const
Definition thread_messaging.h:386
static ThreadMessaging & instance()
Definition thread_messaging.h:283
bool cancel_timer_task(TaskQueue::TimerEntry timer_entry)
Definition thread_messaging.h:333
bool run_one()
Definition thread_messaging.h:311
TaskQueue::TimerEntry add_task_after(std::unique_ptr< Tmsg< Payload > > msg, std::chrono::milliseconds ms)
Definition thread_messaging.h:326
void add_task(uint16_t tid, std::unique_ptr< Tmsg< Payload > > msg)
Definition thread_messaging.h:318
static void init(uint16_t num_task_queues)
Definition thread_messaging.h:267
void set_finished(bool v=true)
Definition thread_messaging.h:296
~ThreadMessaging()
Definition thread_messaging.h:262
static void tick_cb(std::unique_ptr< Tmsg< TickMsg > > msg)
Definition thread_messaging.h:356
uint16_t get_execution_thread(uint32_t i)
Definition thread_messaging.h:371
static constexpr uint16_t max_num_threads
Definition thread_messaging.h:247
uint16_t get_current_thread_id()
Definition thread_local.cpp:15
Definition thread_messaging.h:14
Definition thread_messaging.h:97
bool operator()(const TimerEntry &lhs, const TimerEntry &rhs) const
Definition thread_messaging.h:98
Definition thread_messaging.h:85
TimerEntry(std::chrono::milliseconds time_offset_, uint64_t counter_)
Definition thread_messaging.h:87
std::chrono::milliseconds time_offset
Definition thread_messaging.h:92
TimerEntry()
Definition thread_messaging.h:86
uint64_t counter
Definition thread_messaging.h:93
Definition thread_messaging.h:346
std::chrono::milliseconds elapsed
Definition thread_messaging.h:352
TaskQueue & task
Definition thread_messaging.h:353
TickMsg(std::chrono::milliseconds elapsed_, TaskQueue &task_)
Definition thread_messaging.h:347
Definition thread_messaging.h:16
void(* cb)(std::unique_ptr< ThreadMsg >)
Definition thread_messaging.h:17
std::atomic< ThreadMsg * > next
Definition thread_messaging.h:18
virtual ~ThreadMsg()=default
ThreadMsg(void(*_cb)(std::unique_ptr< ThreadMsg >))
Definition thread_messaging.h:20
Definition thread_messaging.h:27
Tmsg(void(*_cb)(std::unique_ptr< Tmsg< Payload > >), Args &&... args)
Definition thread_messaging.h:31
void reset_cb(void(*_cb)(std::unique_ptr< Tmsg< Payload > >))
Definition thread_messaging.h:36
Payload data
Definition thread_messaging.h:28