// Fragment of the PendingMessage record: the enclosing struct header and its
// other members are not visible in this listing.
// Initialised to false; the code that sets or reads this flag is not visible
// here.
32 bool finished =
false;
// Byte storage owned by this pending message.
33 std::vector<uint8_t> buffer;
// Constructor. The storage is taken by rvalue reference and moved into
// `buffer`, so the bytes themselves are not copied. Initialisers for the other
// members are not visible here.
35 PendingMessage(
Message m_, std::vector<uint8_t>&& buffer_) :
37 buffer(std::move(buffer_))
// Queue of buffered messages: prepare() appends at the back and
// try_flush_pending() reads from the front.
41 std::deque<PendingMessage> pending;
// Fragment of prepare(): the start of the signature and several body lines are
// not visible in this listing.
51 size_t* identifier =
nullptr)
override
// The underlying writer is always called with `false` as its third argument
// (presumably the wait/blocking flag - TODO confirm against AbstractWriter).
57 underlying_writer->prepare(m, total_size,
false, identifier);
// Checks whether the underlying prepare() produced a marker; what happens in
// that case is not visible here.
59 if (marker.has_value())
// Queue path: append a new pending message backed by a value-initialised
// (all-zero) buffer of total_size bytes.
67 pending.emplace_back(m, std::vector<uint8_t>(total_size));
69 auto& msg = pending.back();
// The marker of a queued message is the address of its buffer stored as an
// integer; write_bytes() casts it back to a pointer.
// NOTE(review): for total_size == 0 the value of data() is unspecified (it may
// be null), so two empty pending messages could share a marker - confirm that
// zero-sized messages cannot reach this path.
70 msg.marker =
reinterpret_cast<size_t>(msg.buffer.data());
// Fragment of finish(): the signature and parts of the body are not visible in
// this listing.
// A marker that holds a value is first looked up among the pending messages.
79 if (marker.has_value())
81 for (
auto& it : pending)
// The comparison uses the start address of the buffer rather than it.marker,
// which write_bytes() reassigns after each copy.
85 if (
reinterpret_cast<size_t>(it.buffer.data()) == marker.value())
// Forwards the marker to the underlying writer. Whether the pending-message
// branch returns before reaching this call is not visible here.
95 underlying_writer->finish(marker);
// Fragment of write_bytes(): the return type and some body lines are not
// visible in this listing.
// Copies `size` bytes from `bytes` into the pending message whose current
// marker equals `marker`, after bounds-checking the destination against that
// message's buffer. Throws std::runtime_error if the destination lies before
// the buffer or the write would extend past its end. The last visible
// statement forwards the call to the underlying writer.
99 const WriteMarker& marker,
const uint8_t* bytes,
size_t size)
override
101 if (marker.has_value())
103 for (
auto& it : pending)
// A pending message is identified by its current marker value.
110 if (it.marker == marker.value())
// The marker is an address stored as an integer; turn it back into a pointer.
114 auto* dest =
reinterpret_cast<uint8_t*
>(marker.value());
// Reject a destination that lies before the start of the buffer.
115 if (dest < it.buffer.data())
117 throw std::runtime_error(fmt::format(
118 "Invalid pending marker - writing before buffer: {} < {}",
119 reinterpret_cast<size_t>(dest),
120 reinterpret_cast<size_t>(it.buffer.data())));
// One-past-the-end of the buffer, used as the upper bound for the write.
123 auto*
const buffer_end = it.buffer.data() + it.buffer.size();
// Reject a write that would run past the end of the buffer.
// NOTE(review): `dest + size` is formed before it is compared; if `size` is
// large enough to move the pointer beyond one-past-the-end, that pointer
// arithmetic is undefined. Comparing `size` against `buffer_end - dest` would
// avoid this - confirm and consider changing.
124 if (dest + size > buffer_end)
126 throw std::runtime_error(fmt::format(
127 "Invalid pending marker - write extends beyond buffer: {} + {} "
129 reinterpret_cast<size_t>(dest),
131 reinterpret_cast<size_t>(buffer_end)));
// Both bounds were checked above; copy the payload into the pending buffer.
136 std::memcpy(dest, bytes, size);
// Stores `dest` back as the message's marker. The lines between the memcpy and
// this assignment are not visible; presumably `dest` is advanced by `size`
// first - TODO confirm.
140 it.marker =
reinterpret_cast<size_t>(dest);
// Hand the write to the underlying writer with the arguments unchanged.
148 return underlying_writer->write_bytes(marker, bytes, size);
// Returns the underlying writer's maximum message size unchanged.
153 return underlying_writer->get_max_message_size();
// Fragment of try_flush_pending(): the signature and some body lines are not
// visible in this listing.
// Replays queued messages into the underlying writer, starting from the front
// of `pending`. Returns true when `pending` is empty on exit.
160 while (!pending.empty())
162 const auto& next = pending.front();
// Ask the underlying writer for space for the whole buffered message, passing
// `false` as the third argument and no identifier out-parameter.
171 const auto marker = underlying_writer->prepare(
172 next.m, next.buffer.size(),
false,
nullptr);
// prepare() returned no marker. The handling is not visible here (presumably
// flushing stops and the message stays queued - TODO confirm).
174 if (!marker.has_value())
// Write the entire buffered payload in one call, then finish the message.
180 underlying_writer->write_bytes(
181 marker, next.buffer.data(), next.buffer.size());
182 underlying_writer->finish(marker);
// True only if nothing remains queued.
188 return pending.empty();
// Writers are tracked through weak_ptr, so this collection does not keep them
// alive.
197 using WriterSet = std::vector<std::weak_ptr<ringbuffer::NonBlockingWriter>>;
// One set per direction; they are flushed separately by the two
// `return flush_all(...)` calls in this file.
199 WriterSet writers_to_outside;
200 WriterSet writers_to_inside;
// Fragment of add_writer(): the remaining parameter(s) and the return
// statement are not visible in this listing.
// Wraps `underlying` in a newly created NonBlockingWriter and records it in
// `writers` (presumably a WriterSet passed by reference - TODO confirm).
202 std::shared_ptr<ringbuffer::NonBlockingWriter> add_writer(
203 const std::shared_ptr<ringbuffer::AbstractWriter>& underlying,
206 auto new_writer = std::make_shared<NonBlockingWriter>(underlying);
207 writers.emplace_back(new_writer);
// Fragment of flush_all(): the branch structure and the return statement are
// not visible in this listing.
// Walks `writers`, calling try_flush_pending() on each writer, and accumulates
// in `all_empty` whether every flushed writer ended with nothing pending
// (presumably this is the returned value - TODO confirm).
211 bool flush_all(WriterSet& writers)
213 bool all_empty =
true;
215 auto it = writers.begin();
216 while (it != writers.end())
// Promote the weak_ptr; the result is empty if the writer no longer exists.
218 auto shared_ptr = it->lock();
// `&=` does not short-circuit, so a writer is still flushed after an earlier
// one has reported leftover messages.
221 all_empty &= shared_ptr->try_flush_pending();
// erase() returns the iterator following the removed entry, which keeps the
// loop valid. Presumably reached only for expired entries - the condition is
// not visible here.
226 it = writers.erase(it);
// Fragments of four separate factory methods; only one line of each is visible
// in this listing.
// Return type of a method whose name and body are not visible here.
237 std::shared_ptr<ringbuffer::NonBlockingWriter>
// Flushes every writer registered in writers_to_outside and returns
// flush_all()'s result.
246 return flush_all(writers_to_outside);
// Return type of a method whose name and body are not visible here.
249 std::shared_ptr<ringbuffer::NonBlockingWriter>
// Flushes every writer registered in writers_to_inside and returns
// flush_all()'s result.
258 return flush_all(writers_to_inside);
Definition ring_buffer_types.h:157
virtual WriterPtr create_writer_to_inside()=0
virtual WriterPtr create_writer_to_outside()=0
Definition ring_buffer_types.h:63
std::optional< size_t > WriteMarker
Definition ring_buffer_types.h:100
Definition non_blocking.h:193
bool flush_all_outbound()
Definition non_blocking.h:244
bool flush_all_inbound()
Definition non_blocking.h:256
NonBlockingWriterFactory(AbstractWriterFactory &impl)
Definition non_blocking.h:234
std::shared_ptr< ringbuffer::AbstractWriter > create_writer_to_inside() override
Definition non_blocking.h:267
std::shared_ptr< ringbuffer::AbstractWriter > create_writer_to_outside() override
Definition non_blocking.h:261
std::shared_ptr< ringbuffer::NonBlockingWriter > create_non_blocking_writer_to_inside()
Definition non_blocking.h:250
std::shared_ptr< ringbuffer::NonBlockingWriter > create_non_blocking_writer_to_outside()
Definition non_blocking.h:238
Definition non_blocking.h:24
WriteMarker write_bytes(const WriteMarker &marker, const uint8_t *bytes, size_t size) override
Definition non_blocking.h:98
NonBlockingWriter(WriterPtr writer)
Definition non_blocking.h:44
WriteMarker prepare(ringbuffer::Message m, size_t total_size, bool, size_t *identifier=nullptr) override
Definition non_blocking.h:47
size_t get_max_message_size() override
Definition non_blocking.h:151
bool try_flush_pending()
Definition non_blocking.h:158
void finish(const WriteMarker &marker) override
Definition non_blocking.h:77
Definition non_blocking.h:15
std::shared_ptr< AbstractWriter > WriterPtr
Definition ring_buffer_types.h:154
uint32_t Message
Definition ring_buffer_types.h:19