10#include <fmt/format.h>
11#include <unordered_map>
13#define LOG_AND_THROW(ERROR_TYPE, ...) \
16 const auto msg = fmt::format(__VA_ARGS__); \
17 LOG_FAIL_FMT("{}", msg); \
18 throw ERROR_TYPE(msg); \
35 const size_t total_size;
41 std::unordered_map<size_t, PartialMessage> partial_messages;
48 OversizedMessage::fragment,
49 [
this](
const uint8_t* data,
size_t size) {
50 auto message_id = serialized::read<size_t>(data, size);
52 auto it = partial_messages.find(message_id);
53 if (it == partial_messages.end())
57 auto m = serialized::read<ringbuffer::Message>(data, size);
58 auto total_size = serialized::read<size_t>(data, size);
62 uint8_t* dest =
new uint8_t[total_size];
65 partial_messages.insert({message_id, {m, total_size, 0, dest}});
70 auto& partial = it->second;
71 if (size + partial.received > partial.total_size)
76 "Too much data for oversized fragmented message. Message {} "
77 "asked for {} bytes, has already written {}, but has sent a "
85 ::memcpy(partial.data + partial.received, data, size);
86 partial.received += size;
90 if (partial.received == partial.total_size)
93 dispatcher.
dispatch(partial.m, partial.data, partial.total_size);
95 delete[] partial.data;
99 partial_messages.erase(message_id);
108 for (
const auto& [_, partial] : partial_messages)
110 delete[] partial.data;
129 const size_t max_fragment_size;
130 const size_t max_total_size;
132 struct FragmentProgress
141 std::optional<FragmentProgress> fragment_progress;
145 underlying_writer(writer),
146 max_fragment_size(f),
148 fragment_progress({})
150 if (max_fragment_size >= max_total_size)
153 std::invalid_argument,
154 "Fragment sizes must be smaller than total max: {} >= {}",
159 constexpr auto header_size =
sizeof(InitialFragmentHeader);
160 if (max_fragment_size <= header_size)
163 std::invalid_argument,
164 "Fragment size must be large enough to contain the header for the "
165 "initial fragment, and some additional payload data: {} <= {}",
175 size_t* identifier =
nullptr)
override
178 if (fragment_progress.has_value())
181 std::logic_error,
"This Writer is already preparing a message");
185 if (total_size <= max_fragment_size)
187 return underlying_writer->prepare(m, total_size, wait, identifier);
190 if (total_size > max_total_size)
193 std::invalid_argument,
194 "Requested a write of {} bytes, max allowed is {}",
204 std::invalid_argument,
205 "Requested write of {} bytes will be split into multiple fragments: "
206 "caller must wait for these to complete as fragment writes will be "
214 const auto marker = underlying_writer->prepare(
215 OversizedMessage::fragment, max_fragment_size, wait, &outer_id);
216 if (!marker.has_value())
223 auto next = underlying_writer->write_bytes(
224 marker, (
const uint8_t*)&header,
sizeof(header));
227 fragment_progress = {
228 marker, outer_id, max_fragment_size -
sizeof(header)};
230 if (identifier !=
nullptr)
231 *identifier = outer_id;
240 if (fragment_progress.has_value())
244 if (fragment_progress->remainder != 0)
248 "Attempting to finish an oversized message before the entire "
249 "requested payload has been written");
253 underlying_writer->finish(fragment_progress->marker);
256 fragment_progress = {};
261 underlying_writer->finish(marker);
266 const WriteMarker& marker,
const uint8_t* bytes,
size_t size)
override
268 if (!marker.has_value())
273 if (!fragment_progress.has_value())
276 return underlying_writer->write_bytes(marker, bytes, size);
280 auto write_size = std::min(size, fragment_progress->remainder);
281 auto next = underlying_writer->write_bytes(marker, bytes, write_size);
284 fragment_progress->remainder -= write_size;
290 const auto id = fragment_progress->identifier;
291 const auto frag_size = std::min(size +
sizeof(
id), max_fragment_size);
292 next = underlying_writer->prepare(
293 OversizedMessage::fragment, frag_size,
true);
295 if (!next.has_value())
300 "Failed to create fragment for oversized message");
308 fragment_progress->remainder = 0;
313 underlying_writer->finish(fragment_progress->marker);
316 write_size = frag_size -
sizeof(id);
317 fragment_progress->marker = next;
318 fragment_progress->remainder = write_size;
322 underlying_writer->write_bytes(next, (
const uint8_t*)&id,
sizeof(id));
325 next = underlying_writer->write_bytes(next, bytes, write_size);
328 fragment_progress->remainder -= write_size;
336 return max_total_size;
350 AbstractWriterFactory& factory_impl;
362 return std::make_shared<oversized::Writer>(
363 factory_impl.create_writer_to_outside(),
370 return std::make_shared<oversized::Writer>(
371 factory_impl.create_writer_to_inside(),
void dispatch(MessageType m, const uint8_t *data, size_t size)
Definition messaging.h:153
void remove_message_handler(MessageType m)
Definition messaging.h:122
Definition oversized.h:29
FragmentReconstructor(messaging::RingbufferDispatcher &d)
Definition oversized.h:44
~FragmentReconstructor()
Definition oversized.h:104
Definition oversized.h:349
std::shared_ptr< oversized::Writer > create_oversized_writer_to_outside()
Definition oversized.h:360
std::shared_ptr< ringbuffer::AbstractWriter > create_writer_to_outside() override
Definition oversized.h:376
std::shared_ptr< ringbuffer::AbstractWriter > create_writer_to_inside() override
Definition oversized.h:382
WriterFactory(AbstractWriterFactory &impl, const WriterConfig &config_)
Definition oversized.h:355
std::shared_ptr< oversized::Writer > create_oversized_writer_to_inside()
Definition oversized.h:368
Definition oversized.h:125
virtual WriteMarker write_bytes(const WriteMarker &marker, const uint8_t *bytes, size_t size) override
Definition oversized.h:265
Writer(const ringbuffer::WriterPtr &writer, size_t f, size_t t=-1)
Definition oversized.h:144
virtual WriteMarker prepare(ringbuffer::Message m, size_t total_size, bool wait=true, size_t *identifier=nullptr) override
Definition oversized.h:171
virtual void finish(const WriteMarker &marker) override
Definition oversized.h:238
size_t get_max_message_size() override
Definition oversized.h:334
Definition ring_buffer_types.h:153
Definition ring_buffer_types.h:61
std::optional< size_t > WriteMarker
Definition ring_buffer_types.h:98
Definition ring_buffer_types.h:49
#define DISPATCHER_SET_MESSAGE_HANDLER(DISP, MSG,...)
Definition messaging.h:316
Definition oversized.h:21
OversizedMessage
Definition oversized.h:23
@ DEFINE_RINGBUFFER_MSG_TYPE
Part of a larger message. Can be sent both ways.
Definition oversized.h:25
std::shared_ptr< AbstractWriter > WriterPtr
Definition ring_buffer_types.h:150
uint32_t Message
Definition ring_buffer_types.h:19
#define LOG_AND_THROW(ERROR_TYPE,...)
Definition oversized.h:13
Definition oversized.h:341
size_t max_fragment_size
Definition oversized.h:342
size_t max_total_size
Definition oversized.h:343