CCF
Loading...
Searching...
No Matches
snapshot_manager.h
Go to the documentation of this file.
1// Copyright (c) Microsoft Corporation. All rights reserved.
2// Licensed under the Apache 2.0 License.
3#pragma once
4
5#include "ccf/ds/nonstd.h"
9
10#include <charconv>
11#include <filesystem>
12#include <fstream>
13#include <iostream>
14#include <optional>
15
16namespace fs = std::filesystem;
17
18namespace snapshots
19{
21 {
22 private:
23 ringbuffer::WriterPtr to_enclave;
24
25 const fs::path snapshot_dir;
26 const std::optional<fs::path> read_snapshot_dir = std::nullopt;
27
28 struct PendingSnapshot
29 {
30 ::consensus::Index evidence_idx;
31 std::shared_ptr<std::vector<uint8_t>> snapshot;
32 };
33 std::map<size_t, PendingSnapshot> pending_snapshots;
34
35 public:
37 const std::string& snapshot_dir_,
39 const std::optional<std::string>& read_snapshot_dir_ = std::nullopt) :
40 to_enclave(writer_factory.create_writer_to_inside()),
41 snapshot_dir(snapshot_dir_),
42 read_snapshot_dir(read_snapshot_dir_)
43 {
44 if (fs::is_directory(snapshot_dir))
45 {
47 "Snapshots will be stored in existing directory: {}", snapshot_dir);
48 }
49 else if (!fs::create_directory(snapshot_dir))
50 {
51 throw std::logic_error(
52 fmt::format("Could not create snapshot directory: {}", snapshot_dir));
53 }
54
55 if (
56 read_snapshot_dir.has_value() &&
57 !fs::is_directory(read_snapshot_dir.value()))
58 {
59 throw std::logic_error(fmt::format(
60 "{} read-only snapshot is not a directory",
61 read_snapshot_dir.value()));
62 }
63 }
64
65 fs::path get_main_directory() const
66 {
67 return snapshot_dir;
68 }
69
70 std::shared_ptr<std::vector<uint8_t>> add_pending_snapshot(
72 ::consensus::Index evidence_idx,
73 size_t requested_size)
74 {
75 auto snapshot = std::make_shared<std::vector<uint8_t>>(requested_size);
76 pending_snapshots.emplace(idx, PendingSnapshot{evidence_idx, snapshot});
77
79 "Added pending snapshot {} [{} bytes]", idx, requested_size);
80
81 return snapshot;
82 }
83
84#define THROW_ON_ERROR(x, name) \
85 do \
86 { \
87 auto rc = x; \
88 if (rc == -1) \
89 { \
90 throw std::runtime_error(fmt::format( \
91 "Error ({}) writing snapshot {} in " #x, strerror(errno), name)); \
92 } \
93 } while (0)
94
96 {
97 // Inputs, populated at construction
98 const std::filesystem::path dir;
99 const std::string tmp_file_name;
100 const int snapshot_fd;
101
102 // Outputs, populated by callback
103 std::string committed_file_name = {};
104 };
105
106 static void on_snapshot_sync_and_rename(uv_work_t* req)
107 {
108 auto data = static_cast<AsyncSnapshotSyncAndRename*>(req->data);
109
110 {
111 asynchost::TimeBoundLogger log_if_slow(
112 fmt::format("Committing snapshot - fsync({})", data->tmp_file_name));
113 fsync(data->snapshot_fd);
114 }
115
116 close(data->snapshot_fd);
117
118 // e.g. snapshot_100_105.committed
119 data->committed_file_name =
120 fmt::format("{}{}", data->tmp_file_name, snapshot_committed_suffix);
121 const auto full_committed_path = data->dir / data->committed_file_name;
122
123 const auto full_tmp_path = data->dir / data->tmp_file_name;
124 files::rename(full_tmp_path, full_committed_path);
125 }
126
127 static void on_snapshot_sync_and_rename_complete(uv_work_t* req, int status)
128 {
129 auto data = static_cast<AsyncSnapshotSyncAndRename*>(req->data);
130
132 "Renamed temporary snapshot {} to {}",
133 data->tmp_file_name,
134 data->committed_file_name);
135
136 delete data;
137 delete req;
138 }
139
141 ::consensus::Index snapshot_idx,
142 const uint8_t* receipt_data,
143 size_t receipt_size)
144 {
145 asynchost::TimeBoundLogger log_if_slow(
146 fmt::format("Committing snapshot - snapshot_idx={}", snapshot_idx));
147
148 try
149 {
150 for (auto it = pending_snapshots.begin(); it != pending_snapshots.end();
151 it++)
152 {
153 if (snapshot_idx == it->first)
154 {
155 // e.g. snapshot_100_105
156 auto file_name = fmt::format(
157 "{}{}{}{}{}",
158 snapshot_file_prefix,
159 snapshot_idx_delimiter,
160 it->first,
161 snapshot_idx_delimiter,
162 it->second.evidence_idx);
163 auto full_snapshot_path = snapshot_dir / file_name;
164
165 int snapshot_fd = open(
166 full_snapshot_path.c_str(), O_CREAT | O_EXCL | O_WRONLY, 0664);
167 if (snapshot_fd == -1)
168 {
169 if (errno == EEXIST)
170 {
171 // In the case that a file with this name already exists, keep
172 // existing file and drop pending snapshot
174 "Cannot write snapshot as file already exists: {}",
175 file_name);
176 }
177 else
178 {
180 "Cannot write snapshot: error ({}) opening file {}",
181 errno,
182 file_name);
183 }
184 }
185 else
186 {
187 const auto& snapshot = it->second.snapshot;
188
190 write(snapshot_fd, snapshot->data(), snapshot->size()),
191 file_name);
193 write(snapshot_fd, receipt_data, receipt_size), file_name);
194
196 "New snapshot file written to {} [{} bytes] (unsynced)",
197 file_name,
198 snapshot->size() + receipt_size);
199
200 // Call fsync and rename on a worker-thread via uv async, as they
201 // may be slow
202 uv_work_t* work_handle = new uv_work_t;
203
204 {
205 auto* data = new AsyncSnapshotSyncAndRename{
206 .dir = snapshot_dir,
207 .tmp_file_name = file_name,
208 .snapshot_fd = snapshot_fd};
209
210 work_handle->data = data;
211 }
212
213#ifdef TEST_MODE_EXECUTE_SYNC_INLINE
214 on_snapshot_sync_and_rename(work_handle);
216#else
217 uv_queue_work(
218 uv_default_loop(),
219 work_handle,
222#endif
223 }
224
225 pending_snapshots.erase(it);
226
227 return;
228 }
229 }
230
231 LOG_FAIL_FMT("Could not find snapshot to commit at {}", snapshot_idx);
232 }
233 catch (std::exception& e)
234 {
236 "Exception while attempting to commit snapshot at {}: {}",
237 snapshot_idx,
238 e.what());
239 }
240 }
241#undef THROW_ON_ERROR
242
243 std::optional<std::pair<fs::path, fs::path>>
245 {
246 // Keep track of latest snapshot file in both directories
247 size_t latest_idx = 0;
248
249 std::optional<fs::path> read_only_latest_committed_snapshot =
250 std::nullopt;
251 if (read_snapshot_dir.has_value())
252 {
253 read_only_latest_committed_snapshot =
255 read_snapshot_dir.value(), latest_idx);
256 }
257
258 auto main_latest_committed_snapshot =
259 find_latest_committed_snapshot_in_directory(snapshot_dir, latest_idx);
260
261 if (main_latest_committed_snapshot.has_value())
262 {
263 return std::make_pair(
264 snapshot_dir, main_latest_committed_snapshot.value());
265 }
266 else if (read_only_latest_committed_snapshot.has_value())
267 {
268 return std::make_pair(
269 read_snapshot_dir.value(),
270 read_only_latest_committed_snapshot.value());
271 }
272
273 return std::nullopt;
274 }
275
278 {
280 disp,
281 ::consensus::snapshot_allocate,
282 [this](const uint8_t* data, size_t size) {
283 auto idx = serialized::read<::consensus::Index>(data, size);
284 auto evidence_idx = serialized::read<::consensus::Index>(data, size);
285 auto requested_size = serialized::read<size_t>(data, size);
286 auto generation_count = serialized::read<uint32_t>(data, size);
287
288 auto snapshot =
289 add_pending_snapshot(idx, evidence_idx, requested_size);
290
292 ::consensus::snapshot_allocated,
293 to_enclave,
294 std::span<uint8_t>{snapshot->data(), snapshot->size()},
295 generation_count);
296 });
297
299 disp,
300 ::consensus::snapshot_commit,
301 [this](const uint8_t* data, size_t size) {
302 auto snapshot_idx = serialized::read<::consensus::Index>(data, size);
303 commit_snapshot(snapshot_idx, data, size);
304 });
305 }
306 };
307}
Definition messaging.h:38
Definition ring_buffer_types.h:153
Definition snapshot_manager.h:21
fs::path get_main_directory() const
Definition snapshot_manager.h:65
SnapshotManager(const std::string &snapshot_dir_, ringbuffer::AbstractWriterFactory &writer_factory, const std::optional< std::string > &read_snapshot_dir_=std::nullopt)
Definition snapshot_manager.h:36
static void on_snapshot_sync_and_rename_complete(uv_work_t *req, int status)
Definition snapshot_manager.h:127
void commit_snapshot(::consensus::Index snapshot_idx, const uint8_t *receipt_data, size_t receipt_size)
Definition snapshot_manager.h:140
static void on_snapshot_sync_and_rename(uv_work_t *req)
Definition snapshot_manager.h:106
void register_message_handlers(messaging::Dispatcher< ringbuffer::Message > &disp)
Definition snapshot_manager.h:276
std::optional< std::pair< fs::path, fs::path > > find_latest_committed_snapshot()
Definition snapshot_manager.h:244
std::shared_ptr< std::vector< uint8_t > > add_pending_snapshot(::consensus::Index idx, ::consensus::Index evidence_idx, size_t requested_size)
Definition snapshot_manager.h:70
#define LOG_INFO_FMT
Definition logger.h:362
#define LOG_DEBUG_FMT
Definition logger.h:357
#define LOG_FAIL_FMT
Definition logger.h:363
#define DISPATCHER_SET_MESSAGE_HANDLER(DISP, MSG,...)
Definition messaging.h:316
uint64_t Index
Definition ledger_enclave_types.h:11
std::shared_ptr< AbstractWriter > WriterPtr
Definition ring_buffer_types.h:150
Definition fetch.h:35
std::optional< fs::path > find_latest_committed_snapshot_in_directory(const fs::path &directory, size_t &latest_committed_snapshot_idx)
Definition filenames.h:135
#define RINGBUFFER_WRITE_MESSAGE(MSG,...)
Definition ring_buffer_types.h:255
#define THROW_ON_ERROR(x, name)
Definition snapshot_manager.h:84
Definition time_bound_logger.h:14
const std::filesystem::path dir
Definition snapshot_manager.h:98
const int snapshot_fd
Definition snapshot_manager.h:100
const std::string tmp_file_name
Definition snapshot_manager.h:99
std::string committed_file_name
Definition snapshot_manager.h:103