17#define FMT_HEADER_ONLY
19#include <fmt/format.h>
32 std::pair<ccf::kv::Version, std::shared_ptr<untyped::Map>>>;
60 std::unordered_map<Version, std::tuple<std::unique_ptr<PendingTx>,
bool>>
66 std::scoped_lock<ccf::pal::Mutex, ccf::pal::Mutex> mguard(
90 using Hooks = std::map<std::string, ccf::kv::untyped::Map::CommitHook>;
91 using MapHooks = std::map<std::string, ccf::kv::untyped::Map::MapHook>;
95 std::shared_ptr<Consensus>
consensus =
nullptr;
96 std::shared_ptr<TxHistory> history =
nullptr;
97 std::shared_ptr<ILedgerChunker> chunker =
nullptr;
106 const bool strict_versions =
true;
109 const bool is_historical =
false;
114 size_t size_since_chunk = 0;
115 size_t chunk_threshold = 0;
117 bool commit_deserialised(
123 bool track_deletes_on_missing_keys)
override
125 auto c = apply_changes(
127 [v](
bool) {
return std::make_tuple(v, v - 1); },
131 track_deletes_on_missing_keys);
134 LOG_FAIL_FMT(
"Failed to commit deserialised Tx at version {}", v);
146 bool has_map_internal(
const std::string& name)
148 auto search =
maps.find(name);
149 if (search !=
maps.end())
162 if (
version > std::numeric_limits<int64_t>::max())
174 TxID current_txid_unsafe()
181 Store(
bool strict_versions_ =
true,
bool is_historical_ =
false) :
182 strict_versions(strict_versions_),
183 is_historical(is_historical_)
198 std::atomic_store(&
consensus, consensus_);
223 encryptor = encryptor_;
233 snapshotter = snapshotter_;
252 std::lock_guard<ccf::pal::Mutex> mguard(
maps_lock);
265 auto search =
maps.find(map_name);
266 if (search !=
maps.end())
268 const auto& [map_creation_version, map_ptr] = search->second;
269 if (v >= map_creation_version || map_creation_version == NoVersion)
291 auto map = std::dynamic_pointer_cast<ccf::kv::untyped::Map>(map_);
294 throw std::logic_error(fmt::format(
295 "Can't add dynamic map - {} is not of expected type",
299 const auto map_name =
map->get_name();
302 throw std::logic_error(fmt::format(
303 "Can't add dynamic map - already have a map named {}", map_name));
306 LOG_DEBUG_FMT(
"Adding newly created map '{}' at version {}", map_name, v);
307 maps[map_name] = std::make_pair(v,
map);
311 const auto global_it = global_hooks.find(map_name);
312 if (global_it != global_hooks.end())
314 map->set_global_hook(global_it->second);
317 const auto map_it = map_hooks.find(map_name);
318 if (map_it != map_hooks.end())
320 map->set_map_hook(map_it->second);
330 throw std::logic_error(fmt::format(
331 "Cannot snapshot at version {} which is earlier than last "
332 "compacted version {} ",
339 throw std::logic_error(fmt::format(
340 "Cannot snapshot at version {} which is later than current "
346 auto snapshot = std::make_unique<StoreSnapshot>(v);
349 for (
auto& it :
maps)
351 auto& [_,
map] = it.second;
352 snapshot->add_map_snapshot(
map->snapshot(v));
358 snapshot->add_hash_at_snapshot(h->get_raw_leaf(v));
364 snapshot->add_view_history(c->get_view_history(v));
374 for (
auto& it :
maps)
376 auto& [_,
map] = it.second;
383 for (
auto& it :
maps)
385 auto& [_,
map] = it.second;
392 std::unique_ptr<AbstractSnapshot> snapshot)
override
395 return snapshot->serialise(e);
402 std::vector<Version>* view_history =
nullptr,
403 bool public_only =
false)
override
409 std::optional<ccf::kv::SecurityDomain>());
413 auto v_ = d.init(data, size, term, entry_flags, is_historical);
416 LOG_FAIL_FMT(
"Initialisation of deserialise object failed");
420 std::shared_ptr<TxHistory> h =
nullptr;
421 std::vector<uint8_t> hash_at_snapshot;
422 std::vector<Version> view_history_;
424 std::lock_guard<ccf::pal::Mutex> mguard(
maps_lock);
426 for (
auto& it :
maps)
428 auto& [_,
map] = it.second;
435 hash_at_snapshot = d.deserialise_raw();
440 view_history_ = d.deserialise_view_history();
446 for (
auto r = d.start_map(); r.has_value(); r = d.start_map())
448 const auto map_name = r.value();
450 std::shared_ptr<ccf::kv::untyped::Map>
map =
nullptr;
452 auto search =
maps.find(map_name);
453 if (search ==
maps.end())
455 map = std::make_shared<ccf::kv::untyped::Map>(
456 this, map_name, get_security_domain(map_name));
457 new_maps[map_name] =
map;
459 "Creating map {} while deserialising snapshot at version {}",
465 map = search->second.second;
468 auto changes_search = changes.find(map_name);
469 if (changes_search != changes.end())
471 LOG_FAIL_FMT(
"Failed to deserialise snapshot at version {}", v);
476 auto deserialised_snapshot_changes =
477 map->deserialise_snapshot_changes(d);
481 changes.emplace_hint(
483 std::piecewise_construct,
484 std::forward_as_tuple(map_name),
485 std::forward_as_tuple(
486 map, std::move(deserialised_snapshot_changes)));
489 for (
auto& it :
maps)
491 auto& [_,
map] = it.second;
497 LOG_FAIL_FMT(
"Unexpected content in snapshot at version {}", v);
504 bool track_deletes_on_missing_keys =
false;
505 auto r = apply_changes(
507 [](
bool) {
return std::make_tuple(NoVersion, NoVersion); },
512 track_deletes_on_missing_keys);
516 "Failed to commit deserialised snapshot at version {}", v);
529 if (!h->init_from_snapshot(hash_at_snapshot))
537 *view_history = std::move(view_history_);
552 bool generate_snapshot = c && c->is_primary();
553 snapshotter->commit(v, generate_snapshot);
558 chunker->compacted_to(v);
561 std::lock_guard<ccf::pal::Mutex> mguard(
maps_lock);
568 for (
auto& it :
maps)
570 auto& [_,
map] = it.second;
574 for (
auto& it :
maps)
576 auto& [_,
map] = it.second;
580 for (
auto& it :
maps)
582 auto& [_,
map] = it.second;
597 for (
auto& it :
maps)
599 auto& [_,
map] = it.second;
612 snapshotter->rollback(tx_id.
version);
617 chunker->rolled_back_to(tx_id.
version);
620 std::lock_guard<ccf::pal::Mutex> mguard(
maps_lock);
626 throw std::logic_error(fmt::format(
627 "Attempting rollback to {}, earlier than commit version {}",
662 for (
auto& it :
maps)
664 auto& [_,
map] = it.second;
668 auto it =
maps.begin();
669 while (it !=
maps.end())
671 auto& [map_creation_version,
map] = it->second;
675 if (map_creation_version > tx_id.
version)
688 for (
auto& map_it :
maps)
690 auto& [_,
map] = map_it.second;
702 throw std::logic_error(
"term_of_next_version is already initialised");
714 const std::vector<uint8_t>& data,
722 std::optional<ccf::crypto::Sha256Hash>& commit_evidence_digest,
723 bool ignore_strict_versions =
false)
override
735 std::optional<ccf::kv::SecurityDomain>());
738 d.init(data.data(), data.size(),
view, entry_flags, is_historical);
741 LOG_FAIL_FMT(
"Initialisation of deserialise object failed");
746 claims_digest = std::move(d.consume_claims_digest());
748 "Deserialised claim digest {} {}",
749 claims_digest.
value(),
750 claims_digest.
empty());
752 commit_evidence_digest = std::move(d.consume_commit_evidence_digest());
753 if (commit_evidence_digest.has_value())
755 "Deserialised commit evidence digest {}",
756 commit_evidence_digest.value());
762 if (strict_versions && !ignore_strict_versions)
769 "Tried to deserialise {} but current_version is {}", v, cv);
778 std::lock_guard<ccf::pal::Mutex> mguard(
maps_lock);
780 for (
auto r = d.start_map(); r.has_value(); r = d.start_map())
782 const auto map_name = r.value();
787 auto new_map = std::make_shared<ccf::kv::untyped::Map>(
788 this, map_name, get_security_domain(map_name));
790 new_maps[map_name] = new_map;
792 "Creating map '{}' while deserialising transaction at version {}",
797 auto change_search = changes.find(map_name);
798 if (change_search != changes.end())
800 LOG_FAIL_FMT(
"Failed to deserialise transaction at version {}", v);
805 auto deserialised_changes =
map->deserialise_changes(d, v);
809 changes.emplace_hint(
811 std::piecewise_construct,
812 std::forward_as_tuple(map_name),
813 std::forward_as_tuple(
map, std::move(deserialised_changes)));
818 LOG_FAIL_FMT(
"Unexpected content in transaction at version {}", v);
826 const std::vector<uint8_t>& data,
827 bool public_only =
false,
828 const std::optional<TxID>& expected_txid = std::nullopt)
override
830 auto exec = std::make_unique<CFTExecutionWrapper>(
846 if (
maps.size() != that.
maps.size())
849 for (
auto it =
maps.begin(); it !=
maps.end(); ++it)
851 auto search = that.
maps.find(it->first);
853 if (search == that.
maps.end())
856 auto& [this_v, this_map] = it->second;
857 auto& [that_v, that_map] = search->second;
859 if (this_v != that_v)
862 if (*this_map != *that_map)
878 return current_txid_unsafe();
884 return {kv_id.term, kv_id.version};
907 std::unique_ptr<PendingTx> pending_tx,
908 bool globally_committable)
override
916 std::lock_guard<ccf::pal::Mutex> cguard(
commit_lock);
919 "Store::commit {}{}",
921 (globally_committable ?
" globally_committable" :
""));
924 Version previous_last_replicated = 0;
925 Version next_last_replicated = 0;
926 Version previous_rollback_count = 0;
929 std::vector<std::tuple<std::unique_ptr<PendingTx>,
bool>>
930 contiguous_pending_txs;
940 "Want to commit for term {} but term is {}",
954 std::make_tuple(std::move(pending_tx), globally_committable)});
958 for (
Version offset = 1;
true; ++offset)
964 "Couldn't find {} = {} + {}, giving up on batch while committing "
974 contiguous_pending_txs.emplace_back(std::move(search->second));
980 next_last_replicated =
last_replicated + contiguous_pending_txs.size();
986 if (contiguous_pending_txs.size() == 0)
992 for (
auto& [pending_tx_, committable_] : contiguous_pending_txs)
995 [success_, data_, claims_digest_, commit_evidence_digest_, hooks_] =
998 std::make_shared<std::vector<uint8_t>>(std::move(data_));
1000 std::make_shared<ccf::kv::ConsensusHookPtrs>(std::move(hooks_));
1009 "Failed Tx commit {}", previous_last_replicated + offset);
1016 *data_shared, commit_evidence_digest_, claims_digest_),
1022 chunker->append_entry_size(data_shared->size());
1026 "Batching {} ({}) during commit of {}.{}",
1027 previous_last_replicated + offset,
1028 data_shared->size(),
1033 previous_last_replicated + offset,
1041 if (c->replicate(batch, replication_view))
1074 r |= chunker->is_chunk_end_requested(
version);
1079 r |= snapshotter->record_committable(
version);
1104 Version v = next_version_unsafe();
1112 return std::make_tuple(v, previous_last_new_map);
1118 return next_version_unsafe();
1124 next_version_unsafe();
1152 if (source_version > target_version)
1154 throw std::runtime_error(fmt::format(
1155 "Invalid call to swap_private_maps. Source is at version {} while "
1162 std::scoped_lock<ccf::pal::Mutex, ccf::pal::Mutex> guard_both_store_maps(
1166 using MapEntry = std::tuple<std::string, AbstractMap*, AbstractMap*>;
1167 std::vector<MapEntry> entries;
1170 for (
auto& [name, pair] : store.
maps)
1172 auto& [_,
map] = pair;
1176 entries.emplace_back(name,
nullptr,
map.get());
1182 auto entry = entries.begin();
1183 while (entry != entries.end())
1185 const auto& [name, _, their_map] = *entry;
1186 std::shared_ptr<AbstractMap>
map =
nullptr;
1187 const auto it =
maps.find(name);
1188 if (it ==
maps.end())
1193 auto new_map = std::make_pair(
1195 std::make_shared<ccf::kv::untyped::Map>(
1197 maps[name] = new_map;
1198 map = new_map.second;
1202 map = it->second.second;
1205 throw std::logic_error(fmt::format(
1206 "Swap mismatch - map {} is private in source but not in target",
1211 std::get<1>(*entry) =
map.get();
1216 for (
auto& [name, lhs, rhs] : entries)
1221 for (
auto& [name, lhs, rhs] : entries)
1231 map_hooks[map_name] = hook;
1233 const auto it =
maps.find(map_name);
1234 if (it !=
maps.end())
1236 it->second.second->set_map_hook(hook);
1242 map_hooks.erase(map_name);
1244 const auto it =
maps.find(map_name);
1245 if (it !=
maps.end())
1247 it->second.second->unset_map_hook();
1252 const std::string& map_name,
1255 global_hooks[map_name] = hook;
1257 const auto it =
maps.find(map_name);
1258 if (it !=
maps.end())
1260 it->second.second->set_global_hook(hook);
1266 global_hooks.erase(map_name);
1268 const auto it =
maps.find(map_name);
1269 if (it !=
maps.end())
1271 it->second.second->unset_global_hook();
1282 return std::make_unique<ReadOnlyTx>(
this);
1297 return std::make_unique<CommittableTx>(
this);
1326 this->flags |=
static_cast<uint8_t
>(f);
1331 this->flags &= ~static_cast<uint8_t>(f);
1336 return (flags &
static_cast<uint8_t
>(f)) != 0;
Definition claims_digest.h:10
const Digest & value() const
Definition claims_digest.h:38
bool empty() const
Definition claims_digest.h:33
Definition kv_types.h:680
StoreFlag
Definition kv_types.h:749
@ SNAPSHOT_AT_NEXT_SIGNATURE
Definition committable_tx.h:18
Definition deserialise.h:18
Definition read_only_store.h:13
Definition committable_tx.h:394
Term term_of_last_version
Definition store.h:51
std::atomic< Version > compacted
Definition store.h:39
std::atomic< Version > version
Definition store.h:37
ccf::pal::Mutex maps_lock
Definition store.h:33
Version last_committable
Definition store.h:56
Version rollback_count
Definition store.h:58
Version last_new_map
Definition store.h:38
Maps maps
Definition store.h:34
std::atomic< Term > term_of_next_version
Definition store.h:45
void clear()
Definition store.h:64
ccf::pal::Mutex version_lock
Definition store.h:36
std::map< std::string, std::pair< ccf::kv::Version, std::shared_ptr< untyped::Map > > > Maps
Definition store.h:32
std::unordered_map< Version, std::tuple< std::unique_ptr< PendingTx >, bool > > pending_txs
Definition store.h:61
Version last_replicated
Definition store.h:53
ccf::pal::Mutex commit_lock
Definition store.h:42
virtual void unset_flag(StoreFlag f) override
Definition store.h:1312
virtual bool flag_enabled_unsafe(StoreFlag f) const override
Definition store.h:1334
bool should_create_ledger_chunk_unsafe(Version version) override
Definition store.h:1065
ApplyResult deserialise_snapshot(const uint8_t *data, size_t size, ccf::kv::ConsensusHookPtrs &hooks, std::vector< Version > *view_history=nullptr, bool public_only=false) override
Definition store.h:398
ReservedTx create_reserved_tx(const TxID &tx_id)
Definition store.h:1300
ReadOnlyTx create_read_only_tx() override
Definition store.h:1275
std::shared_ptr< AbstractMap > get_map_unsafe(ccf::kv::Version v, const std::string &map_name) override
Definition store.h:256
size_t committable_gap() override
Definition store.h:1129
TxID next_txid() override
Definition store.h:1121
void unset_global_hook(const std::string &map_name)
Definition store.h:1264
void initialise_term(Term t) override
Definition store.h:695
Store(const Store &that)=delete
std::shared_ptr< TxHistory > get_history() override
Definition store.h:201
void set_map_hook(const std::string &map_name, const ccf::kv::untyped::Map::MapHook &hook)
Definition store.h:1228
void unlock_map_set() override
Definition store.h:1090
void set_snapshotter(const SnapshotterPtr &snapshotter_)
Definition store.h:231
void unset_map_hook(const std::string &map_name)
Definition store.h:1240
void add_dynamic_map(ccf::kv::Version v, const std::shared_ptr< AbstractMap > &map_) override
Definition store.h:288
Version current_version() override
Definition store.h:869
void swap_private_maps(Store &store)
Definition store.h:1147
std::shared_ptr< AbstractMap > get_map(ccf::kv::Version v, const std::string &map_name) override
Definition store.h:249
std::tuple< Version, Version > next_version(bool commit_new_map) override
Definition store.h:1101
bool operator==(const Store &that) const
Definition store.h:840
std::shared_ptr< ILedgerChunker > get_chunker() override
Definition store.h:211
EncryptorPtr get_encryptor() override
Definition store.h:226
std::vector< uint8_t > serialise_snapshot(std::unique_ptr< AbstractSnapshot > snapshot) override
Definition store.h:391
bool check_rollback_count(Version count) override
Definition store.h:1095
std::unique_ptr< CommittableTx > create_tx_ptr()
Definition store.h:1295
void set_consensus(const std::shared_ptr< Consensus > &consensus_)
Definition store.h:196
ccf::kv::TxID current_txid() override
Definition store.h:874
void rollback(const TxID &tx_id, Term term_of_next_version_) override
Definition store.h:604
std::unique_ptr< AbstractSnapshot > snapshot_unsafe_maps(Version v) override
Definition store.h:325
void lock_map_set() override
Definition store.h:1085
Term commit_view() override
Definition store.h:899
void unlock_maps() override
Definition store.h:381
std::unique_ptr< ReadOnlyTx > create_read_only_tx_ptr() override
Definition store.h:1280
void lock_maps() override
Definition store.h:371
void set_encryptor(const EncryptorPtr &encryptor_)
Definition store.h:221
CommittableTx create_tx()
Definition store.h:1290
virtual bool flag_enabled(StoreFlag f) override
Definition store.h:1318
bool should_create_ledger_chunk(Version version) override
Definition store.h:1059
Version next_version() override
Definition store.h:1115
Version compacted_version() override
Definition store.h:894
Store(bool strict_versions_=true, bool is_historical_=false)
Definition store.h:181
void set_global_hook(const std::string &map_name, const ccf::kv::untyped::Map::CommitHook &hook)
Definition store.h:1251
virtual void set_flag_unsafe(StoreFlag f) override
Definition store.h:1324
std::pair< TxID, Term > current_txid_and_commit_term() override
Definition store.h:887
std::shared_ptr< Consensus > get_consensus() override
Definition store.h:188
std::unique_ptr< ccf::kv::AbstractExecutionWrapper > deserialize(const std::vector< uint8_t > &data, bool public_only=false, const std::optional< TxID > &expected_txid=std::nullopt) override
Definition store.h:825
virtual void set_flag(StoreFlag f) override
Definition store.h:1306
void set_chunker(const std::shared_ptr< ILedgerChunker > &chunker_)
Definition store.h:216
void set_history(const std::shared_ptr< TxHistory > &history_)
Definition store.h:206
CommitResult commit(const TxID &txid, std::unique_ptr< PendingTx > pending_tx, bool globally_committable) override
Definition store.h:905
bool fill_maps(const std::vector< uint8_t > &data, bool public_only, ccf::kv::Version &v, ccf::kv::Term &view, ccf::kv::EntryFlags &entry_flags, OrderedChanges &changes, MapCollection &new_maps, ccf::ClaimsDigest &claims_digest, std::optional< ccf::crypto::Sha256Hash > &commit_evidence_digest, bool ignore_strict_versions=false) override
Definition store.h:713
ccf::TxID get_txid() override
Definition store.h:881
void compact(Version v) override
Definition store.h:543
std::shared_ptr< ccf::kv::untyped::Map > get_map_internal(ccf::kv::Version v, const std::string &map_name)
Definition store.h:262
virtual void unset_flag_unsafe(StoreFlag f) override
Definition store.h:1329
TxDiff create_tx_diff() override
Definition store.h:1285
MapHook< Write > MapHook
Definition map.h:42
CommitHook< Write > CommitHook
Definition map.h:41
#define LOG_TRACE_FMT
Definition logger.h:356
#define LOG_DEBUG_FMT
Definition logger.h:357
#define LOG_FAIL_FMT
Definition logger.h:363
Definition app_interface.h:19
std::shared_ptr< AbstractTxEncryptor > EncryptorPtr
Definition kv_types.h:576
uint64_t Term
Definition kv_types.h:48
@ PRIVATE
Definition kv_types.h:257
@ PUBLIC
Definition kv_types.h:256
std::vector< std::tuple< Version, std::shared_ptr< std::vector< uint8_t > >, bool, std::shared_ptr< ConsensusHookPtrs > > > BatchVector
Definition kv_types.h:245
uint64_t Version
Definition version.h:8
CommitResult
Definition kv_types.h:248
@ FAIL_NO_REPLICATE
Definition kv_types.h:251
@ SUCCESS
Definition kv_types.h:249
std::shared_ptr< AbstractSnapshotter > SnapshotterPtr
Definition kv_types.h:587
GenericDeserialiseWrapper< RawReader > KvStoreDeserialiser
Definition serialiser_declare.h:21
std::map< std::string, std::shared_ptr< AbstractMap > > MapCollection
Definition apply_changes.h:16
EntryFlags
Definition serialised_entry_format.h:15
ApplyResult
Definition kv_types.h:341
@ FAIL
Definition kv_types.h:350
@ PASS
Definition kv_types.h:342
std::map< std::string, MapChanges > OrderedChanges
Definition tx.h:41
std::shared_ptr< ccf::kv::Store > StorePtr
Definition store.h:1340
std::vector< ConsensusHookPtr > ConsensusHookPtrs
Definition hooks.h:22
std::mutex Mutex
Definition locking.h:12
view
Definition signatures.h:54
uint64_t View
Definition tx_id.h:23
Definition consensus_types.h:23
Definition map_serializers.h:11
Version version
Definition kv_types.h:54
Term term
Definition kv_types.h:53