CCF
Loading...
Searching...
No Matches
snapshot_manager.h
Go to the documentation of this file.
1// Copyright (c) Microsoft Corporation. All rights reserved.
2// Licensed under the Apache 2.0 License.
3#pragma once
4
5#include "ccf/ds/nonstd.h"
9
10#include <charconv>
11#include <filesystem>
12#include <fstream>
13#include <iostream>
14#include <optional>
15
16namespace fs = std::filesystem;
17
18namespace snapshots
19{
21 {
22 private:
23 ringbuffer::WriterPtr to_enclave;
24
25 const fs::path snapshot_dir;
26 const std::optional<fs::path> read_snapshot_dir = std::nullopt;
27
// A snapshot whose buffer has been allocated but which has not yet been
// written to a file. Instances are stored in pending_snapshots, keyed by the
// snapshot index passed to add_pending_snapshot, and are erased again by
// commit_snapshot.
28 struct PendingSnapshot
29 {
// Second index used when building the snapshot file name in commit_snapshot
// (the "105" in the "snapshot_100_105" example there).
30 ::consensus::Index evidence_idx;
// Buffer created with the requested size by add_pending_snapshot. It is
// shared: the same pointer is returned to the caller of add_pending_snapshot
// and later read by commit_snapshot when writing the file.
31 std::shared_ptr<std::vector<uint8_t>> snapshot;
32 };
33 std::map<size_t, PendingSnapshot> pending_snapshots;
34
35 public:
37 const std::string& snapshot_dir_,
39 const std::optional<std::string>& read_snapshot_dir_ = std::nullopt) :
40 to_enclave(writer_factory.create_writer_to_inside()),
41 snapshot_dir(snapshot_dir_),
42 read_snapshot_dir(read_snapshot_dir_)
43 {
44 if (fs::is_directory(snapshot_dir))
45 {
47 "Snapshots will be stored in existing directory: {}", snapshot_dir);
48 }
49 else if (!fs::create_directory(snapshot_dir))
50 {
51 throw std::logic_error(
52 fmt::format("Could not create snapshot directory: {}", snapshot_dir));
53 }
54
55 if (
56 read_snapshot_dir.has_value() &&
57 !fs::is_directory(read_snapshot_dir.value()))
58 {
59 throw std::logic_error(fmt::format(
60 "{} read-only snapshot is not a directory",
61 read_snapshot_dir.value()));
62 }
63 }
64
65 [[nodiscard]] fs::path get_main_directory() const
66 {
67 return snapshot_dir;
68 }
69
70 std::shared_ptr<std::vector<uint8_t>> add_pending_snapshot(
72 ::consensus::Index evidence_idx,
73 size_t requested_size)
74 {
75 auto snapshot = std::make_shared<std::vector<uint8_t>>(requested_size);
76 pending_snapshots.emplace(idx, PendingSnapshot{evidence_idx, snapshot});
77
79 "Added pending snapshot {} [{} bytes]", idx, requested_size);
80
81 return snapshot;
82 }
83
// THROW_ON_ERROR(x, name): evaluates the expression x exactly once and, if
// its result equals -1, throws std::runtime_error. The exception message
// contains strerror(errno), the value of `name`, and the source text of x
// (stringified with #x), e.g. "Error (...) writing snapshot ... in write(...)".
//
// Only a result of exactly -1 is treated as failure; any other value
// (including a smaller-than-requested count from a write-style call) passes
// through silently.
//
// The do { ... } while (0) wrapper makes the expansion behave as a single
// statement. The macro is #undef'd again after commit_snapshot.
// NOTE(review): appears to be intended for the write() calls in
// commit_snapshot, whose exceptions are caught and logged there - confirm.
84#define THROW_ON_ERROR(x, name) \
85 do \
86 { \
87 auto rc = x; \
88 if (rc == -1) \
89 { \
90 throw std::runtime_error( \
91 fmt::format(/* NOLINTNEXTLINE(concurrency-mt-unsafe) */ \
92 "Error ({}) writing snapshot {} in " #x, \
93 strerror(errno), \
94 name)); \
95 } \
96 } while (0)
97
99 {
100 // Inputs, populated at construction
101 const std::filesystem::path dir;
102 const std::string tmp_file_name;
103 const int snapshot_fd;
104
105 // Outputs, populated by callback
107 };
108
109 static void on_snapshot_sync_and_rename(uv_work_t* req)
110 {
111 auto* data = static_cast<AsyncSnapshotSyncAndRename*>(req->data);
112
113 {
114 asynchost::TimeBoundLogger log_if_slow(
115 fmt::format("Committing snapshot - fsync({})", data->tmp_file_name));
116 fsync(data->snapshot_fd); // NOLINT(concurrency-mt-unsafe)
117 }
118
119 close(data->snapshot_fd); // NOLINT(concurrency-mt-unsafe)
120
121 // e.g. snapshot_100_105.committed
122 data->committed_file_name =
123 fmt::format("{}{}", data->tmp_file_name, snapshot_committed_suffix);
124 const auto full_committed_path = data->dir / data->committed_file_name;
125
126 const auto full_tmp_path = data->dir / data->tmp_file_name;
127 files::rename(full_tmp_path, full_committed_path);
128 }
129
131 uv_work_t* req, int /*status*/)
132 {
133 auto* data = static_cast<AsyncSnapshotSyncAndRename*>(req->data);
134
136 "Renamed temporary snapshot {} to {}",
137 data->tmp_file_name,
138 data->committed_file_name);
139
140 delete data; // NOLINT(cppcoreguidelines-owning-memory)
141 delete req; // NOLINT(cppcoreguidelines-owning-memory)
142 }
143
145 ::consensus::Index snapshot_idx,
146 const uint8_t* receipt_data,
147 size_t receipt_size)
148 {
149 asynchost::TimeBoundLogger log_if_slow(
150 fmt::format("Committing snapshot - snapshot_idx={}", snapshot_idx));
151
152 try
153 {
154 for (auto it = pending_snapshots.begin(); it != pending_snapshots.end();
155 it++)
156 {
157 if (snapshot_idx == it->first)
158 {
159 // e.g. snapshot_100_105
160 auto file_name = fmt::format(
161 "{}{}{}{}{}",
162 snapshot_file_prefix,
163 snapshot_idx_delimiter,
164 it->first,
165 snapshot_idx_delimiter,
166 it->second.evidence_idx);
167 auto full_snapshot_path = snapshot_dir / file_name;
168
169 int snapshot_fd = open(
170 full_snapshot_path.c_str(), O_CREAT | O_EXCL | O_WRONLY, 0664);
171 if (snapshot_fd == -1)
172 {
173 if (errno == EEXIST)
174 {
175 // In the case that a file with this name already exists, keep
176 // existing file and drop pending snapshot
178 "Cannot write snapshot as file already exists: {}",
179 file_name);
180 }
181 else
182 {
184 "Cannot write snapshot: error ({}) opening file {}",
185 errno,
186 file_name);
187 }
188 }
189 else
190 {
191 const auto& snapshot = it->second.snapshot;
192
193 {
194 asynchost::TimeBoundLogger log_write_if_slow(
195 fmt::format("Writing snapshot to {}", file_name));
196 // NOLINTNEXTLINE(concurrency-mt-unsafe)
198 write(snapshot_fd, snapshot->data(), snapshot->size()),
199 file_name);
200 // NOLINTNEXTLINE(concurrency-mt-unsafe)
202 write(snapshot_fd, receipt_data, receipt_size), file_name);
203 }
204
206 "New snapshot file written to {} [{} bytes] (unsynced)",
207 file_name,
208 snapshot->size() + receipt_size);
209
210 // Call fsync and rename on a worker-thread via uv async, as they
211 // may be slow
212 // NOLINTNEXTLINE(cppcoreguidelines-owning-memory)
213 auto* work_handle = new uv_work_t;
214
215 {
216 // NOLINTNEXTLINE(cppcoreguidelines-owning-memory)
217 auto* data = new AsyncSnapshotSyncAndRename{
218 .dir = snapshot_dir,
219 .tmp_file_name = file_name,
220 .snapshot_fd = snapshot_fd,
221 .committed_file_name = {}};
222
223 work_handle->data = data;
224 }
225
226#ifdef TEST_MODE_EXECUTE_SYNC_INLINE
227 on_snapshot_sync_and_rename(work_handle);
229#else
230 uv_queue_work(
231 uv_default_loop(),
232 work_handle,
235#endif
236 }
237
238 pending_snapshots.erase(it);
239
240 return;
241 }
242 }
243
244 LOG_FAIL_FMT("Could not find snapshot to commit at {}", snapshot_idx);
245 }
246 catch (std::exception& e)
247 {
249 "Exception while attempting to commit snapshot at {}: {}",
250 snapshot_idx,
251 e.what());
252 }
253 }
254#undef THROW_ON_ERROR
255
256 std::optional<std::pair<fs::path, fs::path>>
258 {
259 // Keep track of latest snapshot file in both directories
260 size_t latest_idx = 0;
261
262 std::optional<fs::path> read_only_latest_committed_snapshot =
263 std::nullopt;
264 if (read_snapshot_dir.has_value())
265 {
266 read_only_latest_committed_snapshot =
268 read_snapshot_dir.value(), latest_idx);
269 }
270
271 auto main_latest_committed_snapshot =
272 find_latest_committed_snapshot_in_directory(snapshot_dir, latest_idx);
273
274 if (main_latest_committed_snapshot.has_value())
275 {
276 return std::make_pair(
277 snapshot_dir, main_latest_committed_snapshot.value());
278 }
279
280 if (read_only_latest_committed_snapshot.has_value())
281 {
282 return std::make_pair(
283 read_snapshot_dir.value(),
284 read_only_latest_committed_snapshot.value());
285 }
286
287 return std::nullopt;
288 }
289
292 {
294 disp,
295 ::consensus::snapshot_allocate,
296 [this](const uint8_t* data, size_t size) {
297 auto idx = serialized::read<::consensus::Index>(data, size);
298 auto evidence_idx = serialized::read<::consensus::Index>(data, size);
299 auto requested_size = serialized::read<size_t>(data, size);
300 auto generation_count = serialized::read<uint32_t>(data, size);
301
302 auto snapshot =
303 add_pending_snapshot(idx, evidence_idx, requested_size);
304
306 ::consensus::snapshot_allocated,
307 to_enclave,
308 std::span<uint8_t>{snapshot->data(), snapshot->size()},
309 generation_count);
310 });
311
313 disp,
314 ::consensus::snapshot_commit,
315 [this](const uint8_t* data, size_t size) {
316 auto snapshot_idx = serialized::read<::consensus::Index>(data, size);
317 commit_snapshot(snapshot_idx, data, size);
318 });
319 }
320 };
321}
Definition messaging.h:38
Definition ring_buffer_types.h:157
Definition snapshot_manager.h:21
fs::path get_main_directory() const
Definition snapshot_manager.h:65
SnapshotManager(const std::string &snapshot_dir_, ringbuffer::AbstractWriterFactory &writer_factory, const std::optional< std::string > &read_snapshot_dir_=std::nullopt)
Definition snapshot_manager.h:36
void commit_snapshot(::consensus::Index snapshot_idx, const uint8_t *receipt_data, size_t receipt_size)
Definition snapshot_manager.h:144
static void on_snapshot_sync_and_rename_complete(uv_work_t *req, int)
Definition snapshot_manager.h:130
static void on_snapshot_sync_and_rename(uv_work_t *req)
Definition snapshot_manager.h:109
void register_message_handlers(messaging::Dispatcher< ringbuffer::Message > &disp)
Definition snapshot_manager.h:290
std::optional< std::pair< fs::path, fs::path > > find_latest_committed_snapshot()
Definition snapshot_manager.h:257
std::shared_ptr< std::vector< uint8_t > > add_pending_snapshot(::consensus::Index idx, ::consensus::Index evidence_idx, size_t requested_size)
Definition snapshot_manager.h:70
#define LOG_INFO_FMT
Definition internal_logger.h:15
#define LOG_DEBUG_FMT
Definition internal_logger.h:14
#define LOG_FAIL_FMT
Definition internal_logger.h:16
#define DISPATCHER_SET_MESSAGE_HANDLER(DISP, MSG,...)
Definition messaging.h:316
uint64_t Index
Definition ledger_enclave_types.h:11
std::shared_ptr< AbstractWriter > WriterPtr
Definition ring_buffer_types.h:154
Definition fetch.h:36
std::optional< fs::path > find_latest_committed_snapshot_in_directory(const fs::path &directory, size_t &latest_committed_snapshot_idx)
Definition filenames.h:136
#define RINGBUFFER_WRITE_MESSAGE(MSG,...)
Definition ring_buffer_types.h:259
#define THROW_ON_ERROR(x, name)
Definition snapshot_manager.h:84
Definition time_bound_logger.h:14
const std::filesystem::path dir
Definition snapshot_manager.h:101
const int snapshot_fd
Definition snapshot_manager.h:103
const std::string tmp_file_name
Definition snapshot_manager.h:102
std::string committed_file_name
Definition snapshot_manager.h:106