CCF
Loading...
Searching...
No Matches
messaging.h
Go to the documentation of this file.
1// Copyright (c) Microsoft Corporation. All rights reserved.
2// Licensed under the Apache 2.0 License.
3#pragma once
4
6#include "ring_buffer.h"
7
8#include <atomic>
9#include <condition_variable>
10#include <map>
11#include <stdexcept>
12
13namespace messaging
14{
15 using Handler = std::function<void(const uint8_t*, size_t)>;
16
17 class no_handler : public std::logic_error
18 {
19 using logic_error::logic_error;
20 };
21
22 class already_handled : public std::logic_error
23 {
24 using logic_error::logic_error;
25 };
26
27 struct Counts
28 {
29 size_t messages;
30 size_t bytes;
31 };
32
33 template <typename MessageType>
34 using MessageCounts = std::unordered_map<MessageType, Counts>;
35
36 template <typename MessageType>
38 {
39 public:
41
42 private:
43 // Store a name to distinguish error messages
44 char const* const name;
45
46 std::map<MessageType, Handler> handlers;
47 std::map<MessageType, char const*> message_labels;
48
49 std::string get_error_prefix()
50 {
51 return std::string("[") + std::string(name) + std::string("] ");
52 }
53
54 char const* get_message_name(MessageType m)
55 {
56 const auto it = message_labels.find(m);
57 if (it == message_labels.end())
58 {
59 return "unknown";
60 }
61
62 return it->second;
63 }
64
65 static std::string decorate_message_name(MessageType m, char const* s)
66 {
67 return fmt::format("<{}:{}>", s, m);
68 }
69
70 std::string get_decorated_message_name(MessageType m)
71 {
72 return decorate_message_name(m, get_message_name(m));
73 }
74
75 public:
76 Dispatcher(char const* name) : name(name), handlers() {}
77
91 MessageType m, char const* message_label, Handler h)
92 {
93 // Check for presence first, so we only copy if we're actually inserting
94 auto it = handlers.find(m);
95 if (it != handlers.end())
96 {
97 throw already_handled(
98 get_error_prefix() + "MessageType " + std::to_string(m) +
99 " already handled by " + get_decorated_message_name(m) +
100 ", cannot set handler for " +
101 decorate_message_name(m, message_label));
102 }
103
104 LOG_DEBUG_FMT("Setting handler for {} ({})", message_label, m);
105 handlers.insert(it, {m, h});
106
107 if (message_label != nullptr)
108 {
109 message_labels.emplace(m, message_label);
110 }
111 }
112
120 void remove_message_handler(MessageType m)
121 {
122 auto it = handlers.find(m);
123 if (it == handlers.end())
124 {
125 throw no_handler(
126 get_error_prefix() +
127 "Can't remove non-existent handler for this message: " +
128 get_decorated_message_name(m));
129 }
130
131 handlers.erase(it);
132 }
133
139 bool has_handler(MessageType m)
140 {
141 return handlers.find(m) != handlers.end();
142 }
143
151 void dispatch(MessageType m, const uint8_t* data, size_t size)
152 {
153 auto it = handlers.find(m);
154 if (it == handlers.end())
155 {
156 throw no_handler(
157 get_error_prefix() +
158 "No handler for this message: " + get_decorated_message_name(m));
159 }
160
161 // Handlers may register or remove handlers, so iterator is invalidated
162 try
163 {
164 it->second(data, size);
165 }
166 catch (const std::exception& e)
167 {
169 "Exception while processing message {} of size {}",
170 get_decorated_message_name(m),
171 size);
172 LOG_TRACE_FMT("{}", e.what());
173 throw e;
174 }
175 }
176 };
177
179
180 using IdleBehaviour = std::function<void(size_t num_consecutive_idles)>;
181 static inline void default_idle_behaviour(size_t /*unused*/)
182 {
183 std::this_thread::yield();
184 }
185
187 {
188 RingbufferDispatcher dispatcher;
189 std::atomic<bool> finished;
190
191 public:
192 BufferProcessor(char const* name = "") : dispatcher(name), finished(false)
193 {}
194
196 {
197 return dispatcher;
198 }
199
200 template <typename... Ts>
201 void set_message_handler(Ts&&... ts)
202 {
203 dispatcher.set_message_handler(std::forward<Ts>(ts)...);
204 }
205
206 void set_finished(bool v = true)
207 {
208 finished.store(v);
209 }
210
212 {
213 return finished.load();
214 }
215
216 size_t read_n(size_t max_messages, ringbuffer::Reader& r)
217 {
218 size_t total_read = 0;
219 size_t previous_read = -1;
220
221 while (!finished.load() && total_read < max_messages)
222 {
223 // Read one at a time so we don't process any after being told to stop
224 auto read = r.read(
225 1,
226 [&d = dispatcher](
227 ringbuffer::Message m, const uint8_t* data, size_t size) {
228 d.dispatch(m, data, size);
229 });
230
231 total_read += read;
232
233 // Because of padding messages, a single empty read may be seen near the
234 // end of the ringbuffer. Use consecutive empty reads as a sign that
235 // we've actually read everything
236 if (read == 0 && previous_read == 0)
237 {
238 break;
239 }
240
241 previous_read = read;
242 }
243
244 return total_read;
245 }
246
248 {
249 size_t total_read = 0;
250 while (true)
251 {
252 const auto read = read_n(1, r);
253 total_read += read;
254 if (read == 0)
255 {
256 break;
257 }
258 }
259 return total_read;
260 }
261
262 size_t run(
263 ringbuffer::Reader& r, IdleBehaviour idler = default_idle_behaviour)
264 {
265 size_t total_read = 0;
266 size_t consecutive_idles = 0u;
267
268 while (!finished.load())
269 {
270 auto num_read = read_all(r);
271 total_read += num_read;
272
273 if (num_read == 0)
274 {
275 idler(consecutive_idles++);
276 }
277 else
278 {
279 consecutive_idles = 0;
280 }
281 }
282
283 return total_read;
284 }
285 };
286
287 // The last variadic argument is expected to be the handler itself. It is
288 // variadic so that you can use an inline lambda, _with commas_. The
289 // preprocessor will blindly paste this as (what it thinks are) multiple
290 // arguments to set_message_handler, and the real processor will recognise
291 // will read it as the original lambda.
292#define DISPATCHER_SET_MESSAGE_HANDLER(DISP, MSG, ...) \
293 DISP.set_message_handler(MSG, #MSG, __VA_ARGS__)
294}
Definition messaging.h:187
void set_message_handler(Ts &&... ts)
Definition messaging.h:201
bool get_finished()
Definition messaging.h:211
RingbufferDispatcher & get_dispatcher()
Definition messaging.h:195
size_t read_n(size_t max_messages, ringbuffer::Reader &r)
Definition messaging.h:216
BufferProcessor(char const *name="")
Definition messaging.h:192
size_t read_all(ringbuffer::Reader &r)
Definition messaging.h:247
size_t run(ringbuffer::Reader &r, IdleBehaviour idler=default_idle_behaviour)
Definition messaging.h:262
void set_finished(bool v=true)
Definition messaging.h:206
Definition messaging.h:38
void set_message_handler(MessageType m, char const *message_label, Handler h)
Definition messaging.h:90
void dispatch(MessageType m, const uint8_t *data, size_t size)
Definition messaging.h:151
Dispatcher(char const *name)
Definition messaging.h:76
MessageCounts< MessageType > MessageCounts
Definition messaging.h:40
bool has_handler(MessageType m)
Definition messaging.h:139
void remove_message_handler(MessageType m)
Definition messaging.h:120
Definition messaging.h:23
Definition messaging.h:18
Definition ring_buffer.h:175
size_t read(size_t limit, Handler f)
Definition ring_buffer.h:208
#define LOG_TRACE_FMT
Definition internal_logger.h:13
#define LOG_DEBUG_FMT
Definition internal_logger.h:14
#define LOG_FAIL_FMT
Definition internal_logger.h:16
Definition messaging.h:14
std::function< void(size_t num_consecutive_idles)> IdleBehaviour
Definition messaging.h:180
std::unordered_map< MessageType, Counts > MessageCounts
Definition messaging.h:34
std::function< void(const uint8_t *, size_t)> Handler
Definition messaging.h:15
uint32_t Message
Definition ring_buffer_types.h:19
Definition messaging.h:28
size_t messages
Definition messaging.h:29
size_t bytes
Definition messaging.h:30