CCF
Loading...
Searching...
No Matches
oversized.h
Go to the documentation of this file.
1// Copyright (c) Microsoft Corporation. All rights reserved.
2// Licensed under the Apache 2.0 License.
3#pragma once
4
5#include "messaging.h"
6#include "ring_buffer.h"
7#include "serialized.h"
8
9#define FMT_HEADER_ONLY
10#include <fmt/format.h>
11#include <unordered_map>
12#include <utility>
13
// Formats the trailing arguments with fmt::format, logs the resulting text
// through LOG_FAIL_FMT, then throws ERROR_TYPE constructed from that same text.
//
// The temporary holding the formatted text has a deliberately unusual name: a
// plain `msg` would hide a caller's own `msg` variable when that variable is
// passed as one of the format arguments, and the expansion would then refer to
// itself in its own initializer.
//
// The do/while(0) wrapper makes the expansion behave as a single statement.
#define LOG_AND_THROW(ERROR_TYPE, ...) \
  do \
  { \
    const auto log_and_throw_msg_ = fmt::format(__VA_ARGS__); \
    LOG_FAIL_FMT("{}", log_and_throw_msg_); \
    throw ERROR_TYPE(log_and_throw_msg_); \
  } while (0)
21namespace oversized
22{
28
30 {
32
33 struct PartialMessage
34 {
35 const ringbuffer::Message m;
36 const size_t total_size;
37
38 size_t received;
39 uint8_t* data;
40 };
41
42 std::unordered_map<size_t, PartialMessage> partial_messages;
43
44 public:
46 {
48 d,
49 OversizedMessage::fragment,
50 [this](const uint8_t* data, size_t size) {
51 auto message_id = serialized::read<size_t>(data, size);
52
53 auto it = partial_messages.find(message_id);
54 if (it == partial_messages.end())
55 {
56 // First reference to this oversized message - should contain a
57 // header. Read its type, size, then allocate space for it
58 auto m = serialized::read<ringbuffer::Message>(data, size);
59 auto total_size = serialized::read<size_t>(data, size);
60
61 // No safety checks on the size - trust that in normal operation the
62 // Writer has set sensible limits, don't duplicate here
63 // NOLINTNEXTLINE(cppcoreguidelines-owning-memory)
64 auto* dest = new uint8_t[total_size];
65
66 auto ib =
67 partial_messages.insert({message_id, {m, total_size, 0, dest}});
68
69 it = ib.first;
70 }
71
72 auto& partial = it->second;
73 if (size + partial.received > partial.total_size)
74 {
76 message_id,
77 fmt::format(
78 "Too much data for oversized fragmented message. Message {} "
79 "asked for {} bytes, has already written {}, but has sent a "
80 "further {}",
81 message_id,
82 partial.total_size,
83 partial.received,
84 size));
85 }
86
87 ::memcpy(partial.data + partial.received, data, size);
88 partial.received += size;
89 data += size;
90 size -= size;
91
92 if (partial.received == partial.total_size)
93 {
94 // Entire message received - dispatch it then free buffer
95 dispatcher.dispatch(partial.m, partial.data, partial.total_size);
96
97 delete[] partial.data; // NOLINT(cppcoreguidelines-owning-memory)
98
99 // Erase by key - dispatch may have invalidated previous iterator
100 // (nested fragmented messages - odd, but no reason to disallow)
101 partial_messages.erase(message_id);
102 }
103 });
104 }
105
107 {
108 try
109 {
110 dispatcher.remove_message_handler(OversizedMessage::fragment);
111 }
112 catch (...) // NOLINT(bugprone-empty-catch)
113 {
114 // Destructors must not throw - exception ignored
115 }
116
117 for (const auto& [_, partial] : partial_messages)
118 {
119 delete[] partial.data; // NOLINT(cppcoreguidelines-owning-memory)
120 }
121 }
122 };
123
124#pragma pack(push, 1)
131#pragma pack(pop)
132
134 {
135 private:
136 ringbuffer::WriterPtr underlying_writer;
137
138 const size_t max_fragment_size;
139 const size_t max_total_size;
140
141 struct FragmentProgress
142 {
143 WriteMarker marker; // Track this so a later call can finish this fragment
144 size_t identifier; // Identifier for all fragments of oversized message
145 size_t remainder; // Remaining space in currently prepared fragment buffer
146 };
147
148 // None iff the message is small enough to fit in a single fragment, or
149 // we're not currently within a [prepare, write_bytes*, finish] loop
150 std::optional<FragmentProgress> fragment_progress;
151
152 public:
153 Writer(ringbuffer::WriterPtr writer, size_t f, size_t t = -1) :
154 underlying_writer(std::move(writer)),
155 max_fragment_size(f),
156 max_total_size(t)
157 {
158 if (max_fragment_size >= max_total_size)
159 {
161 std::invalid_argument,
162 "Fragment sizes must be smaller than total max: {} >= {}",
163 max_fragment_size,
164 max_total_size);
165 }
166
167 constexpr auto header_size = sizeof(InitialFragmentHeader);
168 if (max_fragment_size <= header_size)
169 {
171 std::invalid_argument,
172 "Fragment size must be large enough to contain the header for the "
173 "initial fragment, and some additional payload data: {} <= {}",
174 max_fragment_size,
175 header_size);
176 }
177 }
178
181 size_t total_size,
182 bool wait = true,
183 size_t* identifier = nullptr) override
184 {
185 // Ensure this is not called out of order
186 if (fragment_progress.has_value())
187 {
189 std::logic_error, "This Writer is already preparing a message");
190 }
191
192 // Small enough to be handled directly by underlying writer
193 if (total_size <= max_fragment_size)
194 {
195 return underlying_writer->prepare(m, total_size, wait, identifier);
196 }
197
198 if (total_size > max_total_size)
199 {
201 std::invalid_argument,
202 "Requested a write of {} bytes, max allowed is {}",
203 total_size,
204 max_total_size);
205 }
206
207 // Need to split this message into multiple fragments
208
209 if (!wait)
210 {
212 std::invalid_argument,
213 "Requested write of {} bytes will be split into multiple fragments: "
214 "caller must wait for these to complete as fragment writes will be "
215 "blocking",
216 total_size);
217 }
218
219 // Prepare space for the first fragment, getting an id for all related
220 // fragments
221 size_t outer_id = 0;
222 const auto marker = underlying_writer->prepare(
223 OversizedMessage::fragment, max_fragment_size, wait, &outer_id);
224 if (!marker.has_value())
225 {
226 return {};
227 }
228
229 // Write the header
230 InitialFragmentHeader header = {outer_id, m, total_size};
231 auto next = underlying_writer->write_bytes(
232 marker, reinterpret_cast<const uint8_t*>(&header), sizeof(header));
233
234 // Track progress in current oversized message
235 fragment_progress = {
236 marker, outer_id, max_fragment_size - sizeof(header)};
237
238 if (identifier != nullptr)
239 {
240 *identifier = outer_id;
241 }
242
243 // Don't need to store next - it will be an argument of the next call to
244 // write_bytes
245 return next;
246 }
247
248 void finish(const WriteMarker& marker) override
249 {
250 if (fragment_progress.has_value())
251 {
252 // We were writing an oversized message, the given marker means nothing
253 // to us
254 if (fragment_progress->remainder != 0)
255 {
257 std::logic_error,
258 "Attempting to finish an oversized message before the entire "
259 "requested payload has been written");
260 }
261
262 // Finish the final fragment message
263 underlying_writer->finish(fragment_progress->marker);
264
265 // Clean up, ready for next call to prepare
266 fragment_progress = {};
267 }
268 else
269 {
270 // We were writing a small message - get underlying writer to finish it
271 underlying_writer->finish(marker);
272 }
273 }
274
276 const WriteMarker& marker, const uint8_t* bytes, size_t size) override
277 {
278 if (!marker.has_value())
279 {
280 return {};
281 }
282
283 if (!fragment_progress.has_value())
284 {
285 // Writing a small message - nothing to do here
286 return underlying_writer->write_bytes(marker, bytes, size);
287 }
288
289 // Append as much as possible into the current prepared buffer
290 auto write_size = std::min(size, fragment_progress->remainder);
291 auto next = underlying_writer->write_bytes(marker, bytes, write_size);
292 bytes += write_size;
293 size -= write_size;
294 fragment_progress->remainder -= write_size;
295
296 // While there is more to write...
297 while (size > 0)
298 {
299 // Prepare a new fragment
300 const auto id = fragment_progress->identifier;
301 const auto frag_size = std::min(size + sizeof(id), max_fragment_size);
302 next = underlying_writer->prepare(
303 OversizedMessage::fragment, frag_size, true);
304
305 if (!next.has_value())
306 {
307 // Intermediate fragment failed - this is unexpected
309 std::logic_error,
310 "Failed to create fragment for oversized message");
311
312 // If this path is hit it is likely because we have allowed oversized
313 // writes to write without waiting. Some initial fragments were
314 // written, but there is insufficient space to write this fragment.
315 // In this case we can either cancel the entire oversized message, or
316 // retry. In either case we should send a message to inform the
317 // reader.
318 fragment_progress->remainder = 0;
319 break;
320 }
321
322 // Finish the previous fragment
323 underlying_writer->finish(fragment_progress->marker);
324
325 // Update progress tracking to reference the new fragment
326 write_size = frag_size - sizeof(id);
327 fragment_progress->marker = next;
328 fragment_progress->remainder = write_size;
329
330 // Write the id of the oversized message
331 next = underlying_writer->write_bytes(
332 next, reinterpret_cast<const uint8_t*>(&id), sizeof(id));
333
334 // Write some fragment payload
335 next = underlying_writer->write_bytes(next, bytes, write_size);
336 bytes += write_size;
337 size -= write_size;
338 fragment_progress->remainder -= write_size;
339 }
340
341 return next;
342 }
343
344 size_t get_max_message_size() override
345 {
346 return max_total_size;
347 }
348 };
349
351 {
354 };
355
356 // Wrap ringbuffer::Circuit to provide the same fragment/total maximum sizes
357 // for every Writer
359 {
360 AbstractWriterFactory& factory_impl;
361
362 const WriterConfig config;
363
364 public:
365 WriterFactory(AbstractWriterFactory& impl, const WriterConfig& config_) :
366 factory_impl(impl),
367 config(config_)
368 {}
369
370 std::shared_ptr<oversized::Writer> create_oversized_writer_to_outside()
371 {
372 return std::make_shared<oversized::Writer>(
373 factory_impl.create_writer_to_outside(),
374 config.max_fragment_size,
375 config.max_total_size);
376 }
377
378 std::shared_ptr<oversized::Writer> create_oversized_writer_to_inside()
379 {
380 return std::make_shared<oversized::Writer>(
381 factory_impl.create_writer_to_inside(),
382 config.max_fragment_size,
383 config.max_total_size);
384 }
385
386 std::shared_ptr<ringbuffer::AbstractWriter> create_writer_to_outside()
387 override
388 {
390 }
391
392 std::shared_ptr<ringbuffer::AbstractWriter> create_writer_to_inside()
393 override
394 {
396 }
397 };
398}
void dispatch(MessageType m, const uint8_t *data, size_t size)
Definition messaging.h:153
void remove_message_handler(MessageType m)
Definition messaging.h:122
Definition oversized.h:30
~FragmentReconstructor() noexcept
Definition oversized.h:106
FragmentReconstructor(messaging::RingbufferDispatcher &d)
Definition oversized.h:45
Definition oversized.h:359
std::shared_ptr< oversized::Writer > create_oversized_writer_to_outside()
Definition oversized.h:370
std::shared_ptr< ringbuffer::AbstractWriter > create_writer_to_outside() override
Definition oversized.h:386
std::shared_ptr< ringbuffer::AbstractWriter > create_writer_to_inside() override
Definition oversized.h:392
WriterFactory(AbstractWriterFactory &impl, const WriterConfig &config_)
Definition oversized.h:365
std::shared_ptr< oversized::Writer > create_oversized_writer_to_inside()
Definition oversized.h:378
Definition oversized.h:134
WriteMarker write_bytes(const WriteMarker &marker, const uint8_t *bytes, size_t size) override
Definition oversized.h:275
WriteMarker prepare(ringbuffer::Message m, size_t total_size, bool wait=true, size_t *identifier=nullptr) override
Definition oversized.h:179
Writer(ringbuffer::WriterPtr writer, size_t f, size_t t=-1)
Definition oversized.h:153
void finish(const WriteMarker &marker) override
Definition oversized.h:248
size_t get_max_message_size() override
Definition oversized.h:344
Definition ring_buffer_types.h:157
Definition ring_buffer_types.h:63
std::optional< size_t > WriteMarker
Definition ring_buffer_types.h:100
Definition ring_buffer_types.h:51
#define DISPATCHER_SET_MESSAGE_HANDLER(DISP, MSG,...)
Definition messaging.h:316
Definition oversized.h:22
OversizedMessage
Definition oversized.h:24
@ DEFINE_RINGBUFFER_MSG_TYPE
Part of a larger message. Can be sent both ways.
Definition oversized.h:26
std::shared_ptr< AbstractWriter > WriterPtr
Definition ring_buffer_types.h:154
uint32_t Message
Definition ring_buffer_types.h:19
STL namespace.
#define LOG_AND_THROW(ERROR_TYPE,...)
Definition oversized.h:14
Definition oversized.h:126
size_t total_size
Definition oversized.h:129
ringbuffer::Message contained
Definition oversized.h:128
size_t identifier
Definition oversized.h:127
Definition oversized.h:351
size_t max_fragment_size
Definition oversized.h:352
size_t max_total_size
Definition oversized.h:353