CCF
Loading...
Searching...
No Matches
ring_buffer_types.h
Go to the documentation of this file.
1// Copyright (c) Microsoft Corporation. All rights reserved.
2// Licensed under the Apache 2.0 License.
3#pragma once
4
5#include "ccf/ds/hash.h"
6#include "ccf/ds/nonstd.h"
7#include "serializer.h"
8
9#include <atomic>
10#define FMT_HEADER_ONLY
11#include <fmt/format.h>
12#include <optional>
13#include <span>
14#include <string>
15#include <vector>
16
17namespace ringbuffer
18{
19 using Message = uint32_t;
20
// Alignment unit, in bytes, used below so that independently updated fields
// sit on separate cachelines (avoids false sharing).
static constexpr size_t CACHELINE_SIZE = 64;

// NOLINTBEGIN(clang-analyzer-optin.performance.Padding)
struct alignas(CACHELINE_SIZE) Offsets
{
  // Start of the in-use segment. Only the reader writes this, moving it
  // forward once it has read a message and cleared that message's memory.
  // Writers read it too, but solely to refresh head_cache, which is the value
  // they use for their calculations.
  std::atomic<size_t> head{0};

  // Lagging copy of head, touched only by writers, which exists to reduce
  // contention on head. As long as enough space remains between the tail and
  // this cached value, writers and the reader never interact with the same
  // data. Writers refresh it from head only once that range has filled up.
  alignas(CACHELINE_SIZE) std::atomic<size_t> head_cache{0};

  // End of the in-use segment, and the point where the next ringbuffer
  // message will start. A writer reserves the space for its message by
  // advancing this value; the range between the old and new value is then its
  // own. Several writers may attempt this concurrently, so the winner is
  // decided by an atomic compare-and-swap, and the losers immediately retry
  // against the new tail.
  alignas(CACHELINE_SIZE) std::atomic<size_t> tail{0};
};
// NOLINTEND(clang-analyzer-optin.performance.Padding)
49
50 class message_error : public std::logic_error
51 {
52 public:
54
55 template <typename... Ts>
56 message_error(Message m, Ts&&... ts) :
57 std::logic_error(std::forward<Ts>(ts)...),
58 ringbuffer_message_type(m)
59 {}
60 };
61
63 {
64 public:
65 virtual ~AbstractWriter() = default;
66
70 template <typename Serializer, typename... Ts>
71 void write_with(Message m, Ts&&... ts)
72 {
73 write_multiple<Serializer>(m, true, std::forward<Ts>(ts)...);
74 }
75
78 template <typename Serializer, typename... Ts>
79 bool try_write_with(Message m, Ts&&... ts)
80 {
81 return write_multiple<Serializer>(m, false, std::forward<Ts>(ts)...);
82 }
83
84 template <typename... Ts>
85 void write(Message m, Ts&&... ts)
86 {
87 write_with<serializer::CommonSerializer>(m, std::forward<Ts>(ts)...);
88 }
89
90 template <typename... Ts>
91 bool try_write(Message m, Ts&&... ts)
92 {
93 return try_write_with<serializer::CommonSerializer>(
94 m, std::forward<Ts>(ts)...);
95 }
96
97 // If a call to prepare or write_bytes fails, this returned value will be
98 // empty. Otherwise it is an opaque marker that the implementation can use
99 // to track progress between writes in the same message.
100 using WriteMarker = std::optional<size_t>;
101
112 Message m,
113 size_t size,
114 bool wait = true,
115 size_t* identifier = nullptr) = 0;
116
117 virtual void finish(const WriteMarker& marker) = 0;
118
120 const WriteMarker& marker, const uint8_t* bytes, size_t size) = 0;
121
122 virtual size_t get_max_message_size() = 0;
124
125 private:
126 template <typename Serializer, typename... Ts>
127 bool write_multiple(Message m, bool wait, Ts&&... ts)
128 {
129 auto sections = Serializer::serialize(std::forward<Ts>(ts)...);
130
131 // Fold section->sizes over the + operator, with initial value 0
132 size_t total_size = std::apply(
133 [](const auto&... section) { return (section->size() + ... + 0); },
134 sections);
135
136 const auto initial_marker = prepare(m, total_size, wait);
137
138 if (!initial_marker.has_value())
139 {
140 return false;
141 }
142
143 auto next = initial_marker;
144 ccf::nonstd::tuple_for_each(sections, [&](const auto& s) {
145 next = write_bytes(next, s->data(), s->size());
146 });
147
148 finish(initial_marker);
149
150 return next.has_value();
151 }
152 };
153
154 using WriterPtr = std::shared_ptr<AbstractWriter>;
155
157 {
158 public:
159 virtual ~AbstractWriterFactory() = default;
160
163 };
164
// Expands to `NAME = <value>`, where <value> is the result of
// ccf::ds::fnv_1a<ringbuffer::Message> applied to the stringified NAME.
// NOTE(review): presumably intended for use as an enumerator definition -
// confirm against callers.
#define DEFINE_RINGBUFFER_MSG_TYPE(NAME) \
  NAME = ccf::ds::fnv_1a<ringbuffer::Message>(#NAME)
168
169 template <ringbuffer::Message m>
171 {
172 static_assert(
174 "No payload specialization for this Message");
175 };
176
// Specialises ringbuffer::MessageSerializers for MTYPE so that it derives
// (publicly, as is the default for a struct) from serializer::EmptySerializer.
#define DECLARE_RINGBUFFER_MESSAGE_NO_PAYLOAD(MTYPE) \
  template <> \
  struct ringbuffer::MessageSerializers<MTYPE> : serializer::EmptySerializer \
  {};
182
// Specialises ringbuffer::MessageSerializers for MTYPE so that it derives
// (publicly, as is the default for a struct) from
// serializer::PreciseSerializer instantiated with the remaining macro
// arguments.
#define DECLARE_RINGBUFFER_MESSAGE_PAYLOAD(MTYPE, ...) \
  template <> \
  struct ringbuffer::MessageSerializers<MTYPE> \
    : serializer::PreciseSerializer<__VA_ARGS__> \
  {};
188
189 // Helper functions to write/read with serializer determined by message
190 template <ringbuffer::Message m, typename WriterPtr, typename... Ts>
191 inline void write_message(const WriterPtr& w, Ts&&... ts)
192 {
193 using S = MessageSerializers<m>;
194
195 w->template write_with<S>(m, std::forward<Ts>(ts)...);
196 }
197
198 template <ringbuffer::Message m, typename WriterPtr, typename... Ts>
199 inline bool try_write_message(const WriterPtr& w, Ts&&... ts)
200 {
201 using S = MessageSerializers<m>;
202
203 return w->template try_write_with<S>(m, std::forward<Ts>(ts)...);
204 }
205
206 template <ringbuffer::Message m>
207 inline auto read_message(const uint8_t*& data, size_t& size)
208 {
209 using S = MessageSerializers<m>;
210
211 return S::deserialize(data, size);
212 }
213
214 template <ringbuffer::Message m>
215 inline auto read_message(std::span<const uint8_t>& span)
216 {
217 using S = MessageSerializers<m>;
218
219 const uint8_t* data = span.data();
220 size_t size = span.size();
221 size_t original_size = size;
222
223 auto ret = S::deserialize(data, size);
224 span = span.subspan(original_size - size);
225 return ret;
226 }
227
228 template <ringbuffer::Message m, typename... Ts>
229 inline void write_message_with_error_wrapper(char const* prefix, Ts&&... ts)
230 {
231 try
232 {
233 write_message<m>(std::forward<Ts>(ts)...);
234 }
235 catch (const ringbuffer::message_error& ex)
236 {
237 throw std::logic_error(fmt::format("[{}] {}", prefix, ex.what()));
238 }
239 }
240
241 template <ringbuffer::Message m, typename... Ts>
243 char const* prefix, Ts&&... ts)
244 {
245 try
246 {
247 return try_write_message<m>(std::forward<Ts>(ts)...);
248 }
249 catch (const ringbuffer::message_error& ex)
250 {
251 throw std::logic_error(fmt::format("[{}] {}", prefix, ex.what()));
252 }
253
254 return false;
255 }
256
// Calls ringbuffer::write_message_with_error_wrapper<MSG>, passing the
// stringified MSG as the error prefix followed by the remaining arguments.
#define RINGBUFFER_WRITE_MESSAGE(MSG, ...) \
  ringbuffer::write_message_with_error_wrapper<MSG>(#MSG, __VA_ARGS__)
261
// Calls ringbuffer::try_write_message_with_error_wrapper<MSG>, passing the
// stringified MSG as the error prefix followed by the remaining arguments;
// the expression evaluates to that call's bool result.
#define RINGBUFFER_TRY_WRITE_MESSAGE(MSG, ...) \
  ringbuffer::try_write_message_with_error_wrapper<MSG>(#MSG, __VA_ARGS__)
264}
Definition ring_buffer_types.h:157
virtual WriterPtr create_writer_to_inside()=0
virtual ~AbstractWriterFactory()=default
virtual WriterPtr create_writer_to_outside()=0
Definition ring_buffer_types.h:63
void write_with(Message m, Ts &&... ts)
Definition ring_buffer_types.h:71
bool try_write_with(Message m, Ts &&... ts)
Definition ring_buffer_types.h:79
virtual void finish(const WriteMarker &marker)=0
std::optional< size_t > WriteMarker
Definition ring_buffer_types.h:100
virtual ~AbstractWriter()=default
virtual WriteMarker prepare(Message m, size_t size, bool wait=true, size_t *identifier=nullptr)=0
virtual WriteMarker write_bytes(const WriteMarker &marker, const uint8_t *bytes, size_t size)=0
void write(Message m, Ts &&... ts)
Definition ring_buffer_types.h:85
bool try_write(Message m, Ts &&... ts)
Definition ring_buffer_types.h:91
virtual size_t get_max_message_size()=0
Definition ring_buffer_types.h:51
Message ringbuffer_message_type
Definition ring_buffer_types.h:53
message_error(Message m, Ts &&... ts)
Definition ring_buffer_types.h:56
T deserialize(const uint8_t *&data, size_t &size)
Definition map_serializers.h:62
size_t serialize(const T &t, uint8_t *&data, size_t &size)
Definition map_serializers.h:48
Definition non_blocking.h:15
void write_message_with_error_wrapper(char const *prefix, Ts &&... ts)
Definition ring_buffer_types.h:229
void write_message(const WriterPtr &w, Ts &&... ts)
Definition ring_buffer_types.h:191
auto read_message(const uint8_t *&data, size_t &size)
Definition ring_buffer_types.h:207
bool try_write_message(const WriterPtr &w, Ts &&... ts)
Definition ring_buffer_types.h:199
bool try_write_message_with_error_wrapper(char const *prefix, Ts &&... ts)
Definition ring_buffer_types.h:242
std::shared_ptr< AbstractWriter > WriterPtr
Definition ring_buffer_types.h:154
uint32_t Message
Definition ring_buffer_types.h:19
STL namespace.
Definition ring_buffer_types.h:171
Definition ring_buffer_types.h:26