CCF
Loading...
Searching...
No Matches
messaging.h
Go to the documentation of this file.
1// Copyright (c) Microsoft Corporation. All rights reserved.
2// Licensed under the Apache 2.0 License.
3#pragma once
4
5#include "ccf/ds/logger.h"
6#include "ring_buffer.h"
7
8#include <atomic>
9#include <condition_variable>
10#include <map>
11#include <stdexcept>
12
13namespace messaging
14{
15 using Handler = std::function<void(const uint8_t*, size_t)>;
16
17 class no_handler : public std::logic_error
18 {
19 using logic_error::logic_error;
20 };
21
22 class already_handled : public std::logic_error
23 {
24 using logic_error::logic_error;
25 };
26
27 struct Counts
28 {
29 size_t messages;
30 size_t bytes;
31 };
32
33 template <typename MessageType>
34 using MessageCounts = std::unordered_map<MessageType, Counts>;
35
36 template <typename MessageType>
38 {
39 public:
41
42 private:
43 // Store a name to distinguish error messages
44 char const* const name;
45
46 std::map<MessageType, Handler> handlers;
47 std::map<MessageType, char const*> message_labels;
48
49 MessageCounts message_counts;
50
51 std::string get_error_prefix()
52 {
53 return std::string("[") + std::string(name) + std::string("] ");
54 }
55
56 char const* get_message_name(MessageType m)
57 {
58 const auto it = message_labels.find(m);
59 if (it == message_labels.end())
60 {
61 return "unknown";
62 }
63
64 return it->second;
65 }
66
67 static std::string decorate_message_name(MessageType m, char const* s)
68 {
69 return fmt::format("<{}:{}>", s, m);
70 }
71
72 std::string get_decorated_message_name(MessageType m)
73 {
74 return decorate_message_name(m, get_message_name(m));
75 }
76
77 public:
78 Dispatcher(char const* name) : name(name), handlers() {}
79
93 MessageType m, char const* message_label, Handler h)
94 {
95 // Check for presence first, so we only copy if we're actually inserting
96 auto it = handlers.find(m);
97 if (it != handlers.end())
98 {
99 throw already_handled(
100 get_error_prefix() + "MessageType " + std::to_string(m) +
101 " already handled by " + get_decorated_message_name(m) +
102 ", cannot set handler for " +
103 decorate_message_name(m, message_label));
104 }
105
106 LOG_DEBUG_FMT("Setting handler for {} ({})", message_label, m);
107 handlers.insert(it, {m, h});
108
109 if (message_label != nullptr)
110 {
111 message_labels.emplace(m, message_label);
112 }
113 }
114
122 void remove_message_handler(MessageType m)
123 {
124 auto it = handlers.find(m);
125 if (it == handlers.end())
126 {
127 throw no_handler(
128 get_error_prefix() +
129 "Can't remove non-existent handler for this message: " +
130 get_decorated_message_name(m));
131 }
132
133 handlers.erase(it);
134 }
135
141 bool has_handler(MessageType m)
142 {
143 return handlers.find(m) != handlers.end();
144 }
145
153 void dispatch(MessageType m, const uint8_t* data, size_t size)
154 {
155 auto it = handlers.find(m);
156 if (it == handlers.end())
157 {
158 throw no_handler(
159 get_error_prefix() +
160 "No handler for this message: " + get_decorated_message_name(m));
161 }
162
163 // Handlers may register or remove handlers, so iterator is invalidated
164 try
165 {
166 it->second(data, size);
167 }
168 catch (const std::exception& e)
169 {
171 "Exception while processing message {} of size {}",
172 get_decorated_message_name(m),
173 size);
174 LOG_TRACE_FMT("{}", e.what());
175 throw e;
176 }
177
178 auto& counts = message_counts[m];
179 counts.messages++;
180 counts.bytes += size;
181 }
182
184 {
185 MessageCounts current;
186 std::swap(message_counts, current);
187 return current;
188 }
189
190 nlohmann::json convert_message_counts(const MessageCounts& mc)
191 {
192 auto j = nlohmann::json::object();
193 for (const auto& it : mc)
194 {
195 j[get_message_name(it.first)] = {
196 {"count", it.second.messages}, {"bytes", it.second.bytes}};
197 }
198 return j;
199 }
200 };
201
203
204 using IdleBehaviour = std::function<void(size_t num_consecutive_idles)>;
205 static inline void default_idle_behaviour(size_t)
206 {
207 std::this_thread::yield();
208 }
209
211 {
212 RingbufferDispatcher dispatcher;
213 std::atomic<bool> finished;
214
215 public:
216 BufferProcessor(char const* name = "") : dispatcher(name), finished(false)
217 {}
218
220 {
221 return dispatcher;
222 }
223
224 template <typename... Ts>
225 void set_message_handler(Ts&&... ts)
226 {
227 dispatcher.set_message_handler(std::forward<Ts>(ts)...);
228 }
229
230 void set_finished(bool v = true)
231 {
232 finished.store(v);
233 }
234
236 {
237 return finished.load();
238 }
239
240 size_t read_n(size_t max_messages, ringbuffer::Reader& r)
241 {
242 size_t total_read = 0;
243 size_t previous_read = -1;
244
245 while (!finished.load() && total_read < max_messages)
246 {
247 // Read one at a time so we don't process any after being told to stop
248 auto read = r.read(
249 1,
250 [&d = dispatcher](
251 ringbuffer::Message m, const uint8_t* data, size_t size) {
252 d.dispatch(m, data, size);
253 });
254
255 total_read += read;
256
257 // Because of padding messages, a single empty read may be seen near the
258 // end of the ringbuffer. Use consecutive empty reads as a sign that
259 // we've actually read everything
260 if (read == 0 && previous_read == 0)
261 {
262 break;
263 }
264
265 previous_read = read;
266 }
267
268 return total_read;
269 }
270
272 {
273 size_t total_read = 0;
274 while (true)
275 {
276 const auto read = read_n(1, r);
277 total_read += read;
278 if (read == 0)
279 {
280 break;
281 }
282 }
283 return total_read;
284 }
285
286 size_t run(
287 ringbuffer::Reader& r, IdleBehaviour idler = default_idle_behaviour)
288 {
289 size_t total_read = 0;
290 size_t consecutive_idles = 0u;
291
292 while (!finished.load())
293 {
294 auto num_read = read_all(r);
295 total_read += num_read;
296
297 if (num_read == 0)
298 {
299 idler(consecutive_idles++);
300 }
301 else
302 {
303 consecutive_idles = 0;
304 }
305 }
306
307 return total_read;
308 }
309 };
310
311 // The last variadic argument is expected to be the handler itself. It is
312 // variadic so that you can use an inline lambda, _with commas_. The
313 // preprocessor will blindly paste this as (what it thinks are) multiple
314 // arguments to set_message_handler, and the real processor will recognise
315 // will read it as the original lambda.
316#define DISPATCHER_SET_MESSAGE_HANDLER(DISP, MSG, ...) \
317 DISP.set_message_handler(MSG, #MSG, __VA_ARGS__)
318}
Definition messaging.h:211
void set_message_handler(Ts &&... ts)
Definition messaging.h:225
bool get_finished()
Definition messaging.h:235
RingbufferDispatcher & get_dispatcher()
Definition messaging.h:219
size_t read_n(size_t max_messages, ringbuffer::Reader &r)
Definition messaging.h:240
BufferProcessor(char const *name="")
Definition messaging.h:216
size_t read_all(ringbuffer::Reader &r)
Definition messaging.h:271
size_t run(ringbuffer::Reader &r, IdleBehaviour idler=default_idle_behaviour)
Definition messaging.h:286
void set_finished(bool v=true)
Definition messaging.h:230
Definition messaging.h:38
void set_message_handler(MessageType m, char const *message_label, Handler h)
Definition messaging.h:92
nlohmann::json convert_message_counts(const MessageCounts &mc)
Definition messaging.h:190
MessageCounts retrieve_message_counts()
Definition messaging.h:183
void dispatch(MessageType m, const uint8_t *data, size_t size)
Definition messaging.h:153
Dispatcher(char const *name)
Definition messaging.h:78
MessageCounts< MessageType > MessageCounts
Definition messaging.h:40
bool has_handler(MessageType m)
Definition messaging.h:141
void remove_message_handler(MessageType m)
Definition messaging.h:122
Definition messaging.h:23
Definition messaging.h:18
Definition ring_buffer.h:175
size_t read(size_t limit, Handler f)
Definition ring_buffer.h:206
#define LOG_TRACE_FMT
Definition logger.h:356
#define LOG_DEBUG_FMT
Definition logger.h:357
#define LOG_FAIL_FMT
Definition logger.h:363
Definition messaging.h:14
std::function< void(size_t num_consecutive_idles)> IdleBehaviour
Definition messaging.h:204
std::unordered_map< MessageType, Counts > MessageCounts
Definition messaging.h:34
std::function< void(const uint8_t *, size_t)> Handler
Definition messaging.h:15
uint32_t Message
Definition ring_buffer_types.h:19
Definition messaging.h:28
size_t messages
Definition messaging.h:29
size_t bytes
Definition messaging.h:30