9#include <condition_variable>
// Callback type for a message handler: receives a pointer to read-only bytes
// and a byte count, and returns nothing.
15 using Handler = std::function<void(
const uint8_t*,
size_t)>;
19 using logic_error::logic_error;
24 using logic_error::logic_error;
33 template <
typename MessageType>
36 template <
typename MessageType>
// Name of this dispatcher; used to build the "[name] " error-message prefix.
44 char const*
const name;
// Handler registered for each message type.
46 std::map<MessageType, Handler> handlers;
// Label recorded for a message type; only populated when a non-null label
// was supplied at registration.
47 std::map<MessageType, char const*> message_labels;
// Returns the dispatcher name wrapped as "[name] ", for prepending to error
// messages.
51 std::string get_error_prefix()
53 return std::string(
"[") + std::string(name) + std::string(
"] ");
// Looks up the label stored for message type m in message_labels.
// NOTE(review): the value returned when no label is stored is not visible in
// this excerpt - confirm before relying on it.
56 char const* get_message_name(MessageType m)
58 const auto it = message_labels.find(m);
59 if (it == message_labels.end())
// Formats a message type together with a label as "<label:type>".
// s is the label text, m is the message type value.
67 static std::string decorate_message_name(MessageType m,
char const* s)
69 return fmt::format(
"<{}:{}>", s, m);
// Convenience wrapper: decorates m with whatever get_message_name(m) returns.
72 std::string get_decorated_message_name(MessageType m)
74 return decorate_message_name(m, get_message_name(m));
// Registers handler h for message type m, optionally recording a label.
//   m             - message type to handle
//   message_label - label for m; may be nullptr, in which case no label is
//                   stored
//   h             - callback stored for m
93 MessageType m,
char const* message_label,
Handler h)
// Refuse to replace an existing handler: the message below names both the
// existing registration and the rejected one.
// NOTE(review): the statement that raises this error is not visible in this
// excerpt - presumably a throw; confirm.
96 auto it = handlers.find(m);
97 if (it != handlers.end())
100 get_error_prefix() +
"MessageType " + std::to_string(m) +
101 " already handled by " + get_decorated_message_name(m) +
102 ", cannot set handler for " +
103 decorate_message_name(m, message_label));
// NOTE(review): message_label is passed to the formatter here (and to
// decorate_message_name above) before the nullptr check further down -
// confirm the formatter tolerates a null char const*.
106 LOG_DEBUG_FMT(
"Setting handler for {} ({})", message_label, m);
// `it` is handlers.end() at this point and is passed as the insertion hint.
107 handlers.insert(it, {m, h});
// Only remember a label when one was actually supplied.
109 if (message_label !=
nullptr)
111 message_labels.emplace(m, message_label);
// Looks up the handler for m; when there is none, an error message naming the
// message is built.
// NOTE(review): neither the statement raising that error nor the removal
// itself is visible in this excerpt.
124 auto it = handlers.find(m);
125 if (it == handlers.end())
129 "Can't remove non-existent handler for this message: " +
130 get_decorated_message_name(m));
// True when a handler is currently registered for m.
143 return handlers.find(m) != handlers.end();
// Invokes the handler registered for message type m with (data, size).
//   m    - message type used to select the handler
//   data - bytes forwarded unchanged to the handler
//   size - byte count forwarded to the handler and added to the per-type
//          byte counter
153 void dispatch(MessageType m,
const uint8_t* data,
size_t size)
// A missing handler is reported with the decorated message name.
155 auto it = handlers.find(m);
156 if (it == handlers.end())
160 "No handler for this message: " + get_decorated_message_name(m));
166 it->second(data, size);
// Exceptions derived from std::exception raised by the handler are caught
// and reported with the message name and size.
// NOTE(review): whether the exception is rethrown afterwards is not visible
// in this excerpt.
168 catch (
const std::exception& e)
171 "Exception while processing message {} of size {}",
172 get_decorated_message_name(m),
// operator[] creates the counter entry for m on first use.
178 auto& counts = message_counts[m];
180 counts.bytes += size;
// Exchanges the accumulated counters with `current`.
// NOTE(review): the declaration of `current` is not visible here - presumably
// a fresh empty map, so that the counters are reset; confirm.
186 std::swap(message_counts, current);
// Builds a JSON object with one entry per message type in mc, keyed by the
// name from get_message_name and holding that type's "count" and "bytes".
192 auto j = nlohmann::json::object();
193 for (
const auto& it : mc)
195 j[get_message_name(it.first)] = {
196 {
"count", it.second.messages}, {
"bytes", it.second.bytes}};
// Default idle callback: ignores its size_t argument and simply yields the
// calling thread.
205 static inline void default_idle_behaviour(
size_t)
207 std::this_thread::yield();
// Stop flag: the read loops in this class keep going only while it is false.
213 std::atomic<bool> finished;
224 template <
typename... Ts>
// Reports the current value of the stop flag.
237 return finished.load();
// Reads and dispatches messages until the stop flag is set or max_messages
// have been read.
242 size_t total_read = 0;
// -1 converts to the largest size_t value, so the `previous_read == 0` test
// below cannot succeed before a real read count has been recorded.
243 size_t previous_read = -1;
245 while (!finished.load() && total_read < max_messages)
252 d.dispatch(m, data, size);
// Two consecutive reads that returned nothing.
// NOTE(review): the action taken in this case is not visible in this
// excerpt - presumably the loop stops; confirm.
260 if (read == 0 && previous_read == 0)
265 previous_read = read;
// Drains the reader by asking read_n for one message at a time.
// NOTE(review): the loop and its exit condition are not visible in this
// excerpt.
273 size_t total_read = 0;
276 const auto read =
read_n(1, r);
// Main processing loop: runs until the stop flag is set, totalling the
// number of messages read.
289 size_t total_read = 0;
// Number of idle callbacks issued since the counter was last reset.
290 size_t consecutive_idles = 0u;
292 while (!finished.load())
295 total_read += num_read;
// The idle callback receives the count before it is incremented.
// NOTE(review): the conditions selecting between the idle call and the reset
// are not visible here - presumably idle when nothing was read; confirm.
299 idler(consecutive_idles++);
303 consecutive_idles = 0;
// Registers a handler on DISP for MSG, passing the stringified MSG token as
// the label; the remaining arguments are forwarded to set_message_handler.
316#define DISPATCHER_SET_MESSAGE_HANDLER(DISP, MSG, ...) \
317 DISP.set_message_handler(MSG, #MSG, __VA_ARGS__)
Definition messaging.h:211
void set_message_handler(Ts &&... ts)
Definition messaging.h:225
bool get_finished()
Definition messaging.h:235
RingbufferDispatcher & get_dispatcher()
Definition messaging.h:219
size_t read_n(size_t max_messages, ringbuffer::Reader &r)
Definition messaging.h:240
BufferProcessor(char const *name="")
Definition messaging.h:216
size_t read_all(ringbuffer::Reader &r)
Definition messaging.h:271
size_t run(ringbuffer::Reader &r, IdleBehaviour idler=default_idle_behaviour)
Definition messaging.h:286
void set_finished(bool v=true)
Definition messaging.h:230
Definition messaging.h:38
void set_message_handler(MessageType m, char const *message_label, Handler h)
Definition messaging.h:92
nlohmann::json convert_message_counts(const MessageCounts &mc)
Definition messaging.h:190
MessageCounts retrieve_message_counts()
Definition messaging.h:183
void dispatch(MessageType m, const uint8_t *data, size_t size)
Definition messaging.h:153
Dispatcher(char const *name)
Definition messaging.h:78
MessageCounts< MessageType > MessageCounts
Definition messaging.h:40
bool has_handler(MessageType m)
Definition messaging.h:141
void remove_message_handler(MessageType m)
Definition messaging.h:122
Definition messaging.h:23
Definition messaging.h:18
Definition ring_buffer.h:175
size_t read(size_t limit, Handler f)
Definition ring_buffer.h:206
#define LOG_TRACE_FMT
Definition logger.h:356
#define LOG_DEBUG_FMT
Definition logger.h:357
#define LOG_FAIL_FMT
Definition logger.h:363
Definition messaging.h:14
std::function< void(size_t num_consecutive_idles)> IdleBehaviour
Definition messaging.h:204
std::unordered_map< MessageType, Counts > MessageCounts
Definition messaging.h:34
std::function< void(const uint8_t *, size_t)> Handler
Definition messaging.h:15
uint32_t Message
Definition ring_buffer_types.h:19
Definition messaging.h:28
size_t messages
Definition messaging.h:29
size_t bytes
Definition messaging.h:30