// Generates, tracks and commits KV store snapshots.
// NOTE(review): this is a sparse extract — original line numbers are embedded
// in the text and interior lines are missing; comments are hedged accordingly.
21 class Snapshotter :
public std::enable_shared_from_this<Snapshotter>,
// Sentinel interval: snapshot generation is effectively off until a real
// interval is supplied (see constructor).
25 static constexpr auto max_tx_interval = std::numeric_limits<size_t>::max();
// Upper bound on in-flight snapshots; generation is skipped when reached.
30 static constexpr auto max_pending_snapshots_count = 5;
36 std::shared_ptr<ccf::kv::Store> store;
// Target number of transactions between snapshots.
39 size_t snapshot_tx_interval = max_tx_interval;
// Per-snapshot bookkeeping, filled in incrementally as evidence, signature,
// tree and host-storage confirmation arrive.
45 std::string commit_evidence;
47 std::vector<uint8_t> serialised_snapshot;
// True once the serialised snapshot has been copied to host memory.
51 bool is_stored =
false;
// Seqno at which the snapshot evidence transaction committed.
53 std::optional<::consensus::Index> evidence_idx = std::nullopt;
55 std::optional<NodeId> node_id = std::nullopt;
56 std::optional<ccf::crypto::Pem> node_cert = std::nullopt;
57 std::optional<std::vector<uint8_t>> sig = std::nullopt;
58 std::optional<std::vector<uint8_t>> tree = std::nullopt;
60 SnapshotInfo() =
default;
// Keyed by generation count (one per scheduled snapshot task).
64 std::map<uint32_t, SnapshotInfo> pending_snapshots;
67 static constexpr ::consensus::Index initial_snapshot_idx = 0;
// Toggled via set_snapshot_generation(); checked before generating.
73 bool snapshot_generation_enabled =
true;
// Committable seqnos that are candidates for the next snapshot.
83 std::deque<SnapshotEntry> next_snapshot_indices;
// Notifies the host (presumably over the ringbuffer — signature lines are not
// visible in this extract) that a snapshot can be committed, passing the
// serialised receipt alongside.
87 const std::vector<uint8_t>& serialised_receipt)
93 ::consensus::snapshot_commit,
// Task object owning a KV snapshot plus a shared reference to the
// snapshotter; runs serialisation off the main thread.
101 std::shared_ptr<Snapshotter> self;
102 std::unique_ptr<ccf::kv::AbstractStore::AbstractSnapshot> snapshot;
103 uint32_t generation_count;
// Human-readable task name, "snapshot@<version>[<generation>]".
105 const std::string name;
108 std::shared_ptr<Snapshotter> _self,
109 std::unique_ptr<ccf::kv::AbstractStore::AbstractSnapshot>&& _snapshot,
110 uint32_t _generation_count) :
111 self(std::move(_self)),
112 snapshot(std::move(_snapshot)),
113 generation_count(_generation_count),
115 "snapshot@{}[{}]", snapshot->get_version(), generation_count))
// Hands the snapshot over to the snapshotter for serialisation/commit.
118 void do_task_implementation()
override
120 self->snapshot_(std::move(snapshot), generation_count);
123 [[nodiscard]]
const std::string& get_name()
const override
// Serialises the given snapshot, commits evidence of it to the KV, and asks
// the host to allocate a buffer for it. Runs on a SnapshotTask worker.
// Fragment: several interior lines are missing from this extract.
130 std::unique_ptr<ccf::kv::AbstractStore::AbstractSnapshot> snapshot,
131 uint32_t generation_count)
133 auto snapshot_version = snapshot->get_version();
// Cap the number of in-flight snapshots to bound memory usage.
136 std::unique_lock<ccf::pal::Mutex> guard(lock);
137 if (pending_snapshots.size() >= max_pending_snapshots_count)
140 "Skipping new snapshot generation as {} snapshots are already "
142 pending_snapshots.size());
// Register the pending snapshot entry before the expensive work below.
151 pending_snapshots[generation_count] = {};
152 pending_snapshots[generation_count].version = snapshot_version;
155 auto serialised_snapshot = store->serialise_snapshot(std::move(snapshot));
156 auto serialised_snapshot_size = serialised_snapshot.size();
// Record snapshot evidence (hash + version) in the KV.
158 auto tx = store->create_tx();
161 evidence->put({snapshot_hash, snapshot_version});
165 cd.
set(std::move(snapshot_hash));
// Capture the write-set digest and commit evidence produced during commit;
// both are needed later to build the snapshot receipt.
168 std::string commit_evidence;
169 auto capture_ws_digest_and_commit_evidence =
170 [&ws_digest, &commit_evidence](
171 const std::vector<uint8_t>& write_set,
172 const std::string& commit_evidence_) {
175 commit_evidence = commit_evidence_;
178 auto rc = tx.commit(cd,
nullptr, capture_ws_digest_and_commit_evidence);
182 "Could not commit snapshot evidence for seqno {}: {}",
188 auto evidence_version = tx.commit_version();
// Store the captured artefacts against the pending snapshot, under lock.
191 std::unique_lock<ccf::pal::Mutex> guard(lock);
192 pending_snapshots[generation_count].commit_evidence = commit_evidence;
193 pending_snapshots[generation_count].write_set_digest = ws_digest;
194 pending_snapshots[generation_count].snapshot_digest = cd.
value();
195 pending_snapshots[generation_count].serialised_snapshot =
196 std::move(serialised_snapshot);
// Ask the host to allocate a buffer of the serialised size; the bytes are
// copied into it later by write_snapshot().
201 ::consensus::snapshot_allocate,
205 serialised_snapshot_size,
209 "Request to allocate snapshot [{} bytes] for seqno {}, with evidence "
210 "seqno {}: {}, ws digest: {}",
211 serialised_snapshot_size,
// On commit up to seqno idx (enclosing signature not visible — presumably
// update_indices): drop superseded snapshottable indices, then finalise any
// pending snapshot whose complete receipt material is now available.
220 while ((next_snapshot_indices.size() > 1) &&
221 (std::next(next_snapshot_indices.begin())->idx <= idx))
223 next_snapshot_indices.pop_front();
226 for (
auto it = pending_snapshots.begin(); it != pending_snapshots.end();)
228 auto& snapshot_info = it->second;
// A snapshot is ready once it is stored on the host, its evidence committed
// strictly before idx, and signature/tree/node identity are all recorded.
231 snapshot_info.is_stored && snapshot_info.evidence_idx.has_value() &&
232 idx > snapshot_info.evidence_idx.value() &&
233 snapshot_info.sig.has_value() && snapshot_info.tree.has_value() &&
234 snapshot_info.node_id.has_value() &&
235 snapshot_info.node_cert.has_value())
237 auto serialised_receipt = build_and_serialise_receipt(
238 snapshot_info.sig.value(),
239 snapshot_info.tree.value(),
240 snapshot_info.node_id.value(),
241 snapshot_info.node_cert.value(),
242 snapshot_info.evidence_idx.value(),
243 snapshot_info.write_set_digest,
244 snapshot_info.commit_evidence,
245 std::move(snapshot_info.snapshot_digest));
// Tell the host to commit the stored snapshot, then drop the entry.
247 commit_snapshot(snapshot_info.version, serialised_receipt);
248 it = pending_snapshots.erase(it);
// Constructor: wires up the writer factory, store and snapshot interval, and
// seeds next_snapshot_indices with an initial entry (fields presumably
// {idx, forced, done} — confirm against SnapshotEntry's declaration).
260 std::shared_ptr<ccf::kv::Store>& store_,
261 size_t snapshot_tx_interval_) :
262 writer_factory(writer_factory_),
264 snapshot_tx_interval(snapshot_tx_interval_)
266 next_snapshot_indices.push_back({initial_snapshot_idx,
false,
true});
// Fragment (signature not visible): adopts the most recent snapshottable
// index as the last snapshot index — presumably run after recovery/startup.
274 std::lock_guard<ccf::pal::Mutex> guard(lock);
276 last_snapshot_idx = next_snapshot_indices.back().idx;
// Enables or disables snapshot generation, under the snapshotter lock.
281 std::lock_guard<ccf::pal::Mutex> guard(lock);
282 snapshot_generation_enabled = enabled;
// Explicitly sets the last snapshot index (e.g. when starting from an
// existing snapshot); only legal before any snapshot has been recorded.
288 std::lock_guard<ccf::pal::Mutex> guard(lock);
290 if (last_snapshot_idx != 0)
292 throw std::logic_error(
293 "Last snapshot index can only be set if no snapshot has been "
// Reset the snapshottable indices to start from the given idx.
297 last_snapshot_idx = idx;
299 next_snapshot_indices.clear();
300 next_snapshot_indices.push_back({last_snapshot_idx,
false,
true});
// Copies the pending serialised snapshot identified by generation_count into
// the host-allocated buffer, after validating that the buffer size matches.
304 std::span<uint8_t> snapshot_buf, uint32_t generation_count)
306 std::lock_guard<ccf::pal::Mutex> guard(lock);
308 auto search = pending_snapshots.find(generation_count);
309 if (search == pending_snapshots.end())
312 "Could not find pending snapshot to write for generation count {}",
317 auto& pending_snapshot = search->second;
// A size mismatch means the host allocation is unusable: discard the
// snapshot rather than writing a truncated or padded copy.
318 if (snapshot_buf.size() != pending_snapshot.serialised_snapshot.size())
324 "Host allocated snapshot buffer [{} bytes] is not of expected "
325 "size [{} bytes]. Discarding snapshot for seqno {}",
327 pending_snapshot.serialised_snapshot.size(),
328 pending_snapshot.version);
329 pending_snapshots.erase(search);
334 pending_snapshot.serialised_snapshot.begin(),
335 pending_snapshot.serialised_snapshot.end(),
336 snapshot_buf.begin());
// Mark as stored so the snapshot can later be committed once its receipt
// material is complete.
337 pending_snapshot.is_stored =
true;
340 "Successfully copied snapshot at seqno {} to host memory [{} "
342 pending_snapshot.version,
343 pending_snapshot.serialised_snapshot.size());
// Records a committable seqno as a potential snapshot point. A snapshot is
// due once snapshot_tx_interval txs elapsed since the last unforced
// snapshottable index; a store flag can additionally force one.
351 std::lock_guard<ccf::pal::Mutex> guard(lock);
354 idx >= next_snapshot_indices.back().idx,
355 "Committable seqno {} < next snapshot seqno {}",
357 next_snapshot_indices.back().idx);
359 bool forced = store->flag_enabled_unsafe(
// Find the most recent unforced snapshottable index to measure the interval
// from (forced entries do not reset the cadence).
363 for (
const auto& next_snapshot_indice :
364 std::ranges::reverse_view(next_snapshot_indices))
366 if (!next_snapshot_indice.forced)
368 last_unforced_idx = next_snapshot_indice.idx;
373 auto due = (idx - last_unforced_idx) >= snapshot_tx_interval;
376 next_snapshot_indices.push_back({idx, !due,
false});
378 "{} {} as snapshot index", !due ?
"Forced" :
"Recorded", idx);
// Clear the force flag now that it has been consumed.
379 store->unset_flag_unsafe(
// Records a signature (and the signing node's identity) against every
// pending snapshot whose evidence committed strictly before idx and which
// does not yet have a signature.
389 const std::vector<uint8_t>& sig,
393 std::lock_guard<ccf::pal::Mutex> guard(lock);
395 for (
auto& [_, pending_snapshot] : pending_snapshots)
// Only the first signature after the evidence seqno is kept.
398 pending_snapshot.evidence_idx.has_value() &&
399 idx > pending_snapshot.evidence_idx.value() &&
400 !pending_snapshot.sig.has_value())
403 "Recording signature at {} for snapshot {} with evidence at {}",
405 pending_snapshot.version,
406 pending_snapshot.evidence_idx.value());
408 pending_snapshot.node_id = node_id;
409 pending_snapshot.node_cert = node_cert;
410 pending_snapshot.sig = sig;
// Records the serialised Merkle tree against every pending snapshot whose
// evidence committed strictly before idx and which has no tree yet.
418 std::lock_guard<ccf::pal::Mutex> guard(lock);
420 for (
auto& [_, pending_snapshot] : pending_snapshots)
423 pending_snapshot.evidence_idx.has_value() &&
424 idx > pending_snapshot.evidence_idx.value() &&
425 !pending_snapshot.tree.has_value())
428 "Recording serialised tree at {} for snapshot {} with evidence at "
431 pending_snapshot.version,
432 pending_snapshot.evidence_idx.value());
434 pending_snapshot.tree = tree;
// Records the seqno at which the evidence transaction for a snapshot
// committed, matching pending snapshots by version.
442 std::lock_guard<ccf::pal::Mutex> guard(lock);
444 for (
auto& [_, pending_snapshot] : pending_snapshots)
446 if (pending_snapshot.version == snapshot.
version)
449 "Recording evidence idx at {} for snapshot {}",
451 pending_snapshot.version);
453 pending_snapshot.evidence_idx = idx;
// Schedules an asynchronous SnapshotTask for seqno idx. The function-local
// static gives each scheduled snapshot a distinct generation count.
// NOTE(review): a static counter assumes scheduling is serialised (e.g.
// callers hold the snapshotter lock) — confirm at the call sites.
460 static uint32_t generation_count = 0;
462 auto task = std::make_shared<SnapshotTask>(
464 store->snapshot_unsafe_maps(idx),
// Called on global commit of seqno idx: validates idx against recorded
// snapshottable indices, then triggers snapshot generation when one is due
// (interval elapsed) or was forced and not yet done.
478 std::lock_guard<ccf::pal::Mutex> guard(lock);
482 if (idx < last_snapshot_idx)
484 throw std::logic_error(fmt::format(
485 "Cannot snapshot at seqno {} which is earlier than last snapshot "
492 idx >= next_snapshot_indices.front().idx,
493 "Cannot commit snapshotter at {}, which is before last snapshottable "
496 next_snapshot_indices.front().idx);
498 auto& next = next_snapshot_indices.front();
499 auto due = next.idx - last_snapshot_idx >= snapshot_tx_interval;
500 if (due || (next.forced && !next.done))
// Only generate when enabled, requested by the caller, and not at seqno 0.
503 snapshot_generation_enabled && generate_snapshot && (next.idx != 0u))
// Only a due, unforced snapshot advances last_snapshot_idx — forced
// snapshots do not reset the interval cadence.
509 if (due && !next.forced)
514 last_snapshot_idx = next.idx;
516 "Recorded {} as last snapshot index", last_snapshot_idx);
// Rolls the snapshotter back to seqno idx: discards snapshottable indices
// and pending snapshots that are no longer valid on the new branch.
523 std::lock_guard<ccf::pal::Mutex> guard(lock);
525 while (!next_snapshot_indices.empty() &&
526 (next_snapshot_indices.back().idx > idx))
528 next_snapshot_indices.pop_back();
// Always keep at least one entry, anchored at the last snapshot index.
531 if (next_snapshot_indices.empty())
533 next_snapshot_indices.push_back({last_snapshot_idx,
false,
true});
537 "Rolled back snapshotter: last snapshottable idx is now {}",
538 next_snapshot_indices.front().idx);
// Walk pending snapshots from newest: ones whose evidence committed at or
// before idx survive (loop presumably exits there — interior lines are not
// visible in this extract); newer ones are erased.
540 while (!pending_snapshots.empty())
542 const auto& last_snapshot = std::prev(pending_snapshots.end());
543 if (
auto evidence_opt = last_snapshot->second.evidence_idx;
544 evidence_opt.has_value() && idx >= evidence_opt.value())
549 pending_snapshots.erase(last_snapshot);