CCF
Loading...
Searching...
No Matches
ring_buffer_types.h
Go to the documentation of this file.
1// Copyright (c) Microsoft Corporation. All rights reserved.
2// Licensed under the Apache 2.0 License.
3#pragma once
4
5#include "ccf/ds/hash.h"
6#include "ccf/ds/nonstd.h"
7#include "serializer.h"
8
9#include <atomic>
10#define FMT_HEADER_ONLY
11#include <fmt/format.h>
12#include <optional>
13#include <span>
14#include <string>
15#include <vector>
16
17namespace ringbuffer
18{
19 using Message = uint32_t;
20
21 // Align by cacheline to avoid false sharing
22 static constexpr size_t CACHELINE_SIZE = 64;
23
24 struct alignas(CACHELINE_SIZE) Offsets
25 {
26 // This is a lagging value of head, used only by writers. Its purpose is to
27 // reduce contention on head. While there is sufficient space between the
28 // tail and this cached value, the writers and readers do not interact with
29 // any of the same data. Only when this range is filled do the writers
30 // update this cache with a more recent value of head.
31 std::atomic<size_t> head_cache = {0};
32
33 // This marks the end of the in-use segment. The next ringbuffer message
34 // will be written starting at this point, by a writer advancing this tail
35 // value and thus reserving the space between the previous and current value
36 // for its own message. Many writers may try to access this concurrently, so
37 // the winner is determined by an atomic compare-and-swap (with losers
38 // immediately retrying with the new tail).
39 std::atomic<size_t> tail = {0};
40
41 // This marks the start of the in-use segment. It is written only by the
42 // reader, advancing this value once it has read a message and cleared that
43 // message's memory. It is read by writers, but only to update the
44 // head_cache value which is used for calculations.
45 alignas(CACHELINE_SIZE) std::atomic<size_t> head = {0};
46 };
47
48 class message_error : public std::logic_error
49 {
50 public:
52
53 template <typename... Ts>
54 message_error(Message m, Ts&&... ts) :
55 std::logic_error(std::forward<Ts>(ts)...),
56 ringbuffer_message_type(m)
57 {}
58 };
59
61 {
62 public:
63 virtual ~AbstractWriter() = default;
64
68 template <typename Serializer, typename... Ts>
69 void write_with(Message m, Ts&&... ts)
70 {
71 write_multiple<Serializer>(m, true, std::forward<Ts>(ts)...);
72 }
73
76 template <typename Serializer, typename... Ts>
77 bool try_write_with(Message m, Ts&&... ts)
78 {
79 return write_multiple<Serializer>(m, false, std::forward<Ts>(ts)...);
80 }
81
82 template <typename... Ts>
83 void write(Message m, Ts&&... ts)
84 {
85 write_with<serializer::CommonSerializer>(m, std::forward<Ts>(ts)...);
86 }
87
88 template <typename... Ts>
89 bool try_write(Message m, Ts&&... ts)
90 {
91 return try_write_with<serializer::CommonSerializer>(
92 m, std::forward<Ts>(ts)...);
93 }
94
95 // If a call to prepare or write_bytes fails, this returned value will be
96 // empty. Otherwise it is an opaque marker that the implementation can use
97 // to track progress between writes in the same message.
98 using WriteMarker = std::optional<size_t>;
99
110 Message m,
111 size_t size,
112 bool wait = true,
113 size_t* identifier = nullptr) = 0;
114
115 virtual void finish(const WriteMarker& marker) = 0;
116
118 const WriteMarker& marker, const uint8_t* bytes, size_t size) = 0;
119
120 virtual size_t get_max_message_size() = 0;
122
123 private:
124 template <typename Serializer, typename... Ts>
125 bool write_multiple(Message m, bool wait, Ts&&... ts)
126 {
127 auto sections = Serializer::serialize(std::forward<Ts>(ts)...);
128
129 // Fold section->sizes over the + operator, with initial value 0
130 size_t total_size = std::apply(
131 [](const auto&... section) { return (section->size() + ... + 0); },
132 sections);
133
134 const auto initial_marker = prepare(m, total_size, wait);
135
136 if (!initial_marker.has_value())
137 return false;
138
139 auto next = initial_marker;
140 ccf::nonstd::tuple_for_each(sections, [&](const auto& s) {
141 next = write_bytes(next, s->data(), s->size());
142 });
143
144 finish(initial_marker);
145
146 return next.has_value();
147 }
148 };
149
150 using WriterPtr = std::shared_ptr<AbstractWriter>;
151
153 {
154 public:
155 virtual ~AbstractWriterFactory() = default;
156
159 };
160
162#define DEFINE_RINGBUFFER_MSG_TYPE(NAME) \
163 NAME = ccf::ds::fnv_1a<ringbuffer::Message>(#NAME)
164
165 template <ringbuffer::Message m>
167 {
168 static_assert(
170 "No payload specialization for this Message");
171 };
172
173#define DECLARE_RINGBUFFER_MESSAGE_NO_PAYLOAD(MTYPE) \
174 template <> \
175 struct ringbuffer::MessageSerializers<MTYPE> \
176 : public serializer::EmptySerializer \
177 {};
178
179#define DECLARE_RINGBUFFER_MESSAGE_PAYLOAD(MTYPE, ...) \
180 template <> \
181 struct ringbuffer::MessageSerializers<MTYPE> \
182 : public serializer::PreciseSerializer<__VA_ARGS__> \
183 {};
184
185 // Helper functions to write/read with serializer determined by message
186 template <ringbuffer::Message m, typename WriterPtr, typename... Ts>
187 inline void write_message(const WriterPtr& w, Ts&&... ts)
188 {
189 using S = MessageSerializers<m>;
190
191 w->template write_with<S>(m, std::forward<Ts>(ts)...);
192 }
193
194 template <ringbuffer::Message m, typename WriterPtr, typename... Ts>
195 inline bool try_write_message(const WriterPtr& w, Ts&&... ts)
196 {
197 using S = MessageSerializers<m>;
198
199 return w->template try_write_with<S>(m, std::forward<Ts>(ts)...);
200 }
201
202 template <ringbuffer::Message m>
203 inline auto read_message(const uint8_t*& data, size_t& size)
204 {
205 using S = MessageSerializers<m>;
206
207 return S::deserialize(data, size);
208 }
209
210 template <ringbuffer::Message m>
211 inline auto read_message(std::span<const uint8_t>& span)
212 {
213 using S = MessageSerializers<m>;
214
215 const uint8_t* data = span.data();
216 size_t size = span.size();
217 size_t original_size = size;
218
219 auto ret = S::deserialize(data, size);
220 span = span.subspan(original_size - size);
221 return ret;
222 }
223
224 template <ringbuffer::Message m, typename... Ts>
225 inline void write_message_with_error_wrapper(char const* prefix, Ts&&... ts)
226 {
227 try
228 {
229 write_message<m>(std::forward<Ts>(ts)...);
230 }
231 catch (const ringbuffer::message_error& ex)
232 {
233 throw std::logic_error(fmt::format("[{}] {}", prefix, ex.what()));
234 }
235 }
236
237 template <ringbuffer::Message m, typename... Ts>
239 char const* prefix, Ts&&... ts)
240 {
241 try
242 {
243 return try_write_message<m>(std::forward<Ts>(ts)...);
244 }
245 catch (const ringbuffer::message_error& ex)
246 {
247 throw std::logic_error(fmt::format("[{}] {}", prefix, ex.what()));
248 }
249
250 return false;
251 }
252
255#define RINGBUFFER_WRITE_MESSAGE(MSG, ...) \
256 ringbuffer::write_message_with_error_wrapper<MSG>(#MSG, __VA_ARGS__)
257
258#define RINGBUFFER_TRY_WRITE_MESSAGE(MSG, ...) \
259 ringbuffer::try_write_message_with_error_wrapper<MSG>(#MSG, __VA_ARGS__)
260}
Definition ring_buffer_types.h:153
virtual WriterPtr create_writer_to_inside()=0
virtual ~AbstractWriterFactory()=default
virtual WriterPtr create_writer_to_outside()=0
Definition ring_buffer_types.h:61
void write_with(Message m, Ts &&... ts)
Definition ring_buffer_types.h:69
bool try_write_with(Message m, Ts &&... ts)
Definition ring_buffer_types.h:77
virtual void finish(const WriteMarker &marker)=0
std::optional< size_t > WriteMarker
Definition ring_buffer_types.h:98
virtual ~AbstractWriter()=default
virtual WriteMarker prepare(Message m, size_t size, bool wait=true, size_t *identifier=nullptr)=0
virtual WriteMarker write_bytes(const WriteMarker &marker, const uint8_t *bytes, size_t size)=0
void write(Message m, Ts &&... ts)
Definition ring_buffer_types.h:83
bool try_write(Message m, Ts &&... ts)
Definition ring_buffer_types.h:89
virtual size_t get_max_message_size()=0
Definition ring_buffer_types.h:49
Message ringbuffer_message_type
Definition ring_buffer_types.h:51
message_error(Message m, Ts &&... ts)
Definition ring_buffer_types.h:54
T deserialize(const uint8_t *&data, size_t &size)
Definition map_serializers.h:62
size_t serialize(const T &t, uint8_t *&data, size_t &size)
Definition map_serializers.h:48
Definition non_blocking.h:14
void write_message_with_error_wrapper(char const *prefix, Ts &&... ts)
Definition ring_buffer_types.h:225
void write_message(const WriterPtr &w, Ts &&... ts)
Definition ring_buffer_types.h:187
auto read_message(const uint8_t *&data, size_t &size)
Definition ring_buffer_types.h:203
bool try_write_message(const WriterPtr &w, Ts &&... ts)
Definition ring_buffer_types.h:195
bool try_write_message_with_error_wrapper(char const *prefix, Ts &&... ts)
Definition ring_buffer_types.h:238
std::shared_ptr< AbstractWriter > WriterPtr
Definition ring_buffer_types.h:150
uint32_t Message
Definition ring_buffer_types.h:19
STL namespace.
Definition ring_buffer_types.h:167
Definition ring_buffer_types.h:25