32 std::vector<uint8_t> buffer;
34 PendingMessage(
Message m_, std::vector<uint8_t>&& buffer_) :
42 std::deque<PendingMessage> pending;
51 size_t* identifier =
nullptr)
override
57 underlying_writer->prepare(m, total_size,
false, identifier);
59 if (marker.has_value())
67 pending.emplace_back(m, std::vector<uint8_t>(total_size));
69 auto& msg = pending.back();
70 msg.marker = (size_t)msg.buffer.data();
79 if (marker.has_value())
81 for (
auto& it : pending)
85 if ((
size_t)it.buffer.data() == marker.value())
95 underlying_writer->finish(marker);
99 const WriteMarker& marker,
const uint8_t* bytes,
size_t size)
override
101 if (marker.has_value())
103 for (
auto& it : pending)
110 if (it.marker == marker.value())
114 auto dest = (uint8_t*)marker.value();
115 if (dest < it.buffer.data())
117 throw std::runtime_error(fmt::format(
118 "Invalid pending marker - writing before buffer: {} < {}",
120 (
size_t)it.buffer.data()));
123 const auto buffer_end = it.buffer.data() + it.buffer.size();
124 if (dest + size > buffer_end)
126 throw std::runtime_error(fmt::format(
127 "Invalid pending marker - write extends beyond buffer: {} + {} "
131 (
size_t)buffer_end));
136 std::memcpy(dest, bytes, size);
140 it.marker = (size_t)dest;
148 return underlying_writer->write_bytes(marker, bytes, size);
153 return underlying_writer->get_max_message_size();
160 while (!pending.empty())
162 const auto& next = pending.front();
171 const auto marker = underlying_writer->prepare(
172 next.m, next.buffer.size(),
false,
nullptr);
174 if (!marker.has_value())
180 underlying_writer->write_bytes(
181 marker, next.buffer.data(), next.buffer.size());
182 underlying_writer->finish(marker);
188 return pending.empty();
197 using WriterSet = std::vector<std::weak_ptr<ringbuffer::NonBlockingWriter>>;
199 WriterSet writers_to_outside;
200 WriterSet writers_to_inside;
202 std::shared_ptr<ringbuffer::NonBlockingWriter> add_writer(
203 const std::shared_ptr<ringbuffer::AbstractWriter>& underlying,
206 auto new_writer = std::make_shared<NonBlockingWriter>(underlying);
207 writers.emplace_back(new_writer);
211 bool flush_all(WriterSet& writers)
213 bool all_empty =
true;
215 auto it = writers.begin();
216 while (it != writers.end())
218 auto shared_ptr = it->lock();
221 all_empty &= shared_ptr->try_flush_pending();
226 it = writers.erase(it);
237 std::shared_ptr<ringbuffer::NonBlockingWriter>
246 return flush_all(writers_to_outside);
249 std::shared_ptr<ringbuffer::NonBlockingWriter>
258 return flush_all(writers_to_inside);
Definition ring_buffer_types.h:153
virtual WriterPtr create_writer_to_inside()=0
virtual WriterPtr create_writer_to_outside()=0
Definition ring_buffer_types.h:61
std::optional< size_t > WriteMarker
Definition ring_buffer_types.h:98
Definition non_blocking.h:193
bool flush_all_outbound()
Definition non_blocking.h:244
bool flush_all_inbound()
Definition non_blocking.h:256
NonBlockingWriterFactory(AbstractWriterFactory &impl)
Definition non_blocking.h:234
std::shared_ptr< ringbuffer::AbstractWriter > create_writer_to_inside() override
Definition non_blocking.h:267
std::shared_ptr< ringbuffer::AbstractWriter > create_writer_to_outside() override
Definition non_blocking.h:261
std::shared_ptr< ringbuffer::NonBlockingWriter > create_non_blocking_writer_to_inside()
Definition non_blocking.h:250
std::shared_ptr< ringbuffer::NonBlockingWriter > create_non_blocking_writer_to_outside()
Definition non_blocking.h:238
Definition non_blocking.h:23
virtual void finish(const WriteMarker &marker) override
Definition non_blocking.h:77
NonBlockingWriter(const WriterPtr &writer)
Definition non_blocking.h:45
size_t get_max_message_size() override
Definition non_blocking.h:151
bool try_flush_pending()
Definition non_blocking.h:158
virtual WriteMarker prepare(ringbuffer::Message m, size_t total_size, bool, size_t *identifier=nullptr) override
Definition non_blocking.h:47
virtual WriteMarker write_bytes(const WriteMarker &marker, const uint8_t *bytes, size_t size) override
Definition non_blocking.h:98
Definition non_blocking.h:14
std::shared_ptr< AbstractWriter > WriterPtr
Definition ring_buffer_types.h:150
uint32_t Message
Definition ring_buffer_types.h:19