18#define FMT_HEADER_ONLY
20#include <fmt/format.h>
33 std::pair<ccf::kv::Version, std::shared_ptr<untyped::Map>>>;
61 std::unordered_map<Version, std::tuple<std::unique_ptr<PendingTx>,
bool>>
67 std::scoped_lock<ccf::pal::Mutex, ccf::pal::Mutex> mguard(
91 using Hooks = std::map<std::string, ccf::kv::untyped::Map::CommitHook>;
92 using MapHooks = std::map<std::string, ccf::kv::untyped::Map::MapHook>;
96 std::shared_ptr<Consensus>
consensus =
nullptr;
97 std::shared_ptr<TxHistory> history =
nullptr;
98 std::shared_ptr<ILedgerChunker> chunker =
nullptr;
107 const bool strict_versions =
true;
110 const bool is_historical =
false;
115 bool commit_deserialised(
121 bool track_deletes_on_missing_keys)
override
123 auto c = apply_changes(
125 [v](
bool) {
return std::make_tuple(v, v - 1); },
129 track_deletes_on_missing_keys);
132 LOG_FAIL_FMT(
"Failed to commit deserialised Tx at version {}", v);
144 bool has_map_internal(
const std::string& name)
146 return maps.contains(name);
156 if (
version > std::numeric_limits<int64_t>::max())
168 TxID current_txid_unsafe()
175 Store(
bool strict_versions_ =
true,
bool is_historical_ =
false) :
176 strict_versions(strict_versions_),
177 is_historical(is_historical_)
192 std::atomic_store(&
consensus, consensus_);
217 encryptor = encryptor_;
227 snapshotter = snapshotter_;
246 std::lock_guard<ccf::pal::Mutex> mguard(
maps_lock);
259 auto search =
maps.find(map_name);
260 if (search !=
maps.end())
262 const auto& [map_creation_version, map_ptr] = search->second;
263 if (v >= map_creation_version || map_creation_version == NoVersion)
285 auto map = std::dynamic_pointer_cast<ccf::kv::untyped::Map>(map_);
288 throw std::logic_error(fmt::format(
289 "Can't add dynamic map - {} is not of expected type",
293 const auto map_name =
map->get_name();
296 throw std::logic_error(fmt::format(
297 "Can't add dynamic map - already have a map named {}", map_name));
300 LOG_DEBUG_FMT(
"Adding newly created map '{}' at version {}", map_name, v);
301 maps[map_name] = std::make_pair(v,
map);
305 const auto global_it = global_hooks.find(map_name);
306 if (global_it != global_hooks.end())
308 map->set_global_hook(global_it->second);
311 const auto map_it = map_hooks.find(map_name);
312 if (map_it != map_hooks.end())
314 map->set_map_hook(map_it->second);
324 throw std::logic_error(fmt::format(
325 "Cannot snapshot at version {} which is earlier than last "
326 "compacted version {} ",
333 throw std::logic_error(fmt::format(
334 "Cannot snapshot at version {} which is later than current "
340 auto snapshot = std::make_unique<StoreSnapshot>(v);
343 for (
auto& it :
maps)
345 auto& [_,
map] = it.second;
346 snapshot->add_map_snapshot(
map->snapshot(v));
352 snapshot->add_hash_at_snapshot(h->get_raw_leaf(v));
358 snapshot->add_view_history(c->get_view_history(v));
368 for (
auto& it :
maps)
370 auto& [_,
map] = it.second;
377 for (
auto& it :
maps)
379 auto& [_,
map] = it.second;
386 std::unique_ptr<AbstractSnapshot> snapshot)
override
389 return snapshot->serialise(e);
396 std::vector<Version>* view_history =
nullptr,
397 bool public_only =
false)
override
403 std::optional<ccf::kv::SecurityDomain>());
407 auto v_ = d.init(data, size, term, entry_flags, is_historical);
410 LOG_FAIL_FMT(
"Initialisation of deserialise object failed");
414 std::shared_ptr<TxHistory> h =
nullptr;
415 std::vector<uint8_t> hash_at_snapshot;
416 std::vector<Version> view_history_;
418 std::lock_guard<ccf::pal::Mutex> mguard(
maps_lock);
420 for (
auto& it :
maps)
422 auto& [_,
map] = it.second;
429 hash_at_snapshot = d.deserialise_raw();
432 if (view_history !=
nullptr)
434 view_history_ = d.deserialise_view_history();
440 for (
auto r = d.start_map(); r.has_value(); r = d.start_map())
442 const auto map_name = r.value();
444 std::shared_ptr<ccf::kv::untyped::Map>
map =
nullptr;
446 auto search =
maps.find(map_name);
447 if (search ==
maps.end())
449 map = std::make_shared<ccf::kv::untyped::Map>(
450 this, map_name, get_security_domain(map_name));
451 new_maps[map_name] =
map;
453 "Creating map {} while deserialising snapshot at version {}",
459 map = search->second.second;
462 auto changes_search = changes.find(map_name);
463 if (changes_search != changes.end())
465 LOG_FAIL_FMT(
"Failed to deserialise snapshot at version {}", v);
470 auto deserialised_snapshot_changes =
471 map->deserialise_snapshot_changes(d);
475 changes.emplace_hint(
477 std::piecewise_construct,
478 std::forward_as_tuple(map_name),
479 std::forward_as_tuple(
480 map, std::move(deserialised_snapshot_changes)));
483 for (
auto& it :
maps)
485 auto& [_,
map] = it.second;
491 LOG_FAIL_FMT(
"Unexpected content in snapshot at version {}", v);
498 bool track_deletes_on_missing_keys =
false;
499 auto r = apply_changes(
501 [](
bool) {
return std::make_tuple(NoVersion, NoVersion); },
506 track_deletes_on_missing_keys);
510 "Failed to commit deserialised snapshot at version {}", v);
523 if (!h->init_from_snapshot(hash_at_snapshot))
529 if (view_history !=
nullptr)
531 *view_history = std::move(view_history_);
546 bool generate_snapshot = c && c->is_primary();
547 snapshotter->commit(v, generate_snapshot);
552 chunker->compacted_to(v);
555 std::lock_guard<ccf::pal::Mutex> mguard(
maps_lock);
562 for (
auto& it :
maps)
564 auto& [_,
map] = it.second;
568 for (
auto& it :
maps)
570 auto& [_,
map] = it.second;
574 for (
auto& it :
maps)
576 auto& [_,
map] = it.second;
591 for (
auto& it :
maps)
593 auto& [_,
map] = it.second;
606 snapshotter->rollback(tx_id.
seqno);
611 chunker->rolled_back_to(tx_id.
seqno);
614 std::lock_guard<ccf::pal::Mutex> mguard(
maps_lock);
620 throw std::logic_error(fmt::format(
621 "Attempting rollback to {}, earlier than commit version {}",
652 e->rollback(tx_id.
seqno);
656 for (
auto& it :
maps)
658 auto& [_,
map] = it.second;
662 auto it =
maps.begin();
663 while (it !=
maps.end())
665 auto& [map_creation_version,
map] = it->second;
669 if (map_creation_version > tx_id.
seqno)
682 for (
auto& map_it :
maps)
684 auto& [_,
map] = map_it.second;
696 throw std::logic_error(
"term_of_next_version is already initialised");
708 const std::vector<uint8_t>& data,
716 std::optional<ccf::crypto::Sha256Hash>& commit_evidence_digest,
717 bool ignore_strict_versions =
false)
override
729 std::optional<ccf::kv::SecurityDomain>());
732 d.init(data.data(), data.size(),
view, entry_flags, is_historical);
735 LOG_FAIL_FMT(
"Initialisation of deserialise object failed");
740 claims_digest = std::move(d.consume_claims_digest());
742 "Deserialised claim digest {} {}",
743 claims_digest.
value(),
744 claims_digest.
empty());
746 commit_evidence_digest = std::move(d.consume_commit_evidence_digest());
747 if (commit_evidence_digest.has_value())
750 "Deserialised commit evidence digest {}",
751 commit_evidence_digest.value());
758 if (strict_versions && !ignore_strict_versions)
765 "Tried to deserialise {} but current_version is {}", v, cv);
774 std::lock_guard<ccf::pal::Mutex> mguard(
maps_lock);
776 for (
auto r = d.start_map(); r.has_value(); r = d.start_map())
778 const auto map_name = r.value();
783 auto new_map = std::make_shared<ccf::kv::untyped::Map>(
784 this, map_name, get_security_domain(map_name));
786 new_maps[map_name] = new_map;
788 "Creating map '{}' while deserialising transaction at version {}",
793 auto change_search = changes.find(map_name);
794 if (change_search != changes.end())
796 LOG_FAIL_FMT(
"Failed to deserialise transaction at version {}", v);
801 auto deserialised_changes =
map->deserialise_changes(d, v);
805 changes.emplace_hint(
807 std::piecewise_construct,
808 std::forward_as_tuple(map_name),
809 std::forward_as_tuple(
map, std::move(deserialised_changes)));
814 LOG_FAIL_FMT(
"Unexpected content in transaction at version {}", v);
822 const std::vector<uint8_t>& data,
823 bool public_only =
false,
824 const std::optional<TxID>& expected_txid = std::nullopt)
override
826 auto exec = std::make_unique<CFTExecutionWrapper>(
839 if (
maps.size() != that.
maps.size())
844 return std::ranges::all_of(
maps, [&that](
const auto& entry) {
845 const auto& [map_name, map_pair] = entry;
846 auto search = that.
maps.find(map_name);
848 if (search == that.
maps.end())
853 const auto& [this_v, this_map] = map_pair;
854 const auto& [that_v, that_map] = search->second;
856 if (this_v != that_v)
861 if (*this_map != *that_map)
878 return current_txid_unsafe();
901 std::unique_ptr<PendingTx> pending_tx,
902 bool globally_committable)
override
910 std::lock_guard<ccf::pal::Mutex> cguard(
commit_lock);
913 "Store::commit {}{}",
915 (globally_committable ?
" globally_committable" :
""));
918 Version previous_last_replicated = 0;
919 Version next_last_replicated = 0;
920 Version previous_rollback_count = 0;
923 std::vector<std::tuple<std::unique_ptr<PendingTx>,
bool>>
924 contiguous_pending_txs;
934 "Want to commit for term {} but term is {}",
948 std::make_tuple(std::move(pending_tx), globally_committable)});
952 for (
Version offset = 1;
true; ++offset)
958 "Couldn't find {} = {} + {}, giving up on batch while committing "
968 contiguous_pending_txs.emplace_back(std::move(search->second));
974 next_last_replicated =
last_replicated + contiguous_pending_txs.size();
980 if (contiguous_pending_txs.empty())
986 for (
auto& [pending_tx_, committable_] : contiguous_pending_txs)
989 [success_, data_, claims_digest_, commit_evidence_digest_, hooks_] =
992 std::make_shared<std::vector<uint8_t>>(std::move(data_));
994 std::make_shared<ccf::kv::ConsensusHookPtrs>(std::move(hooks_));
1003 "Failed Tx commit {}", previous_last_replicated + offset);
1010 *data_shared, commit_evidence_digest_, claims_digest_),
1016 chunker->append_entry_size(data_shared->size());
1020 "Batching {} ({}) during commit of {}.{}",
1021 previous_last_replicated + offset,
1022 data_shared->size(),
1027 previous_last_replicated + offset,
1035 if (c->replicate(batch, replication_view))
1066 r |= chunker->is_chunk_end_requested(
version);
1071 r |= snapshotter->record_committable(
version);
1096 Version v = next_version_unsafe();
1104 return std::make_tuple(v, previous_last_new_map);
1110 return next_version_unsafe();
1116 next_version_unsafe();
1144 if (source_version > target_version)
1146 throw std::runtime_error(fmt::format(
1147 "Invalid call to swap_private_maps. Source is at version {} while "
1154 std::scoped_lock<ccf::pal::Mutex, ccf::pal::Mutex> guard_both_store_maps(
1158 using MapEntry = std::tuple<std::string, AbstractMap*, AbstractMap*>;
1159 std::vector<MapEntry> entries;
1162 for (
auto& [name, pair] : store.
maps)
1164 auto& [_,
map] = pair;
1168 entries.emplace_back(name,
nullptr,
map.get());
1174 auto entry = entries.begin();
1175 while (entry != entries.end())
1177 const auto& [name, _, their_map] = *entry;
1178 std::shared_ptr<AbstractMap>
map =
nullptr;
1179 const auto it =
maps.find(name);
1180 if (it ==
maps.end())
1185 auto new_map = std::make_pair(
1187 std::make_shared<ccf::kv::untyped::Map>(
1189 maps[name] = new_map;
1190 map = new_map.second;
1194 map = it->second.second;
1197 throw std::logic_error(fmt::format(
1198 "Swap mismatch - map {} is private in source but not in target",
1203 std::get<1>(*entry) =
map.get();
1208 for (
auto& [name, lhs, rhs] : entries)
1213 for (
auto& [name, lhs, rhs] : entries)
1223 map_hooks[map_name] = hook;
1225 const auto it =
maps.find(map_name);
1226 if (it !=
maps.end())
1228 it->second.second->set_map_hook(hook);
1234 map_hooks.erase(map_name);
1236 const auto it =
maps.find(map_name);
1237 if (it !=
maps.end())
1239 it->second.second->unset_map_hook();
1244 const std::string& map_name,
1247 global_hooks[map_name] = hook;
1249 const auto it =
maps.find(map_name);
1250 if (it !=
maps.end())
1252 it->second.second->set_global_hook(hook);
1258 global_hooks.erase(map_name);
1260 const auto it =
maps.find(map_name);
1261 if (it !=
maps.end())
1263 it->second.second->unset_global_hook();
1274 return std::make_unique<ReadOnlyTx>(
this);
1289 return std::make_unique<CommittableTx>(
this);
1318 this->flags |=
static_cast<uint8_t
>(f);
1323 this->flags &= ~static_cast<uint8_t>(f);
1328 return (flags &
static_cast<uint8_t
>(f)) != 0;
Definition claims_digest.h:10
const Digest & value() const
Definition claims_digest.h:38
bool empty() const
Definition claims_digest.h:33
Definition kv_types.h:610
StoreFlag
Definition kv_types.h:679
@ SNAPSHOT_AT_NEXT_SIGNATURE
Definition committable_tx.h:19
Definition deserialise.h:19
Definition read_only_store.h:13
Definition committable_tx.h:372
Term term_of_last_version
Definition store.h:52
std::atomic< Version > compacted
Definition store.h:40
std::atomic< Version > version
Definition store.h:38
ccf::pal::Mutex maps_lock
Definition store.h:34
Version last_committable
Definition store.h:57
Version rollback_count
Definition store.h:59
Version last_new_map
Definition store.h:39
Maps maps
Definition store.h:35
std::atomic< Term > term_of_next_version
Definition store.h:46
void clear()
Definition store.h:65
ccf::pal::Mutex version_lock
Definition store.h:37
std::map< std::string, std::pair< ccf::kv::Version, std::shared_ptr< untyped::Map > > > Maps
Definition store.h:33
std::unordered_map< Version, std::tuple< std::unique_ptr< PendingTx >, bool > > pending_txs
Definition store.h:62
Version last_replicated
Definition store.h:54
ccf::pal::Mutex commit_lock
Definition store.h:43
bool should_create_ledger_chunk_unsafe(Version version) override
Definition store.h:1057
ApplyResult deserialise_snapshot(const uint8_t *data, size_t size, ccf::kv::ConsensusHookPtrs &hooks, std::vector< Version > *view_history=nullptr, bool public_only=false) override
Definition store.h:392
ReservedTx create_reserved_tx(const TxID &tx_id)
Definition store.h:1292
ReadOnlyTx create_read_only_tx() override
Definition store.h:1267
void set_flag_unsafe(StoreFlag f) override
Definition store.h:1316
std::shared_ptr< AbstractMap > get_map_unsafe(ccf::kv::Version v, const std::string &map_name) override
Definition store.h:250
size_t committable_gap() override
Definition store.h:1121
TxID next_txid() override
Definition store.h:1113
void unset_global_hook(const std::string &map_name)
Definition store.h:1256
void initialise_term(Term t) override
Definition store.h:689
Store(const Store &that)=delete
void unset_flag_unsafe(StoreFlag f) override
Definition store.h:1321
std::shared_ptr< TxHistory > get_history() override
Definition store.h:195
void set_map_hook(const std::string &map_name, const ccf::kv::untyped::Map::MapHook &hook)
Definition store.h:1220
void unlock_map_set() override
Definition store.h:1082
void set_snapshotter(const SnapshotterPtr &snapshotter_)
Definition store.h:225
void unset_map_hook(const std::string &map_name)
Definition store.h:1232
void add_dynamic_map(ccf::kv::Version v, const std::shared_ptr< AbstractMap > &map_) override
Definition store.h:282
Version current_version() override
Definition store.h:869
void swap_private_maps(Store &store)
Definition store.h:1139
std::shared_ptr< AbstractMap > get_map(ccf::kv::Version v, const std::string &map_name) override
Definition store.h:243
std::tuple< Version, Version > next_version(bool commit_new_map) override
Definition store.h:1093
bool flag_enabled(StoreFlag f) override
Definition store.h:1310
bool operator==(const Store &that) const
Definition store.h:831
std::shared_ptr< ILedgerChunker > get_chunker() override
Definition store.h:205
EncryptorPtr get_encryptor() override
Definition store.h:220
std::vector< uint8_t > serialise_snapshot(std::unique_ptr< AbstractSnapshot > snapshot) override
Definition store.h:385
bool check_rollback_count(Version count) override
Definition store.h:1087
std::unique_ptr< CommittableTx > create_tx_ptr()
Definition store.h:1287
void set_consensus(const std::shared_ptr< Consensus > &consensus_)
Definition store.h:190
void rollback(const TxID &tx_id, Term term_of_next_version_) override
Definition store.h:598
void unset_flag(StoreFlag f) override
Definition store.h:1304
std::unique_ptr< AbstractSnapshot > snapshot_unsafe_maps(Version v) override
Definition store.h:319
void lock_map_set() override
Definition store.h:1077
Term commit_view() override
Definition store.h:893
void unlock_maps() override
Definition store.h:375
std::unique_ptr< ReadOnlyTx > create_read_only_tx_ptr() override
Definition store.h:1272
void lock_maps() override
Definition store.h:365
void set_encryptor(const EncryptorPtr &encryptor_)
Definition store.h:215
CommittableTx create_tx()
Definition store.h:1282
bool should_create_ledger_chunk(Version version) override
Definition store.h:1051
Version next_version() override
Definition store.h:1107
Version compacted_version() override
Definition store.h:888
Store(bool strict_versions_=true, bool is_historical_=false)
Definition store.h:175
void set_global_hook(const std::string &map_name, const ccf::kv::untyped::Map::CommitHook &hook)
Definition store.h:1243
std::pair< TxID, Term > current_txid_and_commit_term() override
Definition store.h:881
std::shared_ptr< Consensus > get_consensus() override
Definition store.h:182
void set_flag(StoreFlag f) override
Definition store.h:1298
std::unique_ptr< ccf::kv::AbstractExecutionWrapper > deserialize(const std::vector< uint8_t > &data, bool public_only=false, const std::optional< TxID > &expected_txid=std::nullopt) override
Definition store.h:821
void set_chunker(const std::shared_ptr< ILedgerChunker > &chunker_)
Definition store.h:210
void set_history(const std::shared_ptr< TxHistory > &history_)
Definition store.h:200
CommitResult commit(const TxID &txid, std::unique_ptr< PendingTx > pending_tx, bool globally_committable) override
Definition store.h:899
bool fill_maps(const std::vector< uint8_t > &data, bool public_only, ccf::kv::Version &v, ccf::kv::Term &view, ccf::kv::EntryFlags &entry_flags, OrderedChanges &changes, MapCollection &new_maps, ccf::ClaimsDigest &claims_digest, std::optional< ccf::crypto::Sha256Hash > &commit_evidence_digest, bool ignore_strict_versions=false) override
Definition store.h:707
ccf::TxID current_txid() override
Definition store.h:874
void compact(Version v) override
Definition store.h:537
std::shared_ptr< ccf::kv::untyped::Map > get_map_internal(ccf::kv::Version v, const std::string &map_name)
Definition store.h:256
TxDiff create_tx_diff() override
Definition store.h:1277
bool flag_enabled_unsafe(StoreFlag f) const override
Definition store.h:1326
MapHook< Write > MapHook
Definition map.h:42
CommitHook< Write > CommitHook
Definition map.h:41
#define LOG_TRACE_FMT
Definition internal_logger.h:13
#define LOG_DEBUG_FMT
Definition internal_logger.h:14
#define LOG_FAIL_FMT
Definition internal_logger.h:16
Definition app_interface.h:19
std::shared_ptr< AbstractTxEncryptor > EncryptorPtr
Definition kv_types.h:506
uint64_t Term
Definition kv_types.h:43
std::vector< std::tuple< Version, std::shared_ptr< std::vector< uint8_t > >, bool, std::shared_ptr< ConsensusHookPtrs > > > BatchVector
Definition kv_types.h:206
CommitResult
Definition kv_types.h:209
@ FAIL_NO_REPLICATE
Definition kv_types.h:212
@ SUCCESS
Definition kv_types.h:210
@ PRIVATE
Definition kv_types.h:218
@ PUBLIC
Definition kv_types.h:217
uint64_t Version
Definition version.h:8
std::shared_ptr< AbstractSnapshotter > SnapshotterPtr
Definition kv_types.h:517
GenericDeserialiseWrapper< RawReader > KvStoreDeserialiser
Definition serialiser_declare.h:21
std::map< std::string, std::shared_ptr< AbstractMap > > MapCollection
Definition apply_changes.h:16
EntryFlags
Definition serialised_entry_format.h:15
std::map< std::string, MapChanges > OrderedChanges
Definition tx.h:41
std::shared_ptr< ccf::kv::Store > StorePtr
Definition store.h:1332
ApplyResult
Definition kv_types.h:302
@ FAIL
Definition kv_types.h:311
@ PASS
Definition kv_types.h:303
std::vector< ConsensusHookPtr > ConsensusHookPtrs
Definition hooks.h:22
std::mutex Mutex
Definition locking.h:12
view
Definition signatures.h:54
uint64_t View
Definition tx_id.h:23
Definition consensus_types.h:23
Definition map_serializers.h:11
SeqNo seqno
Definition tx_id.h:46
View view
Definition tx_id.h:45