16namespace fs = std::filesystem;
25 const fs::path snapshot_dir;
26 const std::optional<fs::path> read_snapshot_dir = std::nullopt;
28 struct PendingSnapshot
31 std::shared_ptr<std::vector<uint8_t>> snapshot;
33 std::map<size_t, PendingSnapshot> pending_snapshots;
37 const std::string& snapshot_dir_,
39 const std::optional<std::string>& read_snapshot_dir_ = std::nullopt) :
40 to_enclave(writer_factory.create_writer_to_inside()),
41 snapshot_dir(snapshot_dir_),
42 read_snapshot_dir(read_snapshot_dir_)
44 if (fs::is_directory(snapshot_dir))
47 "Snapshots will be stored in existing directory: {}", snapshot_dir);
49 else if (!fs::create_directory(snapshot_dir))
51 throw std::logic_error(
52 fmt::format(
"Could not create snapshot directory: {}", snapshot_dir));
56 read_snapshot_dir.has_value() &&
57 !fs::is_directory(read_snapshot_dir.value()))
59 throw std::logic_error(fmt::format(
60 "{} read-only snapshot is not a directory",
61 read_snapshot_dir.value()));
73 size_t requested_size)
75 auto snapshot = std::make_shared<std::vector<uint8_t>>(requested_size);
76 pending_snapshots.emplace(idx, PendingSnapshot{evidence_idx, snapshot});
79 "Added pending snapshot {} [{} bytes]", idx, requested_size);
84#define THROW_ON_ERROR(x, name) \
90 throw std::runtime_error( \
92 "Error ({}) writing snapshot {} in " #x, \
101 const std::filesystem::path
dir;
115 fmt::format(
"Committing snapshot - fsync({})", data->tmp_file_name));
116 fsync(data->snapshot_fd);
119 close(data->snapshot_fd);
122 data->committed_file_name =
123 fmt::format(
"{}{}", data->tmp_file_name, snapshot_committed_suffix);
124 const auto full_committed_path = data->dir / data->committed_file_name;
126 const auto full_tmp_path = data->dir / data->tmp_file_name;
127 files::rename(full_tmp_path, full_committed_path);
131 uv_work_t* req,
int )
136 "Renamed temporary snapshot {} to {}",
138 data->committed_file_name);
146 const uint8_t* receipt_data,
150 fmt::format(
"Committing snapshot - snapshot_idx={}", snapshot_idx));
154 for (
auto it = pending_snapshots.begin(); it != pending_snapshots.end();
157 if (snapshot_idx == it->first)
160 auto file_name = fmt::format(
162 snapshot_file_prefix,
163 snapshot_idx_delimiter,
165 snapshot_idx_delimiter,
166 it->second.evidence_idx);
167 auto full_snapshot_path = snapshot_dir / file_name;
169 int snapshot_fd = open(
170 full_snapshot_path.c_str(), O_CREAT | O_EXCL | O_WRONLY, 0664);
171 if (snapshot_fd == -1)
178 "Cannot write snapshot as file already exists: {}",
184 "Cannot write snapshot: error ({}) opening file {}",
191 const auto& snapshot = it->second.snapshot;
195 fmt::format(
"Writing snapshot to {}", file_name));
198 write(snapshot_fd, snapshot->data(), snapshot->size()),
202 write(snapshot_fd, receipt_data, receipt_size), file_name);
206 "New snapshot file written to {} [{} bytes] (unsynced)",
208 snapshot->size() + receipt_size);
213 auto* work_handle =
new uv_work_t;
219 .tmp_file_name = file_name,
220 .snapshot_fd = snapshot_fd,
221 .committed_file_name = {}};
223 work_handle->data = data;
226#ifdef TEST_MODE_EXECUTE_SYNC_INLINE
238 pending_snapshots.erase(it);
244 LOG_FAIL_FMT(
"Could not find snapshot to commit at {}", snapshot_idx);
246 catch (std::exception& e)
249 "Exception while attempting to commit snapshot at {}: {}",
256 std::optional<std::pair<fs::path, fs::path>>
260 size_t latest_idx = 0;
262 std::optional<fs::path> read_only_latest_committed_snapshot =
264 if (read_snapshot_dir.has_value())
266 read_only_latest_committed_snapshot =
268 read_snapshot_dir.value(), latest_idx);
271 auto main_latest_committed_snapshot =
274 if (main_latest_committed_snapshot.has_value())
276 return std::make_pair(
277 snapshot_dir, main_latest_committed_snapshot.value());
280 if (read_only_latest_committed_snapshot.has_value())
282 return std::make_pair(
283 read_snapshot_dir.value(),
284 read_only_latest_committed_snapshot.value());
295 ::consensus::snapshot_allocate,
296 [
this](
const uint8_t* data,
size_t size) {
297 auto idx = serialized::read<::consensus::Index>(data, size);
298 auto evidence_idx = serialized::read<::consensus::Index>(data, size);
299 auto requested_size = serialized::read<size_t>(data, size);
300 auto generation_count = serialized::read<uint32_t>(data, size);
306 ::consensus::snapshot_allocated,
308 std::span<uint8_t>{snapshot->data(), snapshot->size()},
314 ::consensus::snapshot_commit,
315 [
this](
const uint8_t* data,
size_t size) {
316 auto snapshot_idx = serialized::read<::consensus::Index>(data, size);
Definition messaging.h:38
Definition ring_buffer_types.h:157
Definition snapshot_manager.h:21
fs::path get_main_directory() const
Definition snapshot_manager.h:65
SnapshotManager(const std::string &snapshot_dir_, ringbuffer::AbstractWriterFactory &writer_factory, const std::optional< std::string > &read_snapshot_dir_=std::nullopt)
Definition snapshot_manager.h:36
void commit_snapshot(::consensus::Index snapshot_idx, const uint8_t *receipt_data, size_t receipt_size)
Definition snapshot_manager.h:144
static void on_snapshot_sync_and_rename_complete(uv_work_t *req, int)
Definition snapshot_manager.h:130
static void on_snapshot_sync_and_rename(uv_work_t *req)
Definition snapshot_manager.h:109
void register_message_handlers(messaging::Dispatcher< ringbuffer::Message > &disp)
Definition snapshot_manager.h:290
std::optional< std::pair< fs::path, fs::path > > find_latest_committed_snapshot()
Definition snapshot_manager.h:257
std::shared_ptr< std::vector< uint8_t > > add_pending_snapshot(::consensus::Index idx, ::consensus::Index evidence_idx, size_t requested_size)
Definition snapshot_manager.h:70
#define LOG_INFO_FMT
Definition internal_logger.h:15
#define LOG_DEBUG_FMT
Definition internal_logger.h:14
#define LOG_FAIL_FMT
Definition internal_logger.h:16
#define DISPATCHER_SET_MESSAGE_HANDLER(DISP, MSG,...)
Definition messaging.h:316
uint64_t Index
Definition ledger_enclave_types.h:11
std::shared_ptr< AbstractWriter > WriterPtr
Definition ring_buffer_types.h:154
std::optional< fs::path > find_latest_committed_snapshot_in_directory(const fs::path &directory, size_t &latest_committed_snapshot_idx)
Definition filenames.h:136
#define RINGBUFFER_WRITE_MESSAGE(MSG,...)
Definition ring_buffer_types.h:259
#define THROW_ON_ERROR(x, name)
Definition snapshot_manager.h:84
Definition time_bound_logger.h:14
Definition snapshot_manager.h:99
const std::filesystem::path dir
Definition snapshot_manager.h:101
const int snapshot_fd
Definition snapshot_manager.h:103
const std::string tmp_file_name
Definition snapshot_manager.h:102
std::string committed_file_name
Definition snapshot_manager.h:106