CCF
Loading...
Searching...
No Matches
oversized.h
Go to the documentation of this file.
1// Copyright (c) Microsoft Corporation. All rights reserved.
2// Licensed under the Apache 2.0 License.
3#pragma once
4
5#include "messaging.h"
6#include "ring_buffer.h"
7#include "serialized.h"
8
9#define FMT_HEADER_ONLY
10#include <fmt/format.h>
11#include <unordered_map>
12
// Builds a message from the fmt-style arguments, passes it to LOG_FAIL_FMT,
// then throws ERROR_TYPE constructed from that same message. The
// do { ... } while (0) wrapper makes an invocation behave as one statement,
// so it can be followed by a semicolon and used inside an unbraced if/else.
#define LOG_AND_THROW(ERROR_TYPE, ...) \
  do \
  { \
    const auto formatted_error_ = fmt::format(__VA_ARGS__); \
    LOG_FAIL_FMT("{}", formatted_error_); \
    throw ERROR_TYPE(formatted_error_); \
  } while (0)
20namespace oversized
21{
27
29 {
31
32 struct PartialMessage
33 {
34 const ringbuffer::Message m;
35 const size_t total_size;
36
37 size_t received;
38 uint8_t* data;
39 };
40
41 std::unordered_map<size_t, PartialMessage> partial_messages;
42
43 public:
// Constructor body. The lambda below handles one incoming
// OversizedMessage::fragment payload: it appends the fragment's bytes to the
// buffer kept for that message id and, once every announced byte has arrived,
// dispatches the reassembled message and drops the bookkeeping entry.
// NOTE(review): the constructor signature and the call that registers this
// lambda are absent from this listing (the numbering skips 44 and 46) -
// confirm both against the original header.
45 {
47 d,
48 OversizedMessage::fragment,
49 [this](const uint8_t* data, size_t size) {
// Every fragment starts with the id of the message it belongs to; read
// consumes it, advancing data and shrinking size
50 auto message_id = serialized::read<size_t>(data, size);
51
52 auto it = partial_messages.find(message_id);
53 if (it == partial_messages.end())
54 {
55 // First reference to this oversized message - should contain a
56 // header. Read its type, size, then allocate space for it
57 auto m = serialized::read<ringbuffer::Message>(data, size);
58 auto total_size = serialized::read<size_t>(data, size);
59
60 // No safety checks on the size - trust that in normal operation the
61 // Writer has set sensible limits, don't duplicate here
// The buffer is owned manually: it is released with delete[] once the
// message has been dispatched
62 uint8_t* dest = new uint8_t[total_size];
63
// Nothing has been received yet, hence the 0
64 auto ib =
65 partial_messages.insert({message_id, {m, total_size, 0, dest}});
66
67 it = ib.first;
68 }
69
70 auto& partial = it->second;
// Reject a fragment that would write past the end of the allocated buffer
71 if (size + partial.received > partial.total_size)
72 {
// NOTE(review): the line opening this statement is absent from this listing
// (the numbering skips 73); what follows is its argument list
74 message_id,
75 fmt::format(
76 "Too much data for oversized fragmented message. Message {} "
77 "asked for {} bytes, has already written {}, but has sent a "
78 "further {}",
79 message_id,
80 partial.total_size,
81 partial.received,
82 size));
83 }
84
// Everything left in this fragment is payload: append it after the bytes
// already received
85 ::memcpy(partial.data + partial.received, data, size);
86 partial.received += size;
87 data += size;
// The whole remainder was consumed, so this leaves size at zero
88 size -= size;
89
90 if (partial.received == partial.total_size)
91 {
92 // Entire message received - dispatch it then free buffer
93 dispatcher.dispatch(partial.m, partial.data, partial.total_size);
94
// partial is a reference to the mapped value rather than an iterator; an
// unordered_map keeps element references valid across insertions
95 delete[] partial.data;
96
97 // Erase by key - dispatch may have invalidated previous iterator
98 // (nested fragmented messages - odd, but no reason to disallow)
99 partial_messages.erase(message_id);
100 }
101 });
102 }
103
105 {
106 dispatcher.remove_message_handler(OversizedMessage::fragment);
107
108 for (const auto& [_, partial] : partial_messages)
109 {
110 delete[] partial.data;
111 }
112 }
113 };
114
115#pragma pack(push, 1)
122#pragma pack(pop)
123
125 {
126 private:
127 ringbuffer::WriterPtr underlying_writer;
128
129 const size_t max_fragment_size;
130 const size_t max_total_size;
131
132 struct FragmentProgress
133 {
134 WriteMarker marker; // Track this so a later call can finish this fragment
135 size_t identifier; // Identifier for all fragments of oversized message
136 size_t remainder; // Remaining space in currently prepared fragment buffer
137 };
138
139 // None iff the message is small enough to fit in a single fragment, or
140 // we're not currently within a [prepare, write_bytes*, finish] loop
141 std::optional<FragmentProgress> fragment_progress;
142
143 public:
144 Writer(const ringbuffer::WriterPtr& writer, size_t f, size_t t = -1) :
145 underlying_writer(writer),
146 max_fragment_size(f),
147 max_total_size(t),
148 fragment_progress({})
149 {
150 if (max_fragment_size >= max_total_size)
151 {
153 std::invalid_argument,
154 "Fragment sizes must be smaller than total max: {} >= {}",
155 max_fragment_size,
156 max_total_size);
157 }
158
159 constexpr auto header_size = sizeof(InitialFragmentHeader);
160 if (max_fragment_size <= header_size)
161 {
163 std::invalid_argument,
164 "Fragment size must be large enough to contain the header for the "
165 "initial fragment, and some additional payload data: {} <= {}",
166 max_fragment_size,
167 header_size);
168 }
169 }
170
173 size_t total_size,
174 bool wait = true,
175 size_t* identifier = nullptr) override
176 {
177 // Ensure this is not called out of order
178 if (fragment_progress.has_value())
179 {
181 std::logic_error, "This Writer is already preparing a message");
182 }
183
184 // Small enough to be handled directly by underlying writer
185 if (total_size <= max_fragment_size)
186 {
187 return underlying_writer->prepare(m, total_size, wait, identifier);
188 }
189
190 if (total_size > max_total_size)
191 {
193 std::invalid_argument,
194 "Requested a write of {} bytes, max allowed is {}",
195 total_size,
196 max_total_size);
197 }
198
199 // Need to split this message into multiple fragments
200
201 if (!wait)
202 {
204 std::invalid_argument,
205 "Requested write of {} bytes will be split into multiple fragments: "
206 "caller must wait for these to complete as fragment writes will be "
207 "blocking",
208 total_size);
209 }
210
211 // Prepare space for the first fragment, getting an id for all related
212 // fragments
213 size_t outer_id;
214 const auto marker = underlying_writer->prepare(
215 OversizedMessage::fragment, max_fragment_size, wait, &outer_id);
216 if (!marker.has_value())
217 {
218 return {};
219 }
220
221 // Write the header
222 InitialFragmentHeader header = {outer_id, m, total_size};
223 auto next = underlying_writer->write_bytes(
224 marker, (const uint8_t*)&header, sizeof(header));
225
226 // Track progress in current oversized message
227 fragment_progress = {
228 marker, outer_id, max_fragment_size - sizeof(header)};
229
230 if (identifier != nullptr)
231 *identifier = outer_id;
232
233 // Don't need to store next - it will be an argument of the next call to
234 // write_bytes
235 return next;
236 }
237
238 virtual void finish(const WriteMarker& marker) override
239 {
240 if (fragment_progress.has_value())
241 {
242 // We were writing an oversized message, the given marker means nothing
243 // to us
244 if (fragment_progress->remainder != 0)
245 {
247 std::logic_error,
248 "Attempting to finish an oversized message before the entire "
249 "requested payload has been written");
250 }
251
252 // Finish the final fragment message
253 underlying_writer->finish(fragment_progress->marker);
254
255 // Clean up, ready for next call to prepare
256 fragment_progress = {};
257 }
258 else
259 {
260 // We were writing a small message - get underlying writer to finish it
261 underlying_writer->finish(marker);
262 }
263 }
264
266 const WriteMarker& marker, const uint8_t* bytes, size_t size) override
267 {
268 if (!marker.has_value())
269 {
270 return {};
271 }
272
273 if (!fragment_progress.has_value())
274 {
275 // Writing a small message - nothing to do here
276 return underlying_writer->write_bytes(marker, bytes, size);
277 }
278
279 // Append as much as possible into the current prepared buffer
280 auto write_size = std::min(size, fragment_progress->remainder);
281 auto next = underlying_writer->write_bytes(marker, bytes, write_size);
282 bytes += write_size;
283 size -= write_size;
284 fragment_progress->remainder -= write_size;
285
286 // While there is more to write...
287 while (size > 0)
288 {
289 // Prepare a new fragment
290 const auto id = fragment_progress->identifier;
291 const auto frag_size = std::min(size + sizeof(id), max_fragment_size);
292 next = underlying_writer->prepare(
293 OversizedMessage::fragment, frag_size, true);
294
295 if (!next.has_value())
296 {
297 // Intermediate fragment failed - this is unexpected
299 std::logic_error,
300 "Failed to create fragment for oversized message");
301
302 // If this path is hit it is likely because we have allowed oversized
303 // writes to write without waiting. Some initial fragments were
304 // written, but there is insufficient space to write this fragment.
305 // In this case we can either cancel the entire oversized message, or
306 // retry. In either case we should send a message to inform the
307 // reader.
308 fragment_progress->remainder = 0;
309 break;
310 }
311
312 // Finish the previous fragment
313 underlying_writer->finish(fragment_progress->marker);
314
315 // Update progress tracking to reference the new fragment
316 write_size = frag_size - sizeof(id);
317 fragment_progress->marker = next;
318 fragment_progress->remainder = write_size;
319
320 // Write the id of the oversized message
321 next =
322 underlying_writer->write_bytes(next, (const uint8_t*)&id, sizeof(id));
323
324 // Write some fragment payload
325 next = underlying_writer->write_bytes(next, bytes, write_size);
326 bytes += write_size;
327 size -= write_size;
328 fragment_progress->remainder -= write_size;
329 }
330
331 return next;
332 }
333
334 size_t get_max_message_size() override
335 {
336 return max_total_size;
337 }
338 };
339
341 {
344 };
345
346 // Wrap ringbuffer::Circuit to provide the same fragment/total maximum sizes
347 // for every Writer
349 {
350 AbstractWriterFactory& factory_impl;
351
352 const WriterConfig config;
353
354 public:
355 WriterFactory(AbstractWriterFactory& impl, const WriterConfig& config_) :
356 factory_impl(impl),
357 config(config_)
358 {}
359
360 std::shared_ptr<oversized::Writer> create_oversized_writer_to_outside()
361 {
362 return std::make_shared<oversized::Writer>(
363 factory_impl.create_writer_to_outside(),
364 config.max_fragment_size,
365 config.max_total_size);
366 }
367
368 std::shared_ptr<oversized::Writer> create_oversized_writer_to_inside()
369 {
370 return std::make_shared<oversized::Writer>(
371 factory_impl.create_writer_to_inside(),
372 config.max_fragment_size,
373 config.max_total_size);
374 }
375
376 std::shared_ptr<ringbuffer::AbstractWriter> create_writer_to_outside()
377 override
378 {
380 }
381
382 std::shared_ptr<ringbuffer::AbstractWriter> create_writer_to_inside()
383 override
384 {
386 }
387 };
388}
void dispatch(MessageType m, const uint8_t *data, size_t size)
Definition messaging.h:153
void remove_message_handler(MessageType m)
Definition messaging.h:122
Definition oversized.h:29
FragmentReconstructor(messaging::RingbufferDispatcher &d)
Definition oversized.h:44
~FragmentReconstructor()
Definition oversized.h:104
Definition oversized.h:349
std::shared_ptr< oversized::Writer > create_oversized_writer_to_outside()
Definition oversized.h:360
std::shared_ptr< ringbuffer::AbstractWriter > create_writer_to_outside() override
Definition oversized.h:376
std::shared_ptr< ringbuffer::AbstractWriter > create_writer_to_inside() override
Definition oversized.h:382
WriterFactory(AbstractWriterFactory &impl, const WriterConfig &config_)
Definition oversized.h:355
std::shared_ptr< oversized::Writer > create_oversized_writer_to_inside()
Definition oversized.h:368
Definition oversized.h:125
virtual WriteMarker write_bytes(const WriteMarker &marker, const uint8_t *bytes, size_t size) override
Definition oversized.h:265
Writer(const ringbuffer::WriterPtr &writer, size_t f, size_t t=-1)
Definition oversized.h:144
virtual WriteMarker prepare(ringbuffer::Message m, size_t total_size, bool wait=true, size_t *identifier=nullptr) override
Definition oversized.h:171
virtual void finish(const WriteMarker &marker) override
Definition oversized.h:238
size_t get_max_message_size() override
Definition oversized.h:334
Definition ring_buffer_types.h:153
Definition ring_buffer_types.h:61
std::optional< size_t > WriteMarker
Definition ring_buffer_types.h:98
Definition ring_buffer_types.h:49
#define DISPATCHER_SET_MESSAGE_HANDLER(DISP, MSG,...)
Definition messaging.h:316
Definition oversized.h:21
OversizedMessage
Definition oversized.h:23
@ DEFINE_RINGBUFFER_MSG_TYPE
Part of a larger message. Can be sent both ways.
Definition oversized.h:25
std::shared_ptr< AbstractWriter > WriterPtr
Definition ring_buffer_types.h:150
uint32_t Message
Definition ring_buffer_types.h:19
#define LOG_AND_THROW(ERROR_TYPE,...)
Definition oversized.h:13
Definition oversized.h:117
size_t total_size
Definition oversized.h:120
ringbuffer::Message contained
Definition oversized.h:119
size_t identifier
Definition oversized.h:118
Definition oversized.h:341
size_t max_fragment_size
Definition oversized.h:342
size_t max_total_size
Definition oversized.h:343