// snapshotter.h (recovered from the rendered CCF documentation page)
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the Apache 2.0 License.
#pragma once

#include "ccf/ds/logger.h"
#include "ccf/pal/locking.h"
#include "consensus/ledger_enclave_types.h"
#include "ds/ccf_assert.h"
#include "ds/thread_messaging.h"
#include "kv/kv_types.h"
#include "kv/store.h"
#include "node/network_state.h"
#include "node/rpc/node_interface.h"
#include "service/tables/snapshot_evidence.h"

#include <deque>
#include <optional>

19namespace ccf
20{
21 class Snapshotter : public std::enable_shared_from_this<Snapshotter>,
23 {
24 private:
25 static constexpr auto max_tx_interval = std::numeric_limits<size_t>::max();
26
27 // Maximum number of pending snapshots allowed at a given time. No more
28 // snapshots are emitted when this threshold is reached and until pending
29 // snapshots are flushed on commit.
30 static constexpr auto max_pending_snapshots_count = 5;
31
33
34 ccf::pal::Mutex lock;
35
36 std::shared_ptr<ccf::kv::Store> store;
37
38 // Snapshots are never generated by default (e.g. during public recovery)
39 size_t snapshot_tx_interval = max_tx_interval;
40
41 struct SnapshotInfo
42 {
43 ccf::kv::Version version;
44 ccf::crypto::Sha256Hash write_set_digest;
45 std::string commit_evidence;
46 ccf::crypto::Sha256Hash snapshot_digest;
47 std::vector<uint8_t> serialised_snapshot;
48
49 // Prevents the receipt from being passed to the host (on commit) in case
50 // host has not yet allocated memory for the snapshot.
51 bool is_stored = false;
52
53 std::optional<::consensus::Index> evidence_idx = std::nullopt;
54
55 std::optional<NodeId> node_id = std::nullopt;
56 std::optional<ccf::crypto::Pem> node_cert = std::nullopt;
57 std::optional<std::vector<uint8_t>> sig = std::nullopt;
58 std::optional<std::vector<uint8_t>> tree = std::nullopt;
59
60 SnapshotInfo() = default;
61 };
62 // Queue of pending snapshots that have been generated, but are not yet
63 // committed
64 std::map<uint32_t, SnapshotInfo> pending_snapshots;
65
66 // Initial snapshot index
67 static constexpr ::consensus::Index initial_snapshot_idx = 0;
68
69 // Index at which the latest snapshot was generated
70 ::consensus::Index last_snapshot_idx = 0;
71
72 // Used to suspend snapshot generation during public recovery
73 bool snapshot_generation_enabled = true;
74
75 // Indices at which a snapshot will be next generated and Boolean to
76 // indicate whether a snapshot was forced at the given index
77 struct SnapshotEntry
78 {
80 bool forced;
81 bool done;
82 };
83 std::deque<SnapshotEntry> next_snapshot_indices;
84
85 void commit_snapshot(
86 ::consensus::Index snapshot_idx,
87 const std::vector<uint8_t>& serialised_receipt)
88 {
89 // The snapshot_idx is used to retrieve the correct snapshot file
90 // previously generated.
91 auto to_host = writer_factory.create_writer_to_outside();
93 ::consensus::snapshot_commit,
94 to_host,
95 snapshot_idx,
96 serialised_receipt);
97 }
98
99 struct SnapshotMsg
100 {
101 std::shared_ptr<Snapshotter> self;
102 std::unique_ptr<ccf::kv::AbstractStore::AbstractSnapshot> snapshot;
103 uint32_t generation_count;
104 };
105
106 static void snapshot_cb(std::unique_ptr<::threading::Tmsg<SnapshotMsg>> msg)
107 {
108 msg->data.self->snapshot_(
109 std::move(msg->data.snapshot), msg->data.generation_count);
110 }
111
112 void snapshot_(
113 std::unique_ptr<ccf::kv::AbstractStore::AbstractSnapshot> snapshot,
114 uint32_t generation_count)
115 {
116 auto snapshot_version = snapshot->get_version();
117
118 {
119 std::unique_lock<ccf::pal::Mutex> guard(lock);
120 if (pending_snapshots.size() >= max_pending_snapshots_count)
121 {
123 "Skipping new snapshot generation as {} snapshots are already "
124 "pending",
125 pending_snapshots.size());
126 return;
127 }
128
129 // It is possible that the signature following the snapshot evidence is
130 // scheduled by another thread while the below snapshot evidence
131 // transaction is committed. To allow for such scenario, the evidence
132 // seqno is recorded via `record_snapshot_evidence_idx()` on a hook
133 // rather than here.
134 pending_snapshots[generation_count] = {};
135 pending_snapshots[generation_count].version = snapshot_version;
136 }
137
138 auto serialised_snapshot = store->serialise_snapshot(std::move(snapshot));
139 auto serialised_snapshot_size = serialised_snapshot.size();
140
141 auto tx = store->create_tx();
142 auto evidence = tx.rw<SnapshotEvidence>(Tables::SNAPSHOT_EVIDENCE);
143 auto snapshot_hash = ccf::crypto::Sha256Hash(serialised_snapshot);
144 evidence->put({snapshot_hash, snapshot_version});
145
147 cd.set(std::move(snapshot_hash));
148
149 ccf::crypto::Sha256Hash ws_digest;
150 std::string commit_evidence;
151 auto capture_ws_digest_and_commit_evidence =
152 [&ws_digest, &commit_evidence](
153 const std::vector<uint8_t>& write_set,
154 const std::string& commit_evidence_) {
155 new (&ws_digest)
156 ccf::crypto::Sha256Hash({write_set.data(), write_set.size()});
157 commit_evidence = commit_evidence_;
158 };
159
160 auto rc = tx.commit(cd, nullptr, capture_ws_digest_and_commit_evidence);
162 {
164 "Could not commit snapshot evidence for seqno {}: {}",
165 snapshot_version,
166 rc);
167 return;
168 }
169
170 auto evidence_version = tx.commit_version();
171
172 {
173 std::unique_lock<ccf::pal::Mutex> guard(lock);
174 pending_snapshots[generation_count].commit_evidence = commit_evidence;
175 pending_snapshots[generation_count].write_set_digest = ws_digest;
176 pending_snapshots[generation_count].snapshot_digest = cd.value();
177 pending_snapshots[generation_count].serialised_snapshot =
178 std::move(serialised_snapshot);
179 }
180
181 auto to_host = writer_factory.create_writer_to_outside();
183 ::consensus::snapshot_allocate,
184 to_host,
185 snapshot_version,
186 evidence_version,
187 serialised_snapshot_size,
188 generation_count);
189
191 "Request to allocate snapshot [{} bytes] for seqno {}, with evidence "
192 "seqno {}: {}, ws digest: {}",
193 serialised_snapshot_size,
194 snapshot_version,
195 evidence_version,
196 cd.value(),
197 ws_digest);
198 }
199
200 void update_indices(::consensus::Index idx)
201 {
202 while ((next_snapshot_indices.size() > 1) &&
203 (std::next(next_snapshot_indices.begin())->idx <= idx))
204 {
205 next_snapshot_indices.pop_front();
206 }
207
208 for (auto it = pending_snapshots.begin(); it != pending_snapshots.end();)
209 {
210 auto& snapshot_info = it->second;
211
212 if (
213 snapshot_info.is_stored && snapshot_info.evidence_idx.has_value() &&
214 idx > snapshot_info.evidence_idx.value())
215 {
216 auto serialised_receipt = build_and_serialise_receipt(
217 snapshot_info.sig.value(),
218 snapshot_info.tree.value(),
219 snapshot_info.node_id.value(),
220 snapshot_info.node_cert.value(),
221 snapshot_info.evidence_idx.value(),
222 snapshot_info.write_set_digest,
223 snapshot_info.commit_evidence,
224 std::move(snapshot_info.snapshot_digest));
225
226 commit_snapshot(snapshot_info.version, serialised_receipt);
227 it = pending_snapshots.erase(it);
228 }
229 else
230 {
231 ++it;
232 }
233 }
234 }
235
236 public:
238 ringbuffer::AbstractWriterFactory& writer_factory_,
239 std::shared_ptr<ccf::kv::Store>& store_,
240 size_t snapshot_tx_interval_) :
241 writer_factory(writer_factory_),
242 store(store_),
243 snapshot_tx_interval(snapshot_tx_interval_)
244 {
245 next_snapshot_indices.push_back({initial_snapshot_idx, false, true});
246 }
247
249 {
250 // After public recovery, the first node should have restored all
251 // snapshot indices in next_snapshot_indices so that snapshot
252 // generation can continue at the correct interval
253 std::lock_guard<ccf::pal::Mutex> guard(lock);
254
255 last_snapshot_idx = next_snapshot_indices.back().idx;
256 }
257
258 void set_snapshot_generation(bool enabled)
259 {
260 std::lock_guard<ccf::pal::Mutex> guard(lock);
261 snapshot_generation_enabled = enabled;
262 }
263
265 {
266 // Should only be called once, after a snapshot has been applied
267 std::lock_guard<ccf::pal::Mutex> guard(lock);
268
269 if (last_snapshot_idx != 0)
270 {
271 throw std::logic_error(
272 "Last snapshot index can only be set if no snapshot has been "
273 "generated");
274 }
275
276 last_snapshot_idx = idx;
277
278 next_snapshot_indices.clear();
279 next_snapshot_indices.push_back({last_snapshot_idx, false, true});
280 }
281
283 std::span<uint8_t> snapshot_buf, uint32_t generation_count)
284 {
285 std::lock_guard<ccf::pal::Mutex> guard(lock);
286
287 auto search = pending_snapshots.find(generation_count);
288 if (search == pending_snapshots.end())
289 {
291 "Could not find pending snapshot to write for generation count {}",
292 generation_count);
293 return false;
294 }
295
296 auto& pending_snapshot = search->second;
297 if (snapshot_buf.size() != pending_snapshot.serialised_snapshot.size())
298 {
299 // Unreliable host: allocated snapshot buffer is not of expected
300 // size. The pending snapshot is discarded to reduce enclave memory
301 // usage.
303 "Host allocated snapshot buffer [{} bytes] is not of expected "
304 "size [{} bytes]. Discarding snapshot for seqno {}",
305 snapshot_buf.size(),
306 pending_snapshot.serialised_snapshot.size(),
307 pending_snapshot.version);
308 pending_snapshots.erase(search);
309 return false;
310 }
311
312 std::copy(
313 pending_snapshot.serialised_snapshot.begin(),
314 pending_snapshot.serialised_snapshot.end(),
315 snapshot_buf.begin());
316 pending_snapshot.is_stored = true;
317
319 "Successfully copied snapshot at seqno {} to host memory [{} "
320 "bytes]",
321 pending_snapshot.version,
322 pending_snapshot.serialised_snapshot.size());
323 return true;
324 }
325
327 {
328 // Returns true if the committable idx will require the generation of a
329 // snapshot, and thus a new ledger chunk
330 std::lock_guard<ccf::pal::Mutex> guard(lock);
331
333 idx >= next_snapshot_indices.back().idx,
334 "Committable seqno {} < next snapshot seqno {}",
335 idx,
336 next_snapshot_indices.back().idx);
337
338 bool forced = store->flag_enabled_unsafe(
340
341 ::consensus::Index last_unforced_idx = last_snapshot_idx;
342 for (auto it = next_snapshot_indices.rbegin();
343 it != next_snapshot_indices.rend();
344 it++)
345 {
346 if (!it->forced)
347 {
348 last_unforced_idx = it->idx;
349 break;
350 }
351 }
352
353 auto due = (idx - last_unforced_idx) >= snapshot_tx_interval;
354 if (due || forced)
355 {
356 next_snapshot_indices.push_back({idx, !due, false});
358 "{} {} as snapshot index", !due ? "Forced" : "Recorded", idx);
359 store->unset_flag_unsafe(
361 return due;
362 }
363
364 return false;
365 }
366
369 const std::vector<uint8_t>& sig,
370 const NodeId& node_id,
371 const ccf::crypto::Pem& node_cert)
372 {
373 std::lock_guard<ccf::pal::Mutex> guard(lock);
374
375 for (auto& [_, pending_snapshot] : pending_snapshots)
376 {
377 if (
378 pending_snapshot.evidence_idx.has_value() &&
379 idx > pending_snapshot.evidence_idx.value() &&
380 !pending_snapshot.sig.has_value())
381 {
383 "Recording signature at {} for snapshot {} with evidence at {}",
384 idx,
385 pending_snapshot.version,
386 pending_snapshot.evidence_idx.value());
387
388 pending_snapshot.node_id = node_id;
389 pending_snapshot.node_cert = node_cert;
390 pending_snapshot.sig = sig;
391 }
392 }
393 }
394
396 ::consensus::Index idx, const std::vector<uint8_t>& tree)
397 {
398 std::lock_guard<ccf::pal::Mutex> guard(lock);
399
400 for (auto& [_, pending_snapshot] : pending_snapshots)
401 {
402 if (
403 pending_snapshot.evidence_idx.has_value() &&
404 idx > pending_snapshot.evidence_idx.value() &&
405 !pending_snapshot.tree.has_value())
406 {
408 "Recording serialised tree at {} for snapshot {} with evidence at "
409 "{}",
410 idx,
411 pending_snapshot.version,
412 pending_snapshot.evidence_idx.value());
413
414 pending_snapshot.tree = tree;
415 }
416 }
417 }
418
420 ::consensus::Index idx, const SnapshotHash& snapshot)
421 {
422 std::lock_guard<ccf::pal::Mutex> guard(lock);
423
424 for (auto& [_, pending_snapshot] : pending_snapshots)
425 {
426 if (pending_snapshot.version == snapshot.version)
427 {
429 "Recording evidence idx at {} for snapshot {}",
430 idx,
431 pending_snapshot.version);
432
433 pending_snapshot.evidence_idx = idx;
434 }
435 }
436 }
437
439 {
440 static uint32_t generation_count = 0;
441 auto msg = std::make_unique<::threading::Tmsg<SnapshotMsg>>(&snapshot_cb);
442 msg->data.self = shared_from_this();
443 msg->data.snapshot = store->snapshot_unsafe_maps(idx);
444 msg->data.generation_count = generation_count++;
445
447 tm.add_task(tm.get_execution_thread(generation_count), std::move(msg));
448 }
449
450 void commit(::consensus::Index idx, bool generate_snapshot) override
451 {
452 // If generate_snapshot is true, takes a snapshot of the key value store
453 // at the last snapshottable index before idx, and schedule snapshot
454 // serialisation on another thread (round-robin). Otherwise, only record
455 // that a snapshot was generated.
456
457 ccf::kv::ScopedStoreMapsLock maps_lock(store);
458 std::lock_guard<ccf::pal::Mutex> guard(lock);
459
460 update_indices(idx);
461
462 if (idx < last_snapshot_idx)
463 {
464 throw std::logic_error(fmt::format(
465 "Cannot snapshot at seqno {} which is earlier than last snapshot "
466 "seqno {}",
467 idx,
468 last_snapshot_idx));
469 }
470
472 idx >= next_snapshot_indices.front().idx,
473 "Cannot commit snapshotter at {}, which is before last snapshottable "
474 "idx {}",
475 idx,
476 next_snapshot_indices.front().idx);
477
478 auto& next = next_snapshot_indices.front();
479 auto due = next.idx - last_snapshot_idx >= snapshot_tx_interval;
480 if (due || (next.forced && !next.done))
481 {
482 if (snapshot_generation_enabled && generate_snapshot && next.idx)
483 {
484 schedule_snapshot(next.idx);
485 next.done = true;
486 }
487
488 if (due && !next.forced)
489 {
490 // last_snapshot_idx records the last normally scheduled, i.e.
491 // unforced, snapshot index, so that backups (which don't know forced
492 // indices) continue the snapshot interval normally.
493 last_snapshot_idx = next.idx;
495 "Recorded {} as last snapshot index", last_snapshot_idx);
496 }
497 }
498 }
499
500 void rollback(::consensus::Index idx) override
501 {
502 std::lock_guard<ccf::pal::Mutex> guard(lock);
503
504 while (!next_snapshot_indices.empty() &&
505 (next_snapshot_indices.back().idx > idx))
506 {
507 next_snapshot_indices.pop_back();
508 }
509
510 if (next_snapshot_indices.empty())
511 {
512 next_snapshot_indices.push_back({last_snapshot_idx, false, true});
513 }
514
516 "Rolled back snapshotter: last snapshottable idx is now {}",
517 next_snapshot_indices.front().idx);
518
519 while (!pending_snapshots.empty())
520 {
521 const auto& last_snapshot = std::prev(pending_snapshots.end());
522 if (
523 last_snapshot->second.evidence_idx.has_value() &&
524 idx >= last_snapshot->second.evidence_idx.value())
525 {
526 break;
527 }
528
529 pending_snapshots.erase(last_snapshot);
530 }
531 }
532 };
533}
/* --- Doxygen cross-reference index captured with this page (not part of the
   original header; kept verbatim for reference) ---

#define CCF_ASSERT_FMT(expr,...)
Definition ccf_assert.h:10
Definition claims_digest.h:10
void set(Digest &&digest_)
Definition claims_digest.h:21
const Digest & value() const
Definition claims_digest.h:38
Definition snapshotter.h:23
void init_after_public_recovery()
Definition snapshotter.h:248
void commit(::consensus::Index idx, bool generate_snapshot) override
Definition snapshotter.h:450
void record_snapshot_evidence_idx(::consensus::Index idx, const SnapshotHash &snapshot)
Definition snapshotter.h:419
void schedule_snapshot(::consensus::Index idx)
Definition snapshotter.h:438
Snapshotter(ringbuffer::AbstractWriterFactory &writer_factory_, std::shared_ptr< ccf::kv::Store > &store_, size_t snapshot_tx_interval_)
Definition snapshotter.h:237
bool write_snapshot(std::span< uint8_t > snapshot_buf, uint32_t generation_count)
Definition snapshotter.h:282
void record_signature(::consensus::Index idx, const std::vector< uint8_t > &sig, const NodeId &node_id, const ccf::crypto::Pem &node_cert)
Definition snapshotter.h:367
void rollback(::consensus::Index idx) override
Definition snapshotter.h:500
void record_serialised_tree(::consensus::Index idx, const std::vector< uint8_t > &tree)
Definition snapshotter.h:395
void set_snapshot_generation(bool enabled)
Definition snapshotter.h:258
bool record_committable(::consensus::Index idx) override
Definition snapshotter.h:326
void set_last_snapshot_idx(::consensus::Index idx)
Definition snapshotter.h:264
Definition pem.h:18
Definition sha256_hash.h:16
Definition kv_types.h:579
Definition kv_types.h:763
Definition value.h:32
Definition ring_buffer_types.h:153
virtual WriterPtr create_writer_to_outside()=0
static ThreadMessaging & instance()
Definition thread_messaging.h:283
#define LOG_TRACE_FMT
Definition logger.h:356
#define LOG_DEBUG_FMT
Definition logger.h:357
#define LOG_FAIL_FMT
Definition logger.h:363
uint64_t Version
Definition version.h:8
@ SUCCESS
Definition kv_types.h:249
std::mutex Mutex
Definition locking.h:12
Definition app_interface.h:14
uint64_t Index
Definition ledger_enclave_types.h:11
#define RINGBUFFER_WRITE_MESSAGE(MSG,...)
Definition ring_buffer_types.h:255
Definition snapshot_evidence.h:12
ccf::kv::Version version
Sequence number to which the snapshot corresponds.
Definition snapshot_evidence.h:16
Definition thread_messaging.h:27
*/