CCF
Loading...
Searching...
No Matches
non_blocking.h
Go to the documentation of this file.
1// Copyright (c) Microsoft Corporation. All rights reserved.
2// Licensed under the Apache 2.0 License.
3#pragma once
4
#include "ring_buffer.h"

#include <deque>
#define FMT_HEADER_ONLY
#include <fmt/format.h>
#include <memory>
#include <utility>
#include <vector>
12
13namespace ringbuffer
14{
15 // This wraps an underlying Writer implementation and ensures calls to write()
16 // will not block indefinitely. This never calls the blocking write()
17 // implementation. Instead it calls try_write(), and in the case that a write
18 // fails (because the target ringbuffer is full), the message is placed in a
19 // pending queue. These pending messages must be flushed regularly, attempting
20 // again to write to the ringbuffer.
21
23 {
24 private:
25 WriterPtr underlying_writer;
26
27 struct PendingMessage
28 {
29 Message m;
30 size_t marker;
31 bool finished;
32 std::vector<uint8_t> buffer;
33
34 PendingMessage(Message m_, std::vector<uint8_t>&& buffer_) :
35 m(m_),
36 marker(0),
37 finished(false),
38 buffer(buffer_)
39 {}
40 };
41
42 std::deque<PendingMessage> pending;
43
44 public:
45 NonBlockingWriter(const WriterPtr& writer) : underlying_writer(writer) {}
46
49 size_t total_size,
50 bool,
51 size_t* identifier = nullptr) override
52 {
53 if (pending.empty())
54 {
55 // No currently pending messages - try to write to underlying buffer
56 const auto marker =
57 underlying_writer->prepare(m, total_size, false, identifier);
58
59 if (marker.has_value())
60 {
61 return marker;
62 }
63
64 // Prepare failed, no space in buffer - so add to queue
65 }
66
67 pending.emplace_back(m, std::vector<uint8_t>(total_size));
68
69 auto& msg = pending.back();
70 msg.marker = (size_t)msg.buffer.data();
71
72 // NB: There is an assumption that these markers will never conflict with
73 // the markers produced by the underlying writer impl
74 return msg.marker;
75 }
76
77 virtual void finish(const WriteMarker& marker) override
78 {
79 if (marker.has_value())
80 {
81 for (auto& it : pending)
82 {
83 // NB: finish is passed the _initial_ WriteMarker, so we compare
84 // against it.buffer.data() rather than it.marker
85 if ((size_t)it.buffer.data() == marker.value())
86 {
87 // This is a pending write. Mark as completed, so we can later flush
88 // it
89 it.finished = true;
90 return;
91 }
92 }
93 }
94
95 underlying_writer->finish(marker);
96 }
97
99 const WriteMarker& marker, const uint8_t* bytes, size_t size) override
100 {
101 if (marker.has_value())
102 {
103 for (auto& it : pending)
104 {
105 if (it.finished)
106 {
107 continue;
108 }
109
110 if (it.marker == marker.value())
111 {
112 // This is a pending write - dump data directly to write marker,
113 // which should be within the appropriate buffer
114 auto dest = (uint8_t*)marker.value();
115 if (dest < it.buffer.data())
116 {
117 throw std::runtime_error(fmt::format(
118 "Invalid pending marker - writing before buffer: {} < {}",
119 (size_t)dest,
120 (size_t)it.buffer.data()));
121 }
122
123 const auto buffer_end = it.buffer.data() + it.buffer.size();
124 if (dest + size > buffer_end)
125 {
126 throw std::runtime_error(fmt::format(
127 "Invalid pending marker - write extends beyond buffer: {} + {} "
128 "> {}",
129 (size_t)dest,
130 (size_t)size,
131 (size_t)buffer_end));
132 }
133
134 if (size != 0)
135 {
136 std::memcpy(dest, bytes, size);
137 }
138
139 dest += size;
140 it.marker = (size_t)dest;
141 return {it.marker};
142 }
143 }
144 }
145
146 // Otherwise, this was successfully prepared on the underlying
147 // implementation - delegate to it for remaining writes
148 return underlying_writer->write_bytes(marker, bytes, size);
149 }
150
151 size_t get_max_message_size() override
152 {
153 return underlying_writer->get_max_message_size();
154 }
155
156 // Returns true if flush completed and there are no more pending messages.
157 // False means 0 or more pending messages were written, but some remain
159 {
160 while (!pending.empty())
161 {
162 const auto& next = pending.front();
163 if (!next.finished)
164 {
165 // If we reached an in-progress pending message, stop - we can't flush
166 // this or anything after it
167 break;
168 }
169
170 // Try to write this pending message to the underlying writer
171 const auto marker = underlying_writer->prepare(
172 next.m, next.buffer.size(), false, nullptr);
173
174 if (!marker.has_value())
175 {
176 // No space - stop flushing
177 break;
178 }
179
180 underlying_writer->write_bytes(
181 marker, next.buffer.data(), next.buffer.size());
182 underlying_writer->finish(marker);
183
184 // This pending message was successfully written - pop it and continue
185 pending.pop_front();
186 }
187
188 return pending.empty();
189 }
190 };
191
193 {
194 AbstractWriterFactory& factory_impl;
195
196 // Could be set, but needs custom hash() + operator<, so vector is simpler
197 using WriterSet = std::vector<std::weak_ptr<ringbuffer::NonBlockingWriter>>;
198
199 WriterSet writers_to_outside;
200 WriterSet writers_to_inside;
201
202 std::shared_ptr<ringbuffer::NonBlockingWriter> add_writer(
203 const std::shared_ptr<ringbuffer::AbstractWriter>& underlying,
204 WriterSet& writers)
205 {
206 auto new_writer = std::make_shared<NonBlockingWriter>(underlying);
207 writers.emplace_back(new_writer);
208 return new_writer;
209 }
210
211 bool flush_all(WriterSet& writers)
212 {
213 bool all_empty = true;
214
215 auto it = writers.begin();
216 while (it != writers.end())
217 {
218 auto shared_ptr = it->lock();
219 if (shared_ptr)
220 {
221 all_empty &= shared_ptr->try_flush_pending();
222 ++it;
223 }
224 else
225 {
226 it = writers.erase(it);
227 }
228 }
229
230 return all_empty;
231 }
232
233 public:
235 {}
236
237 std::shared_ptr<ringbuffer::NonBlockingWriter>
239 {
240 return add_writer(
241 factory_impl.create_writer_to_outside(), writers_to_outside);
242 }
243
245 {
246 return flush_all(writers_to_outside);
247 }
248
249 std::shared_ptr<ringbuffer::NonBlockingWriter>
251 {
252 return add_writer(
253 factory_impl.create_writer_to_inside(), writers_to_inside);
254 }
255
257 {
258 return flush_all(writers_to_inside);
259 }
260
261 std::shared_ptr<ringbuffer::AbstractWriter> create_writer_to_outside()
262 override
263 {
265 }
266
267 std::shared_ptr<ringbuffer::AbstractWriter> create_writer_to_inside()
268 override
269 {
271 }
272 };
273}
Definition ring_buffer_types.h:153
virtual WriterPtr create_writer_to_inside()=0
virtual WriterPtr create_writer_to_outside()=0
Definition ring_buffer_types.h:61
std::optional< size_t > WriteMarker
Definition ring_buffer_types.h:98
Definition non_blocking.h:193
bool flush_all_outbound()
Definition non_blocking.h:244
bool flush_all_inbound()
Definition non_blocking.h:256
NonBlockingWriterFactory(AbstractWriterFactory &impl)
Definition non_blocking.h:234
std::shared_ptr< ringbuffer::AbstractWriter > create_writer_to_inside() override
Definition non_blocking.h:267
std::shared_ptr< ringbuffer::AbstractWriter > create_writer_to_outside() override
Definition non_blocking.h:261
std::shared_ptr< ringbuffer::NonBlockingWriter > create_non_blocking_writer_to_inside()
Definition non_blocking.h:250
std::shared_ptr< ringbuffer::NonBlockingWriter > create_non_blocking_writer_to_outside()
Definition non_blocking.h:238
Definition non_blocking.h:23
virtual void finish(const WriteMarker &marker) override
Definition non_blocking.h:77
NonBlockingWriter(const WriterPtr &writer)
Definition non_blocking.h:45
size_t get_max_message_size() override
Definition non_blocking.h:151
bool try_flush_pending()
Definition non_blocking.h:158
virtual WriteMarker prepare(ringbuffer::Message m, size_t total_size, bool, size_t *identifier=nullptr) override
Definition non_blocking.h:47
virtual WriteMarker write_bytes(const WriteMarker &marker, const uint8_t *bytes, size_t size) override
Definition non_blocking.h:98
Definition non_blocking.h:14
std::shared_ptr< AbstractWriter > WriterPtr
Definition ring_buffer_types.h:150
uint32_t Message
Definition ring_buffer_types.h:19