CCF
Loading...
Searching...
No Matches
ring_buffer.h
Go to the documentation of this file.
1// Copyright (c) Microsoft Corporation. All rights reserved.
2// Licensed under the Apache 2.0 License.
3#pragma once
4
5#include "ccf/pal/mem.h"
6#include "ring_buffer_types.h"
7
8#include <atomic>
9#include <cstring>
10#include <functional>
11#include <thread>
12
13// This file implements a Multiple-Producer Single-Consumer ringbuffer.
14
// A single Reader instance owns an underlying memory buffer, and a single
// thread should process the messages written to it. Any number of other
// threads and Writers may write to it, and the messages will be distinct,
// correct, and ordered.
19
20// A Circuit wraps a pair of ringbuffers to allow 2-way communication - messages
21// are written to the inbound buffer, processed inside an enclave, and responses
22// written back to the outbound.
23
24namespace ringbuffer
25{
26 using Handler = std::function<void(Message, const uint8_t*, size_t)>;
27
// The most significant bit of the 32-bit length field marks a message whose
// payload is still being written (a "pending" message).
// NB: the shifted literal is unsigned, so no signed value is ever shifted
// into the sign bit.
static constexpr uint32_t pending_write_flag = 1u << 31;
// Selects the remaining 31 bits, which carry the actual length.
static constexpr uint32_t length_mask = ~pending_write_flag;
31
32 struct Const
33 {
34 enum : Message
35 {
36 msg_max = std::numeric_limits<Message>::max() - 1,
39 msg_pad = std::numeric_limits<Message>::max()
40 };
41
// Returns true when exactly one bit of n is set, i.e. n is a power of two.
// Zero is not a power of two.
static constexpr bool is_power_of_2(size_t n)
{
  if (n == 0u)
  {
    return false;
  }

  // Clearing the lowest set bit of a power of two leaves nothing behind.
  return (n & (n - 1)) == 0u;
}
46
// Returns true when the address held in data is a multiple of align.
static bool is_aligned(uint8_t const* data, size_t align)
{
  const auto address = reinterpret_cast<std::uintptr_t>(data);
  return (address % align) == 0;
}
51
// Size in bytes of the header that precedes every entry in the buffer.
static constexpr size_t header_size()
{
  // Two 32-bit fields: the length, followed by the message ID.
  constexpr size_t length_field = sizeof(int32_t);
  constexpr size_t message_field = sizeof(uint32_t);
  return length_field + message_field;
}
57
58 static constexpr size_t align_size(size_t n)
59 {
60 // Make sure the header is aligned in memory.
61 return (n + (header_size() - 1)) & ~(header_size() - 1);
62 }
63
64 static constexpr size_t entry_size(size_t n)
65 {
66 return Const::align_size(n + header_size());
67 }
68
69 static constexpr size_t max_size()
70 {
71 // The length of a message plus its header must be encodable in the
72 // header. High bit of lengths indicate pending writes.
73 return std::numeric_limits<int32_t>::max() - header_size();
74 }
75
// Largest single reservation permitted in a buffer of buffer_size bytes.
static constexpr size_t max_reservation_size(size_t buffer_size)
{
  // Capping reservations at half the buffer guarantees that, in an empty
  // buffer, the reservation always fits in one contiguous region (either
  // before or after the current tail). Allowing anything larger could
  // require artificially advancing the tail (writing padding, then clearing
  // it) to open up a sufficiently large region.
  return buffer_size >> 1;
}
85
// Returns the largest power of two that is less than or equal to n.
// For n == 0 no such power exists and 0 is returned.
static constexpr size_t previous_power_of_2(size_t n)
{
  // __builtin_clzll has an undefined result for a zero argument, so that
  // case must be answered before calling it.
  if (n == 0)
  {
    return 0;
  }

  // The builtin counts leading zeros of an unsigned long long, so the index
  // of the highest set bit is derived from that type's width rather than
  // from the width of size_t.
  const auto leading_zeros = __builtin_clzll(n);
  const auto top_bit = sizeof(unsigned long long) * 8 - 1 - leading_zeros;

  // size_t{1} keeps the shift at the full width of the result type on every
  // data model (a plain 1ul is only 32 bits wide on LLP64 targets).
  return size_t{1} << top_bit;
}
91
92 static bool find_acceptable_sub_buffer(uint8_t*& data_, size_t& size_)
93 {
94 void* data = reinterpret_cast<void*>(data_);
95 size_t size = size_;
96
97 auto* ret = std::align(8, sizeof(size_t), data, size);
98 if (ret == nullptr)
99 {
100 return false;
101 }
102
103 data_ = reinterpret_cast<uint8_t*>(data);
104 size_ = previous_power_of_2(size);
105 return true;
106 }
107
108 static uint64_t make_header(Message m, size_t size, bool pending = true)
109 {
110 return (((uint64_t)m) << 32) |
111 ((size & length_mask) | (pending ? pending_write_flag : 0u));
112 }
113 };
114
116 {
117 uint8_t* data;
118 size_t size;
119
121
122 void check_access(size_t index, size_t access_size)
123 {
124 if (index + access_size > size)
125 {
126#ifdef RINGBUFFER_USE_ABORT
127 abort();
128#else
129 throw std::runtime_error(fmt::format(
130 "Ringbuffer access out of bounds - attempting to access {}, max "
131 "index is {}",
132 index + access_size,
133 size));
134#endif
135 }
136 }
137 };
138
139 namespace detail
140 {
141 inline uint64_t read64_impl(const BufferDef& bd, size_t index)
142 {
143 auto* src = bd.data + index;
144 auto* src_64 = reinterpret_cast<uint64_t*>(src);
145
146#ifdef __cpp_lib_atomic_ref
147 if (Const::is_aligned(src, 8))
148 {
149 auto& ref = *src_64;
150 std::atomic_ref<uint64_t> slot(ref);
151 return slot.load(std::memory_order_acquire);
152 }
153#endif
154
155 // __atomic_load is used instead of std::atomic_ref when std::atomic_ref
156 // is unavailable, or the src pointer is not aligned
157 // https://en.cppreference.com/w/Template:cpp/compiler_support/20
158 uint64_t r = 0;
159 __atomic_load(src_64, &r, __ATOMIC_ACQUIRE);
160 return r;
161 }
162
163 inline Message message(uint64_t header)
164 {
165 return (Message)(header >> 32);
166 }
167
// Extracts the lower 32 bits of an entry header: the length field, including
// its pending-write flag bit if that is set.
inline uint32_t length(uint64_t header)
{
  constexpr uint64_t low_32_bits = std::numeric_limits<uint32_t>::max();
  return static_cast<uint32_t>(header & low_32_bits);
}
172 } // namespace detail
173
174 class Reader
175 {
176 friend class Writer;
177
178 BufferDef bd;
179
180 virtual uint64_t read64(size_t index)
181 {
182 bd.check_access(index, sizeof(uint64_t));
183 return detail::read64_impl(bd, index);
184 }
185
186 virtual void clear_mem(size_t index, size_t advance)
187 {
188 ::memset(bd.data + index, 0, advance);
189 }
190
191 public:
192 Reader(const BufferDef& bd_) : bd(bd_)
193 {
194 if (!Const::is_power_of_2(bd.size))
195 {
196 throw std::logic_error(
197 fmt::format("Buffer size must be a power of 2, not {}", bd.size));
198 }
199
200 if (!Const::is_aligned(bd.data, 8))
201 {
202 throw std::logic_error("Buffer must be 8-byte aligned");
203 }
204 }
205
206 virtual ~Reader() = default;
207
208 size_t read(size_t limit, Handler f)
209 {
210 auto mask = bd.size - 1;
211 auto hd = bd.offsets->head.load(std::memory_order_acquire);
212 auto hd_index = hd & mask;
213 auto block = bd.size - hd_index;
214 size_t advance = 0;
215 size_t count = 0;
216
217 while ((advance < block) && (count < limit))
218 {
219 auto msg_index = hd_index + advance;
220 auto header = read64(msg_index);
221 auto size = detail::length(header);
222
223 // If we see a pending write, we're done.
224 if ((size & pending_write_flag) != 0u)
225 {
226 break;
227 }
228
229 auto m = detail::message(header);
230
231 if (m == Const::msg_none)
232 {
233 // There is no message here, we're done.
234 break;
235 }
236
237 if (m == Const::msg_pad)
238 {
239 // If we see padding, skip it.
240 // NB: Padding messages are potentially unaligned, where other
241 // messages are aligned by calls to entry_size(). Even for an empty
242 // padding message (size == 0), we need to skip past the message
243 // header.
244 advance += Const::header_size() + size;
245 continue;
246 }
247
248 advance += Const::entry_size(size);
249 ++count;
250
251 // Call the handler function for this message.
252 bd.check_access(hd_index, advance);
253
254 f(m, bd.data + msg_index + Const::header_size(), (size_t)size);
255 }
256
257 if (advance > 0)
258 {
259 // Zero the buffer and advance the head.
260 bd.check_access(hd_index, advance);
261 clear_mem(hd_index, advance);
262 bd.offsets->head.store(hd + advance, std::memory_order_release);
263 }
264
265 return count;
266 }
267 };
268
269 class Writer : public AbstractWriter
270 {
271 protected:
272 BufferDef bd; // copy of reader's buffer definition
273 const size_t rmax;
274
276 {
277 // Index within buffer of reservation start
278 size_t index;
279
280 // Individual identifier for this reservation. Should be unique across
281 // buffer lifetime, amongst all writers
283 };
284
285 public:
286 Writer(const Reader& r) :
287 bd(r.bd),
288 rmax(Const::max_reservation_size(bd.size))
289 {}
290
291 Writer(const Writer& that) : bd(that.bd), rmax(that.rmax) {}
292
293 ~Writer() override = default;
294
295 std::optional<size_t> prepare(
296 Message m,
297 size_t size,
298 bool wait = true,
299 size_t* identifier = nullptr) override
300 {
301 // Make sure we aren't using a reserved message.
302 if ((m < Const::msg_min) || (m > Const::msg_max))
303 {
304 throw message_error(
305 m, fmt::format("Cannot use a reserved message ({})", m));
306 }
307
308 // Make sure the message fits.
309 if (size > Const::max_size())
310 {
311 throw message_error(
312 m,
313 fmt::format(
314 "Message ({}) is too long for any writer: {} > {}",
315 m,
316 size,
317 Const::max_size()));
318 }
319
320 auto rsize = Const::entry_size(size);
321 if (rsize > rmax)
322 {
323 throw message_error(
324 m,
325 fmt::format(
326 "Message ({}) is too long for this writer: {} > {}",
327 m,
328 rsize,
329 rmax));
330 }
331
332 auto r = reserve(rsize);
333
334 if (!r.has_value())
335 {
336 if (wait)
337 {
338 // Retry until there is sufficient space.
339 do
340 {
341 std::this_thread::yield();
342 r = reserve(rsize);
343 } while (!r.has_value());
344 }
345 else
346 {
347 // Fail if there is insufficient space.
348 return {};
349 }
350 }
351
352 // Write the preliminary header and return the buffer pointer.
353 // The initial header length has high bit set to indicate a pending
354 // message. We rewrite the real length after the message data.
355 write64(r.value().index, Const::make_header(m, size));
356
357 if (identifier != nullptr)
358 {
359 *identifier = r.value().identifier;
360 }
361
362 return {r.value().index + Const::header_size()};
363 }
364
365 void finish(const WriteMarker& marker) override
366 {
367 if (marker.has_value())
368 {
369 // Fix up the size to indicate we're done writing - unset pending bit.
370 const auto index = marker.value() - Const::header_size();
371 const auto header = read64(index);
372 const auto size = detail::length(header);
373 const auto m = detail::message(header);
374 const auto finished_header = Const::make_header(m, size, false);
375 write64(index, finished_header);
376 }
377 }
378
379 size_t get_max_message_size() override
380 {
381 return Const::max_size();
382 }
383
384 protected:
386 const WriteMarker& marker, const uint8_t* bytes, size_t size) override
387 {
388 if (!marker.has_value())
389 {
390 return {};
391 }
392
393 const auto index = marker.value();
394
395 bd.check_access(index, size);
396
397 // Standard says memcpy(x, null, 0) is undefined, so avoid it
398 if (size > 0)
399 {
400 ccf::pal::safe_memcpy(bd.data + index, bytes, size);
401 }
402
403 return {index + size};
404 }
405
406 private:
// We use this to detect whether the head is ahead of the tail. In real
// operation they should be close to each other, relative to the total range
// of a size_t. To handle wrap-around (ie - when a write has overflowed past
// the max value), we consider a larger than b if the distance from b to a is
// positive and less than half the total range.
static bool greater_with_wraparound(size_t a, size_t b)
{
  // The switch point is derived from the operand type, so that the test
  // stays meaningful whatever the width of size_t is.
  constexpr size_t switch_point = std::numeric_limits<size_t>::max() / 2;

  return (a != b) && ((a - b) < switch_point);
}
418
419 virtual uint64_t read64(size_t index)
420 {
421 bd.check_access(index, sizeof(uint64_t));
422 return detail::read64_impl(bd, index);
423 }
424
425 virtual void write64(size_t index, uint64_t value)
426 {
427 bd.check_access(index, sizeof(value));
428#ifdef __cpp_lib_atomic_ref
429 auto& ref = *(reinterpret_cast<uint64_t*>(bd.data + index));
430 std::atomic_ref<uint64_t> slot(ref);
431 slot.store(value, std::memory_order_release);
432#else
433 // __atomic_store is used instead of std::atomic_ref since it's not
434 // supported by libc++ yet.
435 // https://en.cppreference.com/w/Template:cpp/compiler_support/20
436 __atomic_store(
437 reinterpret_cast<uint64_t*>(bd.data + index), &value, __ATOMIC_RELEASE);
438#endif
439 }
440
441 std::optional<Reservation> reserve(size_t size)
442 {
443 auto mask = bd.size - 1;
444 auto hd = bd.offsets->head_cache.load(std::memory_order_acquire);
445 auto tl = bd.offsets->tail.load(std::memory_order_relaxed);
446
447 // NB: These will be always be set on the first loop, before they are
448 // read, so this initialisation is unnecessary. It is added to placate
449 // static analyzers.
450 size_t padding = 0u;
451 size_t tl_index = 0u;
452
453 do
454 {
455 auto gap = tl - hd;
456 auto avail = bd.size - gap;
457
458 // If the head cache is too far behind the tail, or if the message does
459 // not fit in the available space, get an accurate head and try again.
460 if ((gap > bd.size) || (size > avail))
461 {
462 // If the message does not fit in the sum of front-space and
463 // back-space, see if head has moved to give us enough space.
464 hd = bd.offsets->head.load(std::memory_order_acquire);
465
466 // This happens if the head has passed the tail we previously loaded.
467 // It is safe to continue here, as the compare_exchange_weak is
468 // guaranteed to fail and update tl.
469 if (greater_with_wraparound(hd, tl))
470 {
471 continue;
472 }
473
474 avail = bd.size - (tl - hd);
475
476 // If it still doesn't fit, fail.
477 if (size > avail)
478 {
479 return {};
480 }
481
482 // This may move the head cache backwards, but if so, that is safe and
483 // will be corrected later.
484 bd.offsets->head_cache.store(hd, std::memory_order_release);
485 }
486
487 padding = 0;
488 tl_index = tl & mask;
489 auto block = bd.size - tl_index;
490
491 if (size > block)
492 {
493 // If the message doesn't fit in back-space...
494 auto hd_index = hd & mask;
495
496 if (size > hd_index)
497 {
498 // If message doesn't fit in front-space, see if the head has moved
499 hd = bd.offsets->head.load(std::memory_order_acquire);
500 hd_index = hd & mask;
501
502 // If it still doesn't fit, fail - there is not a contiguous region
503 // large enough for this reservation
504 if (size > hd_index)
505 {
506 return {};
507 }
508
509 // This may move the head cache backwards, but if so, that is safe
510 // and will be corrected later.
511 bd.offsets->head_cache.store(hd, std::memory_order_release);
512 }
513
514 // Pad the back-space and reserve front-space for our message in a
515 // single tail update.
516 padding = block;
517 }
518 } while (!bd.offsets->tail.compare_exchange_weak(
519 tl, tl + size + padding, std::memory_order_seq_cst));
520
521 if (padding != 0)
522 {
523 write64(
524 tl_index,
526 Const::msg_pad, padding - Const::header_size(), false));
527 tl_index = 0;
528 }
529
530 return {{tl_index, tl}};
531 }
532 };
533
534 // This is entirely non-virtual so can be safely passed to the enclave
536 {
537 private:
538 ringbuffer::Reader from_outside;
539 ringbuffer::Reader from_inside;
540
541 public:
543 const BufferDef& from_outside_buffer,
544 const BufferDef& from_inside_buffer) :
545 from_outside(from_outside_buffer),
546 from_inside(from_inside_buffer)
547 {}
548
550 {
551 return from_outside;
552 }
553
555 {
556 return from_inside;
557 }
558
560 {
561 return {from_inside};
562 }
563
565 {
566 return {from_outside};
567 }
568 };
569
571 {
572 ringbuffer::Circuit& raw_circuit;
573
574 public:
575 WriterFactory(ringbuffer::Circuit& c) : raw_circuit(c) {}
576
577 std::shared_ptr<ringbuffer::AbstractWriter> create_writer_to_outside()
578 override
579 {
580 return std::make_shared<Writer>(raw_circuit.read_from_inside());
581 }
582
583 std::shared_ptr<ringbuffer::AbstractWriter> create_writer_to_inside()
584 override
585 {
586 return std::make_shared<Writer>(raw_circuit.read_from_outside());
587 }
588 };
589
590 // This struct wraps buffer management to simplify testing
592 {
594 std::vector<uint8_t> storage;
596
597 TestBuffer(size_t size) : offsets(), storage(size, 0)
598 {
599 bd.data = storage.data();
600 bd.size = storage.size();
601 bd.offsets = &offsets;
602 }
603 };
604}
Definition ring_buffer_types.h:157
Definition ring_buffer_types.h:63
std::optional< size_t > WriteMarker
Definition ring_buffer_types.h:100
Definition ring_buffer.h:536
ringbuffer::Writer write_to_inside()
Definition ring_buffer.h:564
ringbuffer::Reader & read_from_inside()
Definition ring_buffer.h:554
Circuit(const BufferDef &from_outside_buffer, const BufferDef &from_inside_buffer)
Definition ring_buffer.h:542
ringbuffer::Reader & read_from_outside()
Definition ring_buffer.h:549
ringbuffer::Writer write_to_outside()
Definition ring_buffer.h:559
Definition ring_buffer.h:175
size_t read(size_t limit, Handler f)
Definition ring_buffer.h:208
virtual ~Reader()=default
Reader(const BufferDef &bd_)
Definition ring_buffer.h:192
Definition ring_buffer.h:571
WriterFactory(ringbuffer::Circuit &c)
Definition ring_buffer.h:575
std::shared_ptr< ringbuffer::AbstractWriter > create_writer_to_inside() override
Definition ring_buffer.h:583
std::shared_ptr< ringbuffer::AbstractWriter > create_writer_to_outside() override
Definition ring_buffer.h:577
Definition ring_buffer.h:270
std::optional< size_t > prepare(Message m, size_t size, bool wait=true, size_t *identifier=nullptr) override
Definition ring_buffer.h:295
~Writer() override=default
Writer(const Reader &r)
Definition ring_buffer.h:286
BufferDef bd
Definition ring_buffer.h:272
const size_t rmax
Definition ring_buffer.h:273
Writer(const Writer &that)
Definition ring_buffer.h:291
WriteMarker write_bytes(const WriteMarker &marker, const uint8_t *bytes, size_t size) override
Definition ring_buffer.h:385
void finish(const WriteMarker &marker) override
Definition ring_buffer.h:365
size_t get_max_message_size() override
Definition ring_buffer.h:379
Definition ring_buffer_types.h:51
uint32_t length(uint64_t header)
Definition ring_buffer.h:168
uint64_t read64_impl(const BufferDef &bd, size_t index)
Definition ring_buffer.h:141
Message message(uint64_t header)
Definition ring_buffer.h:163
Definition non_blocking.h:15
std::function< void(Message, const uint8_t *, size_t)> Handler
Definition ring_buffer.h:26
uint32_t Message
Definition ring_buffer_types.h:19
Definition ring_buffer.h:116
void check_access(size_t index, size_t access_size)
Definition ring_buffer.h:122
uint8_t * data
Definition ring_buffer.h:117
Offsets * offsets
Definition ring_buffer.h:120
size_t size
Definition ring_buffer.h:118
Definition ring_buffer.h:33
@ msg_pad
Definition ring_buffer.h:39
@ msg_max
Definition ring_buffer.h:36
@ msg_none
Definition ring_buffer.h:38
@ msg_min
Definition ring_buffer.h:37
static constexpr size_t max_size()
Definition ring_buffer.h:69
static constexpr bool is_power_of_2(size_t n)
Definition ring_buffer.h:42
static constexpr size_t align_size(size_t n)
Definition ring_buffer.h:58
static constexpr size_t previous_power_of_2(size_t n)
Definition ring_buffer.h:86
static bool find_acceptable_sub_buffer(uint8_t *&data_, size_t &size_)
Definition ring_buffer.h:92
static bool is_aligned(uint8_t const *data, size_t align)
Definition ring_buffer.h:47
static constexpr size_t entry_size(size_t n)
Definition ring_buffer.h:64
static constexpr size_t max_reservation_size(size_t buffer_size)
Definition ring_buffer.h:76
static constexpr size_t header_size()
Definition ring_buffer.h:52
static uint64_t make_header(Message m, size_t size, bool pending=true)
Definition ring_buffer.h:108
Definition ring_buffer_types.h:26
std::atomic< size_t > head
Definition ring_buffer_types.h:31
std::atomic< size_t > head_cache
Definition ring_buffer_types.h:38
std::atomic< size_t > tail
Definition ring_buffer_types.h:46
Definition ring_buffer.h:592
std::vector< uint8_t > storage
Definition ring_buffer.h:594
BufferDef bd
Definition ring_buffer.h:595
Offsets offsets
Definition ring_buffer.h:593
TestBuffer(size_t size)
Definition ring_buffer.h:597
Definition ring_buffer.h:276
size_t index
Definition ring_buffer.h:278
size_t identifier
Definition ring_buffer.h:282