// Snapshotter: generates serialised KV-store snapshots at a configured
// transaction interval and tracks each snapshot's lifecycle (evidence
// commit, host buffer allocation, receipt construction).
// NOTE(review): this chunk is a fragmented extraction — the leading
// numerals on each line are residual original line numbers and many
// interior lines are missing. Code is kept byte-identical; only comments
// are added.
21 class Snapshotter :
public std::enable_shared_from_this<Snapshotter>,
// Sentinel interval: effectively "never snapshot by interval" until
// configured by the constructor.
25 static constexpr auto max_tx_interval = std::numeric_limits<size_t>::max();
// Upper bound on snapshots simultaneously awaiting host allocation /
// storage; new generation is skipped beyond this (see snapshot_).
30 static constexpr auto max_pending_snapshots_count = 5;
36 std::shared_ptr<ccf::kv::Store> store;
39 size_t snapshot_tx_interval = max_tx_interval;
// Per-snapshot bookkeeping while "pending": fields below are filled in
// incrementally as evidence commits, the signature and the serialised
// tree arrive — hence the optionals.
45 std::string commit_evidence;
47 std::vector<uint8_t> serialised_snapshot;
// True once the serialised snapshot has been copied into host memory.
51 bool is_stored =
false;
53 std::optional<::consensus::Index> evidence_idx = std::nullopt;
55 std::optional<NodeId> node_id = std::nullopt;
56 std::optional<ccf::crypto::Pem> node_cert = std::nullopt;
57 std::optional<std::vector<uint8_t>> sig = std::nullopt;
58 std::optional<std::vector<uint8_t>> tree = std::nullopt;
60 SnapshotInfo() =
default;
// Pending snapshots keyed by a monotonic generation count (assigned by
// the scheduling path; see the static counter further down the file).
64 std::map<uint32_t, SnapshotInfo> pending_snapshots;
67 static constexpr ::consensus::Index initial_snapshot_idx = 0;
// Toggled at runtime by a setter below (under lock).
73 bool snapshot_generation_enabled =
true;
// Committable indices eligible to become the next snapshot point.
// NOTE(review): SnapshotEntry's declaration is not visible here; usage
// suggests fields {idx, forced, done} — confirm against its definition.
83 std::deque<SnapshotEntry> next_snapshot_indices;
// Fragment of the snapshot-commit path: forwards a serialised receipt
// to the host via a ::consensus::snapshot_commit message. Surrounding
// lines (signature and message construction) are missing from this
// extraction.
87 const std::vector<uint8_t>& serialised_receipt)
93 ::consensus::snapshot_commit,
// Task payload for asynchronous snapshot generation. `self` keeps the
// Snapshotter alive for the lifetime of the queued task (hence
// enable_shared_from_this on the class).
101 std::shared_ptr<Snapshotter> self;
102 std::unique_ptr<ccf::kv::AbstractStore::AbstractSnapshot> snapshot;
103 uint32_t generation_count;
// Thread-pool callback: unwraps the message and runs snapshot_() on the
// worker thread.
108 msg->data.self->snapshot_(
109 std::move(msg->data.snapshot), msg->data.generation_count);
// Worker-thread snapshot generation: serialises the captured snapshot,
// commits snapshot evidence (hash + version) to the KV, records the
// pending snapshot under its generation count, and asks the host to
// allocate a buffer of the serialised size.
// NOTE(review): several interior lines are missing from this extraction
// (early-return on the capacity check, log calls, etc.); code below is
// kept byte-identical.
113 std::unique_ptr<ccf::kv::AbstractStore::AbstractSnapshot> snapshot,
114 uint32_t generation_count)
116 auto snapshot_version = snapshot->get_version();
// Bound the number of in-flight snapshots: skip generation when
// max_pending_snapshots_count are already pending.
119 std::unique_lock<ccf::pal::Mutex> guard(lock);
120 if (pending_snapshots.size() >= max_pending_snapshots_count)
123 "Skipping new snapshot generation as {} snapshots are already "
125 pending_snapshots.size());
// Register the pending entry (version only) before the slow
// serialisation work below.
134 pending_snapshots[generation_count] = {};
135 pending_snapshots[generation_count].version = snapshot_version;
138 auto serialised_snapshot = store->serialise_snapshot(std::move(snapshot));
139 auto serialised_snapshot_size = serialised_snapshot.size();
// Write snapshot evidence (digest + version) into the KV in its own
// transaction; its commit seqno is tracked as evidence_idx later.
141 auto tx = store->create_tx();
144 evidence->put({snapshot_hash, snapshot_version});
147 cd.
set(std::move(snapshot_hash));
// Hook captures the write-set digest and commit evidence emitted at
// commit time; both feed the snapshot receipt later.
150 std::string commit_evidence;
151 auto capture_ws_digest_and_commit_evidence =
152 [&ws_digest, &commit_evidence](
153 const std::vector<uint8_t>& write_set,
154 const std::string& commit_evidence_) {
157 commit_evidence = commit_evidence_;
160 auto rc = tx.commit(cd,
nullptr, capture_ws_digest_and_commit_evidence);
164 "Could not commit snapshot evidence for seqno {}: {}",
170 auto evidence_version = tx.commit_version();
// Re-acquire the lock to complete the pending entry with the evidence
// artefacts and the serialised snapshot bytes.
173 std::unique_lock<ccf::pal::Mutex> guard(lock);
174 pending_snapshots[generation_count].commit_evidence = commit_evidence;
175 pending_snapshots[generation_count].write_set_digest = ws_digest;
176 pending_snapshots[generation_count].snapshot_digest = cd.
value();
177 pending_snapshots[generation_count].serialised_snapshot =
178 std::move(serialised_snapshot);
// Ask the host to allocate a buffer of exactly this size; the bytes are
// copied out later when the host hands the buffer back (see the
// write-snapshot path below).
183 ::consensus::snapshot_allocate,
187 serialised_snapshot_size,
191 "Request to allocate snapshot [{} bytes] for seqno {}, with evidence "
192 "seqno {}: {}, ws digest: {}",
193 serialised_snapshot_size,
// Post-commit housekeeping (fragment; enclosing signature missing —
// presumably an update-indices style method, confirm against the full
// file): trims superseded snapshottable indices, then finalises any
// pending snapshot whose evidence seqno is strictly below the committed
// index and whose data is already stored on the host — building a
// receipt and committing the snapshot.
202 while ((next_snapshot_indices.size() > 1) &&
203 (std::next(next_snapshot_indices.begin())->idx <= idx))
205 next_snapshot_indices.pop_front();
// Erase-while-iterating over std::map via the iterator-returning erase.
208 for (
auto it = pending_snapshots.begin(); it != pending_snapshots.end();)
210 auto& snapshot_info = it->second;
213 snapshot_info.is_stored && snapshot_info.evidence_idx.has_value() &&
214 idx > snapshot_info.evidence_idx.value())
// NOTE(review): sig/tree/node_id/node_cert are unwrapped with .value()
// without visible has_value() guards — the missing lines in this
// condition may check them; confirm before assuming this cannot throw.
216 auto serialised_receipt = build_and_serialise_receipt(
217 snapshot_info.sig.value(),
218 snapshot_info.tree.value(),
219 snapshot_info.node_id.value(),
220 snapshot_info.node_cert.value(),
221 snapshot_info.evidence_idx.value(),
222 snapshot_info.write_set_digest,
223 snapshot_info.commit_evidence,
224 std::move(snapshot_info.snapshot_digest));
226 commit_snapshot(snapshot_info.version, serialised_receipt);
227 it = pending_snapshots.erase(it);
// Constructor (fragment): wires in the writer factory, store and
// snapshot interval, and seeds next_snapshot_indices with the initial
// index so the deque is never empty.
// NOTE(review): brace-init fields appear to be {idx, forced, done} —
// confirm against SnapshotEntry's declaration (not visible here).
239 std::shared_ptr<ccf::kv::Store>& store_,
240 size_t snapshot_tx_interval_) :
241 writer_factory(writer_factory_),
243 snapshot_tx_interval(snapshot_tx_interval_)
245 next_snapshot_indices.push_back({initial_snapshot_idx,
false,
true});
// Fragment (enclosing signature missing): under lock, adopts the most
// recent snapshottable index as the last snapshot index.
253 std::lock_guard<ccf::pal::Mutex> guard(lock);
255 last_snapshot_idx = next_snapshot_indices.back().idx;
// Fragment: enables/disables snapshot generation, under lock.
260 std::lock_guard<ccf::pal::Mutex> guard(lock);
261 snapshot_generation_enabled = enabled;
// Explicitly sets the last snapshot index. Only legal while no snapshot
// has been recorded yet (last_snapshot_idx still 0): throws otherwise.
// Presumably used during startup/join — confirm with callers.
267 std::lock_guard<ccf::pal::Mutex> guard(lock);
269 if (last_snapshot_idx != 0)
271 throw std::logic_error(
272 "Last snapshot index can only be set if no snapshot has been "
276 last_snapshot_idx = idx;
// Reset snapshottable indices to start from the new baseline.
278 next_snapshot_indices.clear();
279 next_snapshot_indices.push_back({last_snapshot_idx,
false,
true});
// Copies a pending serialised snapshot into the host-allocated buffer
// identified by generation_count. Discards the pending snapshot if the
// buffer size does not match exactly; otherwise marks it as stored so
// the commit path can later build and emit its receipt.
283 std::span<uint8_t> snapshot_buf, uint32_t generation_count)
285 std::lock_guard<ccf::pal::Mutex> guard(lock);
287 auto search = pending_snapshots.find(generation_count);
288 if (search == pending_snapshots.end())
291 "Could not find pending snapshot to write for generation count {}",
296 auto& pending_snapshot = search->second;
// Defensive size check: the host allocated this buffer in response to
// the earlier snapshot_allocate request, so a mismatch means the data
// cannot be trusted to fit.
297 if (snapshot_buf.size() != pending_snapshot.serialised_snapshot.size())
303 "Host allocated snapshot buffer [{} bytes] is not of expected "
304 "size [{} bytes]. Discarding snapshot for seqno {}",
306 pending_snapshot.serialised_snapshot.size(),
307 pending_snapshot.version);
308 pending_snapshots.erase(search);
313 pending_snapshot.serialised_snapshot.begin(),
314 pending_snapshot.serialised_snapshot.end(),
315 snapshot_buf.begin());
316 pending_snapshot.is_stored =
true;
319 "Successfully copied snapshot at seqno {} to host memory [{} "
321 pending_snapshot.version,
322 pending_snapshot.serialised_snapshot.size());
// Fragment: records a committable index and classifies it — "due" when
// the configured interval since the last unforced index has elapsed,
// "forced" when the store raised the corresponding flag.
330 std::lock_guard<ccf::pal::Mutex> guard(lock);
333 idx >= next_snapshot_indices.back().idx,
334 "Committable seqno {} < next snapshot seqno {}",
336 next_snapshot_indices.back().idx);
338 bool forced = store->flag_enabled_unsafe(
// Walk backwards to the most recent unforced index so forced snapshots
// don't reset the regular interval cadence.
342 for (
auto it = next_snapshot_indices.rbegin();
343 it != next_snapshot_indices.rend();
348 last_unforced_idx = it->idx;
353 auto due = (idx - last_unforced_idx) >= snapshot_tx_interval;
// NOTE(review): the second brace-init field is !due here and is read as
// `forced` elsewhere — confirm against SnapshotEntry's declaration.
356 next_snapshot_indices.push_back({idx, !due,
false});
358 "{} {} as snapshot index", !due ?
"Forced" :
"Recorded", idx);
359 store->unset_flag_unsafe(
// Records a signature (plus the signing node's id and cert) for every
// pending snapshot whose evidence seqno is strictly below the signature
// seqno and which does not yet have a signature.
369 const std::vector<uint8_t>& sig,
373 std::lock_guard<ccf::pal::Mutex> guard(lock);
375 for (
auto& [_, pending_snapshot] : pending_snapshots)
378 pending_snapshot.evidence_idx.has_value() &&
379 idx > pending_snapshot.evidence_idx.value() &&
380 !pending_snapshot.sig.has_value())
383 "Recording signature at {} for snapshot {} with evidence at {}",
385 pending_snapshot.version,
386 pending_snapshot.evidence_idx.value());
388 pending_snapshot.node_id = node_id;
389 pending_snapshot.node_cert = node_cert;
390 pending_snapshot.sig = sig;
// Records the serialised tree for every pending snapshot whose evidence
// seqno is strictly below idx and which does not yet have a tree —
// mirrors the signature-recording logic above.
398 std::lock_guard<ccf::pal::Mutex> guard(lock);
400 for (
auto& [_, pending_snapshot] : pending_snapshots)
403 pending_snapshot.evidence_idx.has_value() &&
404 idx > pending_snapshot.evidence_idx.value() &&
405 !pending_snapshot.tree.has_value())
408 "Recording serialised tree at {} for snapshot {} with evidence at "
411 pending_snapshot.version,
412 pending_snapshot.evidence_idx.value());
414 pending_snapshot.tree = tree;
// Records the seqno at which a snapshot's evidence transaction was
// committed, matching pending snapshots by snapshot version.
422 std::lock_guard<ccf::pal::Mutex> guard(lock);
424 for (
auto& [_, pending_snapshot] : pending_snapshots)
426 if (pending_snapshot.version == snapshot.
version)
429 "Recording evidence idx at {} for snapshot {}",
431 pending_snapshot.version);
433 pending_snapshot.evidence_idx = idx;
// Schedules asynchronous snapshot generation on a worker thread.
// generation_count is a function-local static: each scheduled snapshot
// gets a unique id that also keys the pending_snapshots map.
440 static uint32_t generation_count = 0;
441 auto msg = std::make_unique<::threading::Tmsg<SnapshotMsg>>(&snapshot_cb);
442 msg->data.self = shared_from_this();
443 msg->data.snapshot = store->snapshot_unsafe_maps(idx);
444 msg->data.generation_count = generation_count++;
// NOTE(review): get_execution_thread receives the already-incremented
// counter (one past the value stored in the message). This only affects
// which thread runs the task, but confirm it is intentional.
447 tm.add_task(tm.get_execution_thread(generation_count), std::move(msg));
// Fragment of the commit decision path: once a committable index
// reaches commit, decide whether a snapshot is due (interval since last
// snapshot elapsed) or was explicitly forced, and trigger generation if
// it is currently enabled.
458 std::lock_guard<ccf::pal::Mutex> guard(lock);
462 if (idx < last_snapshot_idx)
464 throw std::logic_error(fmt::format(
465 "Cannot snapshot at seqno {} which is earlier than last snapshot "
472 idx >= next_snapshot_indices.front().idx,
473 "Cannot commit snapshotter at {}, which is before last snapshottable "
476 next_snapshot_indices.front().idx);
478 auto& next = next_snapshot_indices.front();
479 auto due = next.idx - last_snapshot_idx >= snapshot_tx_interval;
480 if (due || (next.forced && !next.done))
// `next.idx` truthiness skips idx 0 (initial_snapshot_idx): never
// generate a snapshot for the initial index.
482 if (snapshot_generation_enabled && generate_snapshot && next.idx)
488 if (due && !next.forced)
493 last_snapshot_idx = next.idx;
495 "Recorded {} as last snapshot index", last_snapshot_idx);
// Rolls snapshotter state back to idx: drops snapshottable indices
// above idx, then discards pending snapshots invalidated by the
// rollback (iterating from the back, i.e. highest generation first).
502 std::lock_guard<ccf::pal::Mutex> guard(lock);
504 while (!next_snapshot_indices.empty() &&
505 (next_snapshot_indices.back().idx > idx))
507 next_snapshot_indices.pop_back();
// Re-seed with the last snapshot index so the deque is never empty.
510 if (next_snapshot_indices.empty())
512 next_snapshot_indices.push_back({last_snapshot_idx,
false,
true});
516 "Rolled back snapshotter: last snapshottable idx is now {}",
517 next_snapshot_indices.front().idx);
// A pending snapshot whose evidence seqno is still <= idx survives the
// rollback; later ones are erased. NOTE(review): the loop's break/else
// lines are missing from this extraction — confirm termination logic
// against the full file.
519 while (!pending_snapshots.empty())
521 const auto& last_snapshot = std::prev(pending_snapshots.end());
523 last_snapshot->second.evidence_idx.has_value() &&
524 idx >= last_snapshot->second.evidence_idx.value())
529 pending_snapshots.erase(last_snapshot);