ring_buffer.h
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the Apache 2.0 License.
#pragma once

#include "ccf/pal/mem.h"
#include "ring_buffer_types.h"

#include <atomic>
#include <cstring>
#include <functional>
#include <thread>

// This file implements a Multiple-Producer Single-Consumer ringbuffer.

// A single Reader instance owns an underlying memory buffer, and a single
// thread should process messages written to it. Any number of other threads
// and Writers may write to it, and the messages will be distinct, correct, and
// ordered.

// A Circuit wraps a pair of ringbuffers to allow 2-way communication - messages
// are written to the inbound buffer, processed inside an enclave, and responses
// written back to the outbound.
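// A minimal usage sketch for a single buffer (illustrative only - the message
// id, sizes and handler below are placeholders, and TestBuffer is the test
// helper defined at the bottom of this file):
//
//   ringbuffer::TestBuffer buffer(1024);
//   ringbuffer::Reader reader(buffer.bd);
//   ringbuffer::Writer writer(reader);
//
//   const ringbuffer::Message example_msg = 42; // any non-reserved id
//   const uint8_t payload[] = {1, 2, 3};
//
//   // Reserve space and write a pending header, copy the payload, then clear
//   // the pending bit so the Reader will process the message.
//   const auto marker = writer.prepare(example_msg, sizeof(payload));
//   writer.write_bytes(marker, payload, sizeof(payload));
//   writer.finish(marker);
//
//   // Consume up to 10 messages on the single consumer thread.
//   reader.read(10, [](ringbuffer::Message m, const uint8_t* data, size_t size) {
//     // handle (m, data, size) here
//   });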
namespace ringbuffer
{
  using Handler = std::function<void(Message, const uint8_t*, size_t)>;

  // High bit of message size is used to indicate a pending message
  static constexpr uint32_t pending_write_flag = 1 << 31;
  static constexpr uint32_t length_mask = ~pending_write_flag;

  struct Const
  {
    enum : Message
    {
      msg_max = std::numeric_limits<Message>::max() - 1,
      msg_min = 1,
      msg_none = 0,
      msg_pad = std::numeric_limits<Message>::max()
    };

    static constexpr bool is_power_of_2(size_t n)
    {
      return n && ((n & (~n + 1)) == n);
    }

    static bool is_aligned(uint8_t const* data, size_t align)
    {
      return reinterpret_cast<std::uintptr_t>(data) % align == 0;
    }

    static constexpr size_t header_size()
    {
      // The header is a 32 bit length and a 32 bit message ID.
      return sizeof(int32_t) + sizeof(uint32_t);
    }

    static constexpr size_t align_size(size_t n)
    {
      // Make sure the header is aligned in memory.
      return (n + (header_size() - 1)) & ~(header_size() - 1);
    }

    static constexpr size_t entry_size(size_t n)
    {
      return Const::align_size(n + header_size());
    }
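    // Worked example of the sizing arithmetic above (illustrative values):
    // header_size() is 8 bytes and align_size() rounds up to the next multiple
    // of 8, so a 5-byte payload occupies entry_size(5) == align_size(5 + 8) ==
    // 16 bytes in the buffer.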
    static constexpr size_t max_size()
    {
      // The length of a message plus its header must be encodable in the
      // header. The high bit of the length indicates a pending write.
      return std::numeric_limits<int32_t>::max() - header_size();
    }

    static constexpr size_t max_reservation_size(size_t buffer_size)
    {
      // This guarantees that in an empty buffer, we can always make this
      // reservation in a single contiguous region (either before or after the
      // current tail). If we allow larger reservations then we may need to
      // artificially advance the tail (writing padding then clearing it) to
      // create a sufficiently large region.
      return buffer_size / 2;
    }
    static constexpr size_t previous_power_of_2(size_t n)
    {
      const auto lz = __builtin_clzll(n);
      return 1ul << (sizeof(size_t) * 8 - 1 - lz);
    }

    static bool find_acceptable_sub_buffer(uint8_t*& data_, size_t& size_)
    {
      void* data = reinterpret_cast<void*>(data_);
      size_t size = size_;

      auto ret = std::align(8, sizeof(size_t), data, size);
      if (ret == nullptr)
      {
        return false;
      }

      data_ = reinterpret_cast<uint8_t*>(data);
      size_ = previous_power_of_2(size);
      return true;
    }
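    // For example (illustrative): handed a ~1000-byte region at an arbitrary
    // address, find_acceptable_sub_buffer() bumps the pointer to the next
    // 8-byte boundary and shrinks the size to previous_power_of_2() of what
    // remains (512 here), which satisfies the power-of-2 and alignment checks
    // made by Reader's constructor below.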
    static uint64_t make_header(Message m, size_t size, bool pending = true)
    {
      return (((uint64_t)m) << 32) |
        ((size & length_mask) | (pending ? pending_write_flag : 0u));
    }
  };
  struct BufferDef
  {
    uint8_t* data;
    size_t size;

    Offsets* offsets;

    void check_access(size_t index, size_t access_size)
    {
      if (index + access_size > size)
      {
#ifdef RINGBUFFER_USE_ABORT
        abort();
#else
        throw std::runtime_error(fmt::format(
          "Ringbuffer access out of bounds - attempting to access {}, max "
          "index is {}",
          index + access_size,
          size));
#endif
      }
    }
  };
  namespace
  {
    static inline uint64_t read64_impl(const BufferDef& bd, size_t index)
    {
      auto src = bd.data + index;
      auto src_64 = reinterpret_cast<uint64_t*>(src);

#ifdef __cpp_lib_atomic_ref
      if (Const::is_aligned(src, 8))
      {
        auto& ref = *src_64;
        std::atomic_ref<uint64_t> slot(ref);
        return slot.load(std::memory_order_acquire);
      }
#endif

      // __atomic_load is used instead of std::atomic_ref when std::atomic_ref
      // is unavailable, or the src pointer is not aligned
      // https://en.cppreference.com/w/Template:cpp/compiler_support/20
      uint64_t r = 0;
      __atomic_load(src_64, &r, __ATOMIC_ACQUIRE);
      return r;
    }

    static inline Message message(uint64_t header)
    {
      return (Message)(header >> 32);
    }

    static inline uint32_t length(uint64_t header)
    {
      return header & std::numeric_limits<uint32_t>::max();
    }
  }
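  // Header layout, illustrated (not additional API): make_header() packs the
  // message id into the upper 32 bits and the length into the lower 32 bits,
  // using bit 31 of the length as the pending flag. For example,
  // make_header(7, 12) produces a header for which message(header) == 7 and
  // length(header) == (12 | pending_write_flag), until finish() rewrites it
  // with the pending bit cleared.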
  class Reader
  {
    friend class Writer;

    BufferDef bd;

    virtual uint64_t read64(size_t index)
    {
      bd.check_access(index, sizeof(uint64_t));
      return read64_impl(bd, index);
    }

    virtual void clear_mem(size_t index, size_t advance)
    {
      ::memset(bd.data + index, 0, advance);
    }

  public:
    Reader(const BufferDef& bd_) : bd(bd_)
    {
      if (!Const::is_power_of_2(bd.size))
      {
        throw std::logic_error(
          fmt::format("Buffer size must be a power of 2, not {}", bd.size));
      }

      if (!Const::is_aligned(bd.data, 8))
      {
        throw std::logic_error("Buffer must be 8-byte aligned");
      }
    }
    size_t read(size_t limit, Handler f)
    {
      auto mask = bd.size - 1;
      auto hd = bd.offsets->head.load(std::memory_order_acquire);
      auto hd_index = hd & mask;
      auto block = bd.size - hd_index;
      size_t advance = 0;
      size_t count = 0;

      while ((advance < block) && (count < limit))
      {
        auto msg_index = hd_index + advance;
        auto header = read64(msg_index);
        auto size = length(header);

        // If we see a pending write, we're done.
        if ((size & pending_write_flag) != 0u)
          break;

        auto m = message(header);

        if (m == Const::msg_none)
        {
          // There is no message here, we're done.
          break;
        }
        else if (m == Const::msg_pad)
        {
          // If we see padding, skip it.
          // NB: Padding messages are potentially unaligned, whereas other
          // messages are aligned by calls to entry_size(). Even for an empty
          // padding message (size == 0), we need to skip past the message
          // header.
          advance += Const::header_size() + size;
          continue;
        }

        advance += Const::entry_size(size);
        ++count;

        // Call the handler function for this message.
        bd.check_access(hd_index, advance);

        f(m, bd.data + msg_index + Const::header_size(), (size_t)size);
      }

      if (advance > 0)
      {
        // Zero the buffer and advance the head.
        bd.check_access(hd_index, advance);
        clear_mem(hd_index, advance);
        bd.offsets->head.store(hd + advance, std::memory_order_release);
      }

      return count;
    }
  };
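  // Note on Handler lifetime (an observation on Reader::read above, not an
  // additional guarantee): the consumed region is zeroed and the head advanced
  // only after the handlers return, so the (data, size) span passed to a
  // Handler is valid only for the duration of the callback. A hedged sketch of
  // retaining a message beyond it:
  //
  //   std::vector<std::vector<uint8_t>> retained;
  //   reader.read(10, [&](ringbuffer::Message, const uint8_t* data, size_t size) {
  //     retained.emplace_back(data, data + size); // copy before the buffer is cleared
  //   });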
  class Writer : public AbstractWriter
  {
  protected:
    BufferDef bd; // copy of reader's buffer definition
    const size_t rmax;

    struct Reservation
    {
      // Index within buffer of reservation start
      size_t index;

      // Individual identifier for this reservation. Should be unique across
      // buffer lifetime, amongst all writers
      size_t identifier;
    };
  public:
    Writer(const Reader& r) :
      bd(r.bd),
      rmax(Const::max_reservation_size(bd.size))
    {}

    Writer(const Writer& that) : bd(that.bd), rmax(that.rmax) {}

    virtual ~Writer() {}

    virtual std::optional<size_t> prepare(
      Message m,
      size_t size,
      bool wait = true,
      size_t* identifier = nullptr) override
    {
      // Make sure we aren't using a reserved message.
      if ((m < Const::msg_min) || (m > Const::msg_max))
      {
        throw message_error(
          m, fmt::format("Cannot use a reserved message ({})", m));
      }

      // Make sure the message fits.
      if (size > Const::max_size())
      {
        throw message_error(
          m,
          fmt::format(
            "Message ({}) is too long for any writer: {} > {}",
            m,
            size,
            Const::max_size()));
      }

      auto rsize = Const::entry_size(size);
      if (rsize > rmax)
      {
        throw message_error(
          m,
          fmt::format(
            "Message ({}) is too long for this writer: {} > {}",
            m,
            rsize,
            rmax));
      }

      auto r = reserve(rsize);

      if (!r.has_value())
      {
        if (wait)
        {
          // Retry until there is sufficient space.
          do
          {
            std::this_thread::yield();
            r = reserve(rsize);
          } while (!r.has_value());
        }
        else
        {
          // Fail if there is insufficient space.
          return {};
        }
      }

      // Write the preliminary header and return the buffer pointer.
      // The initial header length has high bit set to indicate a pending
      // message. We rewrite the real length after the message data.
      write64(r.value().index, Const::make_header(m, size));

      if (identifier != nullptr)
        *identifier = r.value().identifier;

      return {r.value().index + Const::header_size()};
    }
    virtual void finish(const WriteMarker& marker) override
    {
      if (marker.has_value())
      {
        // Fix up the size to indicate we're done writing - unset pending bit.
        const auto index = marker.value() - Const::header_size();
        const auto header = read64(index);
        const auto size = length(header);
        const auto m = message(header);
        const auto finished_header = Const::make_header(m, size, false);
        write64(index, finished_header);
      }
    }

    virtual size_t get_max_message_size() override
    {
      return Const::max_size();
    }
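    // A hedged sketch of the non-blocking path (message id and payload are
    // placeholders): with wait == false, prepare() returns std::nullopt rather
    // than spinning when the buffer lacks space, letting callers drop or defer
    // the message.
    //
    //   size_t id = 0;
    //   if (auto marker = writer.prepare(example_msg, sizeof(payload), false, &id))
    //   {
    //     writer.write_bytes(marker, payload, sizeof(payload));
    //     writer.finish(marker);
    //   }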
  protected:
    virtual WriteMarker write_bytes(
      const WriteMarker& marker, const uint8_t* bytes, size_t size) override
    {
      if (!marker.has_value())
      {
        return {};
      }

      const auto index = marker.value();

      bd.check_access(index, size);

      // Standard says memcpy(x, null, 0) is undefined, so avoid it
      if (size > 0)
      {
        ccf::pal::safe_memcpy(bd.data + index, bytes, size);
      }

      return {index + size};
    }
  private:
    // We use this to detect whether the head is ahead of the tail. In real
    // operation they should be close to each other, relative to the total
    // range of a uint64_t. To handle wrap-around (ie - when a write has
    // overflowed past the max value), we consider a greater than b if the
    // distance between them is less than half the total range (and positive).
    static bool greater_with_wraparound(size_t a, size_t b)
    {
      static constexpr auto switch_point = UINT64_MAX / 2;

      return (a != b) && ((a - b) < switch_point);
    }
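    // Worked example (illustrative): if the tail has wrapped past UINT64_MAX
    // so that a == 2 while b == UINT64_MAX - 1, then a - b == 4 under unsigned
    // wrap-around, which is below switch_point, so a is correctly treated as
    // being ahead of b despite being numerically smaller.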
    virtual uint64_t read64(size_t index)
    {
      bd.check_access(index, sizeof(uint64_t));
      return read64_impl(bd, index);
    }

    virtual void write64(size_t index, uint64_t value)
    {
      bd.check_access(index, sizeof(value));
#ifdef __cpp_lib_atomic_ref
      auto& ref = *(reinterpret_cast<uint64_t*>(bd.data + index));
      std::atomic_ref<uint64_t> slot(ref);
      slot.store(value, std::memory_order_release);
#else
      // __atomic_store is used instead of std::atomic_ref since it's not
      // supported by libc++ yet.
      // https://en.cppreference.com/w/Template:cpp/compiler_support/20
      __atomic_store(
        reinterpret_cast<uint64_t*>(bd.data + index), &value, __ATOMIC_RELEASE);
#endif
    }
    std::optional<Reservation> reserve(size_t size)
    {
      auto mask = bd.size - 1;
      auto hd = bd.offsets->head_cache.load(std::memory_order_acquire);
      auto tl = bd.offsets->tail.load(std::memory_order_relaxed);

      // NB: These will always be set on the first loop iteration, before they
      // are read, so this initialisation is unnecessary. It is added to
      // placate static analyzers.
      size_t padding = 0u;
      size_t tl_index = 0u;

      do
      {
        auto gap = tl - hd;
        auto avail = bd.size - gap;

        // If the head cache is too far behind the tail, or if the message does
        // not fit in the available space, get an accurate head and try again.
        if ((gap > bd.size) || (size > avail))
        {
          // If the message does not fit in the sum of front-space and
          // back-space, see if head has moved to give us enough space.
          hd = bd.offsets->head.load(std::memory_order_acquire);

          // This happens if the head has passed the tail we previously loaded.
          // It is safe to continue here, as the compare_exchange_weak is
          // guaranteed to fail and update tl.
          if (greater_with_wraparound(hd, tl))
          {
            continue;
          }

          avail = bd.size - (tl - hd);

          // If it still doesn't fit, fail.
          if (size > avail)
            return {};

          // This may move the head cache backwards, but if so, that is safe
          // and will be corrected later.
          bd.offsets->head_cache.store(hd, std::memory_order_release);
        }
        padding = 0;
        tl_index = tl & mask;
        auto block = bd.size - tl_index;

        if (size > block)
        {
          // If the message doesn't fit in back-space...
          auto hd_index = hd & mask;

          if (size > hd_index)
          {
            // If message doesn't fit in front-space, see if the head has moved
            hd = bd.offsets->head.load(std::memory_order_acquire);
            hd_index = hd & mask;

            // If it still doesn't fit, fail - there is not a contiguous region
            // large enough for this reservation
            if (size > hd_index)
              return {};

            // This may move the head cache backwards, but if so, that is safe
            // and will be corrected later.
            bd.offsets->head_cache.store(hd, std::memory_order_release);
          }

          // Pad the back-space and reserve front-space for our message in a
          // single tail update.
          padding = block;
        }
      } while (!bd.offsets->tail.compare_exchange_weak(
        tl, tl + size + padding, std::memory_order_seq_cst));

      if (padding != 0)
      {
        write64(
          tl_index,
          Const::make_header(
            Const::msg_pad, padding - Const::header_size(), false));
        tl_index = 0;
      }

      return {{tl_index, tl}};
    }
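    // Worked example of the padding path above (illustrative sizes): in a
    // 64-byte buffer with the tail at index 48, a 24-byte reservation does not
    // fit in the 16-byte back-space. Provided the head index is at least 24,
    // the tail is advanced by 24 + 16 in a single compare-exchange, a msg_pad
    // entry covering the 16 trailing bytes (8-byte header, 8 bytes of padding)
    // is written at index 48, and the reservation itself starts at index 0.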
  };

  // This is entirely non-virtual so it can be safely passed to the enclave
  class Circuit
  {
  private:
    ringbuffer::Reader from_outside;
    ringbuffer::Reader from_inside;

  public:
    Circuit(
      const BufferDef& from_outside_buffer,
      const BufferDef& from_inside_buffer) :
      from_outside(from_outside_buffer),
      from_inside(from_inside_buffer)
    {}

    ringbuffer::Reader& read_from_outside()
    {
      return from_outside;
    }

    ringbuffer::Reader& read_from_inside()
    {
      return from_inside;
    }

    ringbuffer::Writer write_to_outside()
    {
      return ringbuffer::Writer(from_inside);
    }

    ringbuffer::Writer write_to_inside()
    {
      return ringbuffer::Writer(from_outside);
    }
  };
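  // A hedged sketch of two-way use through a Circuit (buffer sizes and the
  // handle_* callbacks are placeholders): requests flow through the
  // "from outside" buffer into the enclave, and responses flow back through
  // the "from inside" buffer.
  //
  //   ringbuffer::TestBuffer to_enclave(1024), from_enclave(1024);
  //   ringbuffer::Circuit circuit(to_enclave.bd, from_enclave.bd);
  //
  //   auto host_writer = circuit.write_to_inside();     // host -> enclave
  //   auto enclave_writer = circuit.write_to_outside(); // enclave -> host
  //
  //   circuit.read_from_outside().read(10, handle_request);  // inside the enclave
  //   circuit.read_from_inside().read(10, handle_response);  // on the host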
  class WriterFactory : public AbstractWriterFactory
  {
    ringbuffer::Circuit& raw_circuit;

  public:
    WriterFactory(ringbuffer::Circuit& c) : raw_circuit(c) {}

    std::shared_ptr<ringbuffer::AbstractWriter> create_writer_to_outside()
      override
    {
      return std::make_shared<Writer>(raw_circuit.read_from_inside());
    }

    std::shared_ptr<ringbuffer::AbstractWriter> create_writer_to_inside()
      override
    {
      return std::make_shared<Writer>(raw_circuit.read_from_outside());
    }
  };
  // This struct wraps buffer management to simplify testing
  struct TestBuffer
  {
    std::vector<uint8_t> storage;
    Offsets offsets;

    BufferDef bd;

    TestBuffer(size_t size) : storage(size, 0), offsets()
    {
      bd.data = storage.data();
      bd.size = storage.size();
      bd.offsets = &offsets;
    }
  };
}