10#include <fmt/format.h>
11#include <unordered_map>
// Formats the variadic arguments with fmt::format, logs the resulting text
// through LOG_FAIL_FMT, then throws ERROR_TYPE constructed from that text.
// NOTE(review): this listing skips source lines between the #define and the
// statements below, so the macro's enclosing wrapper is not visible here.
14#define LOG_AND_THROW(ERROR_TYPE, ...) \
17 const auto msg = fmt::format(__VA_ARGS__); \
18 LOG_FAIL_FMT("{}", msg); \
19 throw ERROR_TYPE(msg); \
36 const size_t total_size; // presumably the PartialMessage field checked as partial.total_size by the fragment handler - confirm
// Reassembly state for messages whose fragments are still arriving, keyed by
// the size_t message id that the fragment handler reads from each fragment.
42 std::unordered_map<size_t, PartialMessage> partial_messages;
// Callback supplied together with OversizedMessage::fragment; it receives the
// raw bytes of one fragment as (data, size) and accumulates them per message.
// NOTE(review): the call these arguments are passed to, and the braces and
// branches of the body, are elided from this listing.
49 OversizedMessage::fragment,
50 [
this](
const uint8_t* data,
size_t size) {
// The first field read from every fragment is the id of the message it
// belongs to. Presumably serialized::read advances data and shrinks size,
// since later reads and the memcpy reuse both - confirm.
51 auto message_id = serialized::read<size_t>(data, size);
53 auto it = partial_messages.find(message_id);
// No entry for this id yet: two more fields are read (the message type and
// the total payload size) and a new entry is created.
54 if (it == partial_messages.end())
58 auto m = serialized::read<ringbuffer::Message>(data, size);
59 auto total_size = serialized::read<size_t>(data, size);
// Buffer sized for the whole payload; it is released with delete[] after the
// completed message has been dispatched.
64 auto* dest =
new uint8_t[total_size];
// New entry starts with 0 bytes received.
67 partial_messages.insert({message_id, {m, total_size, 0, dest}});
// NOTE(review): `it` has to refer to the entry for message_id at this point;
// the line that re-points it after the insert is not visible - confirm.
72 auto& partial = it->second;
// Guards against a fragment whose bytes would exceed the announced total
// size. The strings are the error text; the reporting call is elided.
73 if (size + partial.received > partial.total_size)
78 "Too much data for oversized fragmented message. Message {} "
79 "asked for {} bytes, has already written {}, but has sent a "
// Append this fragment's bytes after those already received.
87 ::memcpy(partial.data + partial.received, data, size);
88 partial.received += size;
// All announced bytes are present: dispatch the buffer under the stored
// message type, then free the buffer and drop the map entry.
92 if (partial.received == partial.total_size)
95 dispatcher.
dispatch(partial.m, partial.data, partial.total_size);
97 delete[] partial.data;
101 partial_messages.erase(message_id);
// Frees the buffer of every entry still present in partial_messages, i.e.
// messages whose reassembly never completed.
// NOTE(review): the listing index places ~FragmentReconstructor() at line
// 106, so this loop is presumably part of the destructor - confirm.
117 for (
const auto& [_, partial] : partial_messages)
119 delete[] partial.data;
// Size requested from the underlying writer for each full fragment; a message
// no larger than this is forwarded to the underlying writer unfragmented.
138 const size_t max_fragment_size;
// Upper bound on the total size accepted by prepare(); also the value
// returned by get_max_message_size().
139 const size_t max_total_size;
// Bookkeeping for the fragment currently being written. Other code accesses
// members named marker, identifier and remainder; the struct body itself is
// elided from this listing.
141 struct FragmentProgress
// Assigned by prepare() when a message is fragmented and reset by finish().
150 std::optional<FragmentProgress> fragment_progress;
// Constructor fragment: moves the supplied writer into underlying_writer and
// stores `f` as the fragment size limit, then validates the two limits.
// NOTE(review): the remaining initialisers, the error-reporting calls and the
// definition of header_size are elided from this listing.
154 underlying_writer(
std::move(writer)),
155 max_fragment_size(f),
// The fragment size must be strictly smaller than the total limit; the error
// type named for a violation is std::invalid_argument.
158 if (max_fragment_size >= max_total_size)
161 std::invalid_argument,
162 "Fragment sizes must be smaller than total max: {} >= {}",
// The fragment size must also be strictly larger than header_size, leaving
// room for payload in the initial fragment.
168 if (max_fragment_size <= header_size)
171 std::invalid_argument,
172 "Fragment size must be large enough to contain the header for the "
173 "initial fragment, and some additional payload data: {} <= {}",
// prepare(): begins a write of total_size bytes of message type m, fragmenting
// when it does not fit in a single fragment. `identifier` is an optional
// out-parameter (defaults to nullptr).
// NOTE(review): the start of the signature, several conditions and the
// construction of `header` and `outer_id` are elided from this listing.
183 size_t* identifier =
nullptr)
override
// Only one fragmented message may be in progress at a time; the error type
// named here is std::logic_error.
186 if (fragment_progress.has_value())
189 std::logic_error,
"This Writer is already preparing a message");
// Fits in one fragment: forward unchanged to the underlying writer.
193 if (total_size <= max_fragment_size)
195 return underlying_writer->prepare(m, total_size, wait, identifier);
// Larger than the configured total limit: std::invalid_argument.
198 if (total_size > max_total_size)
201 std::invalid_argument,
202 "Requested a write of {} bytes, max allowed is {}",
// A second std::invalid_argument case whose condition is elided; going by the
// message text it presumably fires when `wait` is false - confirm.
212 std::invalid_argument,
213 "Requested write of {} bytes will be split into multiple fragments: "
214 "caller must wait for these to complete as fragment writes will be "
// Reserve the initial fragment at the maximum fragment size, tagged as
// OversizedMessage::fragment; the underlying writer fills in outer_id.
222 const auto marker = underlying_writer->prepare(
223 OversizedMessage::fragment, max_fragment_size, wait, &outer_id);
// Handling of a failed reservation is elided from this listing.
224 if (!marker.has_value())
// Write the raw bytes of `header` at the start of the initial fragment.
231 auto next = underlying_writer->write_bytes(
232 marker,
reinterpret_cast<const uint8_t*
>(&header),
sizeof(header));
// Record progress: the fragment's marker, its id, and the space left in the
// fragment after the header.
235 fragment_progress = {
236 marker, outer_id, max_fragment_size -
sizeof(header)};
// Report the id obtained from the underlying writer when the caller asked.
238 if (identifier !=
nullptr)
240 *identifier = outer_id;
// finish(): completes the write identified by `marker`.
// NOTE(review): the braces and else-structure are elided from this listing;
// the final call is presumably the non-fragmented path - confirm.
250 if (fragment_progress.has_value())
// Finishing while space reserved in the current fragment is still unwritten
// is reported with the error text below (the reporting call is elided).
254 if (fragment_progress->remainder != 0)
258 "Attempting to finish an oversized message before the entire "
259 "requested payload has been written");
// Finish the fragment tracked in fragment_progress, then clear the state so a
// new message can be prepared.
263 underlying_writer->finish(fragment_progress->marker);
266 fragment_progress = {};
// Pass the caller's marker straight to the underlying writer.
271 underlying_writer->finish(marker);
// write_bytes(): writes `size` bytes from `bytes` at `marker`, opening further
// fragments on the underlying writer when a fragmented message is in progress.
// NOTE(review): the loop and branch structure, and any updates to `bytes` and
// `size` between the steps below, are elided from this listing.
276 const WriteMarker& marker,
const uint8_t* bytes,
size_t size)
override
// Handling of an empty marker is elided from this listing.
278 if (!marker.has_value())
// No fragmented message in progress: forward unchanged.
283 if (!fragment_progress.has_value())
286 return underlying_writer->write_bytes(marker, bytes, size);
// Write as much as fits in the space left in the current fragment.
290 auto write_size = std::min(size, fragment_progress->remainder);
291 auto next = underlying_writer->write_bytes(marker, bytes, write_size);
294 fragment_progress->remainder -= write_size;
// A following fragment holds the message id plus payload, capped at the
// maximum fragment size.
300 const auto id = fragment_progress->identifier;
301 const auto frag_size = std::min(size +
sizeof(
id), max_fragment_size);
// Reserve the next fragment, passing true for the wait argument.
302 next = underlying_writer->prepare(
303 OversizedMessage::fragment, frag_size,
true);
// Failure to reserve is reported with the text below (the call is elided).
305 if (!next.has_value())
310 "Failed to create fragment for oversized message");
// NOTE(review): presumably part of the failure path above - confirm.
318 fragment_progress->remainder = 0;
// Close the previous fragment before switching to the new one.
323 underlying_writer->finish(fragment_progress->marker);
// Payload capacity of the new fragment is its size minus the id prefix.
326 write_size = frag_size -
sizeof(id);
327 fragment_progress->marker = next;
328 fragment_progress->remainder = write_size;
// Each following fragment starts with the raw bytes of the message id.
331 next = underlying_writer->write_bytes(
332 next,
reinterpret_cast<const uint8_t*
>(&
id),
sizeof(id));
// Then the payload bytes for this fragment.
335 next = underlying_writer->write_bytes(next, bytes, write_size);
338 fragment_progress->remainder -= write_size;
// Returns the configured total size limit. The listing index places
// get_max_message_size() at line 344, so this is presumably its body.
346 return max_total_size;
// Wrapped factory, held by reference; supplies the writers that get wrapped.
360 AbstractWriterFactory& factory_impl;
// Build an oversized::Writer around the wrapped factory's writer-to-outside.
// NOTE(review): the remaining constructor arguments are elided here.
372 return std::make_shared<oversized::Writer>(
373 factory_impl.create_writer_to_outside(),
// Same wrapping for the writer-to-inside.
380 return std::make_shared<oversized::Writer>(
381 factory_impl.create_writer_to_inside(),
void dispatch(MessageType m, const uint8_t *data, size_t size)
Definition messaging.h:153
void remove_message_handler(MessageType m)
Definition messaging.h:122
Definition oversized.h:30
~FragmentReconstructor() noexcept
Definition oversized.h:106
FragmentReconstructor(messaging::RingbufferDispatcher &d)
Definition oversized.h:45
Definition oversized.h:359
std::shared_ptr< oversized::Writer > create_oversized_writer_to_outside()
Definition oversized.h:370
std::shared_ptr< ringbuffer::AbstractWriter > create_writer_to_outside() override
Definition oversized.h:386
std::shared_ptr< ringbuffer::AbstractWriter > create_writer_to_inside() override
Definition oversized.h:392
WriterFactory(AbstractWriterFactory &impl, const WriterConfig &config_)
Definition oversized.h:365
std::shared_ptr< oversized::Writer > create_oversized_writer_to_inside()
Definition oversized.h:378
Definition oversized.h:134
WriteMarker write_bytes(const WriteMarker &marker, const uint8_t *bytes, size_t size) override
Definition oversized.h:275
WriteMarker prepare(ringbuffer::Message m, size_t total_size, bool wait=true, size_t *identifier=nullptr) override
Definition oversized.h:179
Writer(ringbuffer::WriterPtr writer, size_t f, size_t t=-1)
Definition oversized.h:153
void finish(const WriteMarker &marker) override
Definition oversized.h:248
size_t get_max_message_size() override
Definition oversized.h:344
Definition ring_buffer_types.h:157
Definition ring_buffer_types.h:63
std::optional< size_t > WriteMarker
Definition ring_buffer_types.h:100
Definition ring_buffer_types.h:51
#define DISPATCHER_SET_MESSAGE_HANDLER(DISP, MSG,...)
Definition messaging.h:316
Definition oversized.h:22
OversizedMessage
Definition oversized.h:24
@ DEFINE_RINGBUFFER_MSG_TYPE
Part of a larger message. Can be sent both ways.
Definition oversized.h:26
std::shared_ptr< AbstractWriter > WriterPtr
Definition ring_buffer_types.h:154
uint32_t Message
Definition ring_buffer_types.h:19
#define LOG_AND_THROW(ERROR_TYPE,...)
Definition oversized.h:14
Definition oversized.h:351
size_t max_fragment_size
Definition oversized.h:352
size_t max_total_size
Definition oversized.h:353