CCF
snapshotter.h — source listing
Go to the documentation of this file.
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the Apache 2.0 License.
#pragma once

// NOTE(review): four project includes below were dropped by the documentation
// extraction and have been reconstructed from the symbols this file uses
// (SnapshotEvidence/Tables, LOG_*_FMT, RINGBUFFER_WRITE_MESSAGE,
// build_and_serialise_receipt) -- confirm paths against the original header.
#include "ccf/pal/locking.h"
#include "ccf/service/tables/snapshot_evidence.h"
#include "ds/ccf_assert.h"
#include "ds/internal_logger.h"
#include "ds/ring_buffer_types.h"
#include "kv/kv_types.h"
#include "kv/store.h"
#include "node/network_state.h"
#include "node/snapshot_serdes.h"
#include "tasks/task_system.h"

#include <algorithm>
#include <cstdint>
#include <deque>
#include <limits>
#include <map>
#include <memory>
#include <mutex>
#include <optional>
#include <ranges>
#include <span>
#include <string>
#include <vector>
19namespace ccf
20{
21 class Snapshotter : public std::enable_shared_from_this<Snapshotter>,
23 {
24 private:
25 static constexpr auto max_tx_interval = std::numeric_limits<size_t>::max();
26
27 // Maximum number of pending snapshots allowed at a given time. No more
28 // snapshots are emitted when this threshold is reached and until pending
29 // snapshots are flushed on commit.
30 static constexpr auto max_pending_snapshots_count = 5;
31
33
34 ccf::pal::Mutex lock;
35
36 std::shared_ptr<ccf::kv::Store> store;
37
38 // Snapshots are never generated by default (e.g. during public recovery)
39 size_t snapshot_tx_interval = max_tx_interval;
40
41 struct SnapshotInfo
42 {
43 ccf::kv::Version version = 0;
44 ccf::crypto::Sha256Hash write_set_digest;
45 std::string commit_evidence;
46 ccf::crypto::Sha256Hash snapshot_digest;
47 std::vector<uint8_t> serialised_snapshot;
48
49 // Prevents the receipt from being passed to the host (on commit) in case
50 // host has not yet allocated memory for the snapshot.
51 bool is_stored = false;
52
53 std::optional<::consensus::Index> evidence_idx = std::nullopt;
54
55 std::optional<NodeId> node_id = std::nullopt;
56 std::optional<ccf::crypto::Pem> node_cert = std::nullopt;
57 std::optional<std::vector<uint8_t>> sig = std::nullopt;
58 std::optional<std::vector<uint8_t>> tree = std::nullopt;
59
60 SnapshotInfo() = default;
61 };
62 // Queue of pending snapshots that have been generated, but are not yet
63 // committed
64 std::map<uint32_t, SnapshotInfo> pending_snapshots;
65
66 // Initial snapshot index
67 static constexpr ::consensus::Index initial_snapshot_idx = 0;
68
69 // Index at which the latest snapshot was generated
70 ::consensus::Index last_snapshot_idx = 0;
71
72 // Used to suspend snapshot generation during public recovery
73 bool snapshot_generation_enabled = true;
74
75 // Indices at which a snapshot will be next generated and Boolean to
76 // indicate whether a snapshot was forced at the given index
77 struct SnapshotEntry
78 {
80 bool forced;
81 bool done;
82 };
83 std::deque<SnapshotEntry> next_snapshot_indices;
84
85 void commit_snapshot(
86 ::consensus::Index snapshot_idx,
87 const std::vector<uint8_t>& serialised_receipt)
88 {
89 // The snapshot_idx is used to retrieve the correct snapshot file
90 // previously generated.
91 auto to_host = writer_factory.create_writer_to_outside();
93 ::consensus::snapshot_commit,
94 to_host,
95 snapshot_idx,
96 serialised_receipt);
97 }
98
99 struct SnapshotTask : public ccf::tasks::BaseTask
100 {
101 std::shared_ptr<Snapshotter> self;
102 std::unique_ptr<ccf::kv::AbstractStore::AbstractSnapshot> snapshot;
103 uint32_t generation_count;
104
105 const std::string name;
106
107 SnapshotTask(
108 std::shared_ptr<Snapshotter> _self,
109 std::unique_ptr<ccf::kv::AbstractStore::AbstractSnapshot>&& _snapshot,
110 uint32_t _generation_count) :
111 self(std::move(_self)),
112 snapshot(std::move(_snapshot)),
113 generation_count(_generation_count),
114 name(fmt::format(
115 "snapshot@{}[{}]", snapshot->get_version(), generation_count))
116 {}
117
118 void do_task_implementation() override
119 {
120 self->snapshot_(std::move(snapshot), generation_count);
121 }
122
123 [[nodiscard]] const std::string& get_name() const override
124 {
125 return name;
126 }
127 };
128
129 void snapshot_(
130 std::unique_ptr<ccf::kv::AbstractStore::AbstractSnapshot> snapshot,
131 uint32_t generation_count)
132 {
133 auto snapshot_version = snapshot->get_version();
134
135 {
136 std::unique_lock<ccf::pal::Mutex> guard(lock);
137 if (pending_snapshots.size() >= max_pending_snapshots_count)
138 {
140 "Skipping new snapshot generation as {} snapshots are already "
141 "pending",
142 pending_snapshots.size());
143 return;
144 }
145
146 // It is possible that the signature following the snapshot evidence is
147 // scheduled by another thread while the below snapshot evidence
148 // transaction is committed. To allow for such scenario, the evidence
149 // seqno is recorded via `record_snapshot_evidence_idx()` on a hook
150 // rather than here.
151 pending_snapshots[generation_count] = {};
152 pending_snapshots[generation_count].version = snapshot_version;
153 }
154
155 auto serialised_snapshot = store->serialise_snapshot(std::move(snapshot));
156 auto serialised_snapshot_size = serialised_snapshot.size();
157
158 auto tx = store->create_tx();
159 auto* evidence = tx.rw<SnapshotEvidence>(Tables::SNAPSHOT_EVIDENCE);
160 auto snapshot_hash = ccf::crypto::Sha256Hash(serialised_snapshot);
161 evidence->put({snapshot_hash, snapshot_version});
162
164 // NOLINTNEXTLINE(performance-move-const-arg)
165 cd.set(std::move(snapshot_hash));
166
167 ccf::crypto::Sha256Hash ws_digest;
168 std::string commit_evidence;
169 auto capture_ws_digest_and_commit_evidence =
170 [&ws_digest, &commit_evidence](
171 const std::vector<uint8_t>& write_set,
172 const std::string& commit_evidence_) {
173 new (&ws_digest)
174 ccf::crypto::Sha256Hash({write_set.data(), write_set.size()});
175 commit_evidence = commit_evidence_;
176 };
177
178 auto rc = tx.commit(cd, nullptr, capture_ws_digest_and_commit_evidence);
180 {
182 "Could not commit snapshot evidence for seqno {}: {}",
183 snapshot_version,
184 rc);
185 return;
186 }
187
188 auto evidence_version = tx.commit_version();
189
190 {
191 std::unique_lock<ccf::pal::Mutex> guard(lock);
192 pending_snapshots[generation_count].commit_evidence = commit_evidence;
193 pending_snapshots[generation_count].write_set_digest = ws_digest;
194 pending_snapshots[generation_count].snapshot_digest = cd.value();
195 pending_snapshots[generation_count].serialised_snapshot =
196 std::move(serialised_snapshot);
197 }
198
199 auto to_host = writer_factory.create_writer_to_outside();
201 ::consensus::snapshot_allocate,
202 to_host,
203 snapshot_version,
204 evidence_version,
205 serialised_snapshot_size,
206 generation_count);
207
209 "Request to allocate snapshot [{} bytes] for seqno {}, with evidence "
210 "seqno {}: {}, ws digest: {}",
211 serialised_snapshot_size,
212 snapshot_version,
213 evidence_version,
214 cd.value(),
215 ws_digest);
216 }
217
218 void update_indices(::consensus::Index idx)
219 {
220 while ((next_snapshot_indices.size() > 1) &&
221 (std::next(next_snapshot_indices.begin())->idx <= idx))
222 {
223 next_snapshot_indices.pop_front();
224 }
225
226 for (auto it = pending_snapshots.begin(); it != pending_snapshots.end();)
227 {
228 auto& snapshot_info = it->second;
229
230 if (
231 snapshot_info.is_stored && snapshot_info.evidence_idx.has_value() &&
232 idx > snapshot_info.evidence_idx.value() &&
233 snapshot_info.sig.has_value() && snapshot_info.tree.has_value() &&
234 snapshot_info.node_id.has_value() &&
235 snapshot_info.node_cert.has_value())
236 {
237 auto serialised_receipt = build_and_serialise_receipt(
238 snapshot_info.sig.value(),
239 snapshot_info.tree.value(),
240 snapshot_info.node_id.value(),
241 snapshot_info.node_cert.value(),
242 snapshot_info.evidence_idx.value(),
243 snapshot_info.write_set_digest,
244 snapshot_info.commit_evidence,
245 std::move(snapshot_info.snapshot_digest));
246
247 commit_snapshot(snapshot_info.version, serialised_receipt);
248 it = pending_snapshots.erase(it);
249 }
250 else
251 {
252 ++it;
253 }
254 }
255 }
256
257 public:
259 ringbuffer::AbstractWriterFactory& writer_factory_,
260 std::shared_ptr<ccf::kv::Store>& store_,
261 size_t snapshot_tx_interval_) :
262 writer_factory(writer_factory_),
263 store(store_),
264 snapshot_tx_interval(snapshot_tx_interval_)
265 {
266 next_snapshot_indices.push_back({initial_snapshot_idx, false, true});
267 }
268
270 {
271 // After public recovery, the first node should have restored all
272 // snapshot indices in next_snapshot_indices so that snapshot
273 // generation can continue at the correct interval
274 std::lock_guard<ccf::pal::Mutex> guard(lock);
275
276 last_snapshot_idx = next_snapshot_indices.back().idx;
277 }
278
279 void set_snapshot_generation(bool enabled)
280 {
281 std::lock_guard<ccf::pal::Mutex> guard(lock);
282 snapshot_generation_enabled = enabled;
283 }
284
286 {
287 // Should only be called once, after a snapshot has been applied
288 std::lock_guard<ccf::pal::Mutex> guard(lock);
289
290 if (last_snapshot_idx != 0)
291 {
292 throw std::logic_error(
293 "Last snapshot index can only be set if no snapshot has been "
294 "generated");
295 }
296
297 last_snapshot_idx = idx;
298
299 next_snapshot_indices.clear();
300 next_snapshot_indices.push_back({last_snapshot_idx, false, true});
301 }
302
304 std::span<uint8_t> snapshot_buf, uint32_t generation_count)
305 {
306 std::lock_guard<ccf::pal::Mutex> guard(lock);
307
308 auto search = pending_snapshots.find(generation_count);
309 if (search == pending_snapshots.end())
310 {
312 "Could not find pending snapshot to write for generation count {}",
313 generation_count);
314 return false;
315 }
316
317 auto& pending_snapshot = search->second;
318 if (snapshot_buf.size() != pending_snapshot.serialised_snapshot.size())
319 {
320 // Unreliable host: allocated snapshot buffer is not of expected
321 // size. The pending snapshot is discarded to reduce enclave memory
322 // usage.
324 "Host allocated snapshot buffer [{} bytes] is not of expected "
325 "size [{} bytes]. Discarding snapshot for seqno {}",
326 snapshot_buf.size(),
327 pending_snapshot.serialised_snapshot.size(),
328 pending_snapshot.version);
329 pending_snapshots.erase(search);
330 return false;
331 }
332
333 std::copy(
334 pending_snapshot.serialised_snapshot.begin(),
335 pending_snapshot.serialised_snapshot.end(),
336 snapshot_buf.begin());
337 pending_snapshot.is_stored = true;
338
340 "Successfully copied snapshot at seqno {} to host memory [{} "
341 "bytes]",
342 pending_snapshot.version,
343 pending_snapshot.serialised_snapshot.size());
344 return true;
345 }
346
348 {
349 // Returns true if the committable idx will require the generation of a
350 // snapshot, and thus a new ledger chunk
351 std::lock_guard<ccf::pal::Mutex> guard(lock);
352
354 idx >= next_snapshot_indices.back().idx,
355 "Committable seqno {} < next snapshot seqno {}",
356 idx,
357 next_snapshot_indices.back().idx);
358
359 bool forced = store->flag_enabled_unsafe(
361
362 ::consensus::Index last_unforced_idx = last_snapshot_idx;
363 for (const auto& next_snapshot_indice :
364 std::ranges::reverse_view(next_snapshot_indices))
365 {
366 if (!next_snapshot_indice.forced)
367 {
368 last_unforced_idx = next_snapshot_indice.idx;
369 break;
370 }
371 }
372
373 auto due = (idx - last_unforced_idx) >= snapshot_tx_interval;
374 if (due || forced)
375 {
376 next_snapshot_indices.push_back({idx, !due, false});
378 "{} {} as snapshot index", !due ? "Forced" : "Recorded", idx);
379 store->unset_flag_unsafe(
381 return due;
382 }
383
384 return false;
385 }
386
389 const std::vector<uint8_t>& sig,
390 const NodeId& node_id,
391 const ccf::crypto::Pem& node_cert)
392 {
393 std::lock_guard<ccf::pal::Mutex> guard(lock);
394
395 for (auto& [_, pending_snapshot] : pending_snapshots)
396 {
397 if (
398 pending_snapshot.evidence_idx.has_value() &&
399 idx > pending_snapshot.evidence_idx.value() &&
400 !pending_snapshot.sig.has_value())
401 {
403 "Recording signature at {} for snapshot {} with evidence at {}",
404 idx,
405 pending_snapshot.version,
406 pending_snapshot.evidence_idx.value());
407
408 pending_snapshot.node_id = node_id;
409 pending_snapshot.node_cert = node_cert;
410 pending_snapshot.sig = sig;
411 }
412 }
413 }
414
416 ::consensus::Index idx, const std::vector<uint8_t>& tree)
417 {
418 std::lock_guard<ccf::pal::Mutex> guard(lock);
419
420 for (auto& [_, pending_snapshot] : pending_snapshots)
421 {
422 if (
423 pending_snapshot.evidence_idx.has_value() &&
424 idx > pending_snapshot.evidence_idx.value() &&
425 !pending_snapshot.tree.has_value())
426 {
428 "Recording serialised tree at {} for snapshot {} with evidence at "
429 "{}",
430 idx,
431 pending_snapshot.version,
432 pending_snapshot.evidence_idx.value());
433
434 pending_snapshot.tree = tree;
435 }
436 }
437 }
438
440 ::consensus::Index idx, const SnapshotHash& snapshot)
441 {
442 std::lock_guard<ccf::pal::Mutex> guard(lock);
443
444 for (auto& [_, pending_snapshot] : pending_snapshots)
445 {
446 if (pending_snapshot.version == snapshot.version)
447 {
449 "Recording evidence idx at {} for snapshot {}",
450 idx,
451 pending_snapshot.version);
452
453 pending_snapshot.evidence_idx = idx;
454 }
455 }
456 }
457
459 {
460 static uint32_t generation_count = 0;
461
462 auto task = std::make_shared<SnapshotTask>(
463 shared_from_this(),
464 store->snapshot_unsafe_maps(idx),
465 generation_count++);
466
468 }
469
470 void commit(::consensus::Index idx, bool generate_snapshot) override
471 {
472 // If generate_snapshot is true, takes a snapshot of the key value store
473 // at the last snapshottable index before idx, and schedule snapshot
474 // serialisation on another thread (round-robin). Otherwise, only record
475 // that a snapshot was generated.
476
477 ccf::kv::ScopedStoreMapsLock maps_lock(store);
478 std::lock_guard<ccf::pal::Mutex> guard(lock);
479
480 update_indices(idx);
481
482 if (idx < last_snapshot_idx)
483 {
484 throw std::logic_error(fmt::format(
485 "Cannot snapshot at seqno {} which is earlier than last snapshot "
486 "seqno {}",
487 idx,
488 last_snapshot_idx));
489 }
490
492 idx >= next_snapshot_indices.front().idx,
493 "Cannot commit snapshotter at {}, which is before last snapshottable "
494 "idx {}",
495 idx,
496 next_snapshot_indices.front().idx);
497
498 auto& next = next_snapshot_indices.front();
499 auto due = next.idx - last_snapshot_idx >= snapshot_tx_interval;
500 if (due || (next.forced && !next.done))
501 {
502 if (
503 snapshot_generation_enabled && generate_snapshot && (next.idx != 0u))
504 {
505 schedule_snapshot(next.idx);
506 next.done = true;
507 }
508
509 if (due && !next.forced)
510 {
511 // last_snapshot_idx records the last normally scheduled, i.e.
512 // unforced, snapshot index, so that backups (which don't know forced
513 // indices) continue the snapshot interval normally.
514 last_snapshot_idx = next.idx;
516 "Recorded {} as last snapshot index", last_snapshot_idx);
517 }
518 }
519 }
520
521 void rollback(::consensus::Index idx) override
522 {
523 std::lock_guard<ccf::pal::Mutex> guard(lock);
524
525 while (!next_snapshot_indices.empty() &&
526 (next_snapshot_indices.back().idx > idx))
527 {
528 next_snapshot_indices.pop_back();
529 }
530
531 if (next_snapshot_indices.empty())
532 {
533 next_snapshot_indices.push_back({last_snapshot_idx, false, true});
534 }
535
537 "Rolled back snapshotter: last snapshottable idx is now {}",
538 next_snapshot_indices.front().idx);
539
540 while (!pending_snapshots.empty())
541 {
542 const auto& last_snapshot = std::prev(pending_snapshots.end());
543 if (auto evidence_opt = last_snapshot->second.evidence_idx;
544 evidence_opt.has_value() && idx >= evidence_opt.value())
545 {
546 break;
547 }
548
549 pending_snapshots.erase(last_snapshot);
550 }
551 }
552 };
553}
#define CCF_ASSERT_FMT(expr,...)
Definition ccf_assert.h:10
Definition claims_digest.h:10
void set(Digest &&digest_)
Definition claims_digest.h:21
const Digest & value() const
Definition claims_digest.h:38
Definition snapshotter.h:23
void init_after_public_recovery()
Definition snapshotter.h:269
void commit(::consensus::Index idx, bool generate_snapshot) override
Definition snapshotter.h:470
void record_snapshot_evidence_idx(::consensus::Index idx, const SnapshotHash &snapshot)
Definition snapshotter.h:439
void schedule_snapshot(::consensus::Index idx)
Definition snapshotter.h:458
Snapshotter(ringbuffer::AbstractWriterFactory &writer_factory_, std::shared_ptr< ccf::kv::Store > &store_, size_t snapshot_tx_interval_)
Definition snapshotter.h:258
bool write_snapshot(std::span< uint8_t > snapshot_buf, uint32_t generation_count)
Definition snapshotter.h:303
void record_signature(::consensus::Index idx, const std::vector< uint8_t > &sig, const NodeId &node_id, const ccf::crypto::Pem &node_cert)
Definition snapshotter.h:387
void rollback(::consensus::Index idx) override
Definition snapshotter.h:521
void record_serialised_tree(::consensus::Index idx, const std::vector< uint8_t > &tree)
Definition snapshotter.h:415
void set_snapshot_generation(bool enabled)
Definition snapshotter.h:279
bool record_committable(::consensus::Index idx) override
Definition snapshotter.h:347
void set_last_snapshot_idx(::consensus::Index idx)
Definition snapshotter.h:285
Definition pem.h:18
Definition sha256_hash.h:16
Definition kv_types.h:509
Definition kv_types.h:693
Definition value.h:32
Definition ring_buffer_types.h:157
virtual WriterPtr create_writer_to_outside()=0
#define LOG_TRACE_FMT
Definition internal_logger.h:13
#define LOG_DEBUG_FMT
Definition internal_logger.h:14
#define LOG_FAIL_FMT
Definition internal_logger.h:16
@ SUCCESS
Definition kv_types.h:210
uint64_t Version
Definition version.h:8
std::mutex Mutex
Definition locking.h:12
void add_task(Task task)
Definition task_system.cpp:65
Definition app_interface.h:14
uint64_t Index
Definition ledger_enclave_types.h:11
#define RINGBUFFER_WRITE_MESSAGE(MSG,...)
Definition ring_buffer_types.h:259
Definition snapshot_evidence.h:12
ccf::kv::Version version
Sequence number to which the snapshot corresponds.
Definition snapshot_evidence.h:16
Definition task.h:15