17#include <unordered_set>
51 template <
typename... Args>
85 std::list<std::pair<Version, Write>> commit_deltas;
89 static State deserialize_map_snapshot(
90 std::span<const uint8_t> serialized_state)
93 const uint8_t* data = serialized_state.data();
94 size_t size = serialized_state.size();
99 size_t key_size = size;
100 K key = map::deserialize<K>(data, size);
105 size_t value_size = size;
106 VersionV value = map::deserialize<VersionV>(data, size);
113 if ((int64_t)value.
version >= 0)
115 map =
map.put(key, value);
137 change_set(change_set_)
143 return committed_writes || change_set.
has_writes();
148 auto& map_roll =
map.get_roll();
156 auto current = map_roll.commits->get_tail();
166 for (
auto it = change_set.
reads.begin(); it != change_set.
reads.end();
170 auto search = current->state.get(it->first);
172 if (std::get<0>(it->second) == NoVersion)
175 if (search.has_value())
187 !search.has_value() ||
188 std::get<0>(it->second) != search.value().version)
201 if (change_set.
writes.empty())
207 auto& map_roll =
map.get_roll();
208 auto state = map_roll.commits->get_tail()->state;
212 committed_writes =
true;
214 for (
auto it = change_set.
writes.begin(); it != change_set.
writes.end();
217 if (it->second.has_value())
221 state = state.put(it->first,
VersionV{v, v, it->second.
value()});
226 auto search = state.get(it->first);
227 if (search.has_value())
230 state = state.remove(it->first);
232 else if (track_deletes_on_missing_keys)
243 map.roll.commits->insert_back(
map.roll.create_new_local_commit(
244 v, std::move(state), change_set.
writes));
253 return map.trigger_map_hook(commit_version, change_set.
writes);
265 const std::string name;
269 std::unique_ptr<StateSnapshot> map_snapshot;
273 const std::string& name_,
276 std::unique_ptr<StateSnapshot>&& map_snapshot_) :
278 security_domain(security_domain_),
280 map_snapshot(
std::move(map_snapshot_))
289 std::vector<uint8_t> ret(map_snapshot->get_serialized_size());
290 map_snapshot->serialize(ret.data());
296 return security_domain;
308 const std::string& name_,
313 security_domain(security_domain_)
328 bool include_reads)
override
330 const auto non_abstract =
332 if (non_abstract ==
nullptr)
334 LOG_FAIL_FMT(
"Unable to serialise map due to type mismatch");
338 const auto& change_set = *non_abstract;
347 for (
auto it = change_set.reads.begin(); it != change_set.reads.end();
359 uint64_t write_ctr = 0;
360 uint64_t remove_ctr = 0;
361 for (
auto it = change_set.writes.begin(); it != change_set.writes.end();
364 if (it->second.has_value())
375 for (
auto it = change_set.writes.begin(); it != change_set.writes.end();
378 if (it->second.has_value())
385 for (
auto it = change_set.writes.begin(); it != change_set.writes.end();
388 if (!it->second.has_value())
405 change_set(change_set_)
422 (void)track_deletes_on_missing_keys;
426 map.roll.reset_commits();
427 map.roll.rollback_counter++;
429 auto r =
map.roll.commits->get_head();
431 r->state = change_set.
state;
432 r->version = change_set.
version;
436 if (
map.hook ||
map.global_hook)
438 r->state.foreach([&r](
const K& k,
const VersionV& v) {
439 r->writes[k] = v.
value;
447 auto r =
map.roll.commits->get_head();
448 return map.trigger_map_hook(change_set.
version, r->writes);
458 return std::make_unique<SnapshotChangeSet>(
459 deserialize_map_snapshot(map_snapshot), v);
464 return deserialise_internal(d, version);
470 auto change_set_ptr = create_change_set(version,
false);
471 if (change_set_ptr ==
nullptr)
474 "Failed to create change set over '{}' at {} - too early",
477 throw std::logic_error(
"Can't create change set");
480 auto& change_set = *change_set_ptr;
487 change_set.read_version = rv;
491 for (
size_t i = 0; i < ctr; ++i)
494 change_set.reads[std::get<0>(r)] =
495 std::make_tuple(std::get<1>(r), NoVersion);
499 for (
size_t i = 0; i < ctr; ++i)
502 change_set.writes[std::get<0>(w)] = std::get<1>(w);
506 for (
size_t i = 0; i < ctr; ++i)
509 change_set.writes[r] = std::nullopt;
512 return change_set_ptr;
518 auto non_abstract =
dynamic_cast<ChangeSet*
>(changes);
519 if (non_abstract ==
nullptr)
521 throw std::logic_error(
"Type confusion error");
525 if (snapshot_change_set !=
nullptr)
527 return std::make_unique<SnapshotHandleCommitter>(
528 *
this, *snapshot_change_set);
531 return std::make_unique<HandleCommitter>(*
this, *non_abstract);
566 global_hook =
nullptr;
575 return security_domain;
580 if (name != that.
name)
583 auto state1 = roll.
commits->get_tail();
584 auto state2 = that.roll.
commits->get_tail();
586 if (state1->version != state2->version)
590 state2->state.foreach([&count](
const K&,
const VersionV&) {
597 state1->state.foreach([&state2, &i](
const K& k,
const VersionV& v) {
598 auto search = state2->state.get(k);
600 if (search.has_value())
602 auto& found = search.value();
603 if (found.version != v.version)
607 else if (found.value != v.
value)
627#ifndef __cpp_impl_three_way_comparison
630 return !(*
this == that);
639 auto r = roll.
commits->get_head();
641 for (
auto current = roll.
commits->get_tail(); current !=
nullptr;
642 current = current->prev)
644 if (current->version <= v)
651 return std::make_unique<Snapshot>(
652 name, security_domain, r->version, r->state.make_snapshot());
663 auto r = roll.
commits->get_head();
671 commit_deltas.emplace_back(r->version, std::move(r->writes));
677 if (global_hook && !r->writes.empty())
679 commit_deltas.emplace_back(r->version, std::move(r->writes));
684 if (r->next->version > v)
692 auto r = roll.
commits->get_head();
694 if (global_hook && !r->writes.empty())
696 commit_deltas.emplace_back(r->version, std::move(r->writes));
704 for (
auto& [version, writes] : commit_deltas)
707 "Executing global hook on table {} at version {}",
710 global_hook(version, writes);
714 commit_deltas.clear();
721 bool advance =
false;
725 auto r = roll.
commits->get_tail();
733 auto c = roll.
commits->pop_tail();
763 throw std::logic_error(
764 "Attempted to swap maps with incompatible types");
766 std::swap(roll,
map->roll);
770 Version version,
bool track_deletes_on_missing_keys)
777 for (
auto current = roll.
commits->get_tail(); current !=
nullptr;
778 current = current->prev)
780 if (current->version <= version)
783 if (track_deletes_on_missing_keys)
785 writes = current->writes;
787 changes = std::make_unique<untyped::ChangeSet>(
790 roll.
commits->get_head()->state,
812 if (hook && !writes.empty())
815 "Executing local hook on table {} at version {}",
818 return hook(version, writes);
Definition kv_types.h:590
Definition kv_types.h:598
Definition kv_types.h:614
Definition kv_types.h:611
Definition kv_types.h:680
Definition generic_serialise_wrapper.h:239
std::tuple< SerialisedKey, Version > deserialise_read()
Definition generic_serialise_wrapper.h:416
std::vector< uint8_t > deserialise_raw()
Definition generic_serialise_wrapper.h:435
std::tuple< SerialisedKey, SerialisedValue > deserialise_write()
Definition generic_serialise_wrapper.h:428
uint64_t deserialise_write_header()
Definition generic_serialise_wrapper.h:423
uint64_t deserialise_remove_header()
Definition generic_serialise_wrapper.h:445
uint64_t deserialise_read_header()
Definition generic_serialise_wrapper.h:411
Version deserialise_entry_version()
Definition generic_serialise_wrapper.h:406
SerialisedKey deserialise_remove()
Definition generic_serialise_wrapper.h:450
Definition generic_serialise_wrapper.h:20
void serialise_count_header(uint64_t ctr)
Definition generic_serialise_wrapper.h:124
void serialise_remove(const SerialisedKey &k)
Definition generic_serialise_wrapper.h:141
void serialise_raw(const std::vector< uint8_t > &raw)
Definition generic_serialise_wrapper.h:108
void start_map(const std::string &name, SecurityDomain domain)
Definition generic_serialise_wrapper.h:92
void serialise_entry_version(const Version &version)
Definition generic_serialise_wrapper.h:119
void serialise_read(const SerialisedKey &k, const Version &version)
Definition generic_serialise_wrapper.h:129
void serialise_write(const SerialisedKey &k, const SerialisedValue &v)
Definition generic_serialise_wrapper.h:135
Definition untyped_map_diff.h:19
Definition untyped_map_handle.h:18
Definition untyped_map.h:123
void commit(Version v, bool track_deletes_on_missing_keys) override
Definition untyped_map.h:199
bool prepare() override
Definition untyped_map.h:146
Version commit_version
Definition untyped_map.h:129
bool changes
Definition untyped_map.h:131
bool committed_writes
Definition untyped_map.h:132
void set_commit_version(Version v)
Definition untyped_map.h:256
ConsensusHookPtr post_commit() override
Definition untyped_map.h:248
Map & map
Definition untyped_map.h:125
ChangeSet & change_set
Definition untyped_map.h:127
HandleCommitter(Map &m, ChangeSet &change_set_)
Definition untyped_map.h:135
bool has_writes() override
Definition untyped_map.h:141
Definition untyped_map.h:396
bool prepare() override
Definition untyped_map.h:413
SnapshotHandleCommitter(Map &m, SnapshotChangeSet &change_set_)
Definition untyped_map.h:403
bool has_writes() override
Definition untyped_map.h:408
void commit(Version v, bool track_deletes_on_missing_keys) override
Definition untyped_map.h:419
ConsensusHookPtr post_commit() override
Definition untyped_map.h:445
Definition untyped_map.h:263
Snapshot(const std::string &name_, SecurityDomain security_domain_, ccf::kv::Version version_, std::unique_ptr< StateSnapshot > &&map_snapshot_)
Definition untyped_map.h:272
void serialise(KvStoreSerialiser &s) override
Definition untyped_map.h:283
SecurityDomain get_security_domain() const override
Definition untyped_map.h:294
Definition untyped_map.h:69
Map(const Map &that)=delete
ChangeSetPtr deserialise_changes(KvStoreDeserialiser &d, Version version)
Definition untyped_map.h:462
void swap(AbstractMap *map_) override
Definition untyped_map.h:759
std::unique_ptr< AbstractCommitter > create_committer(AbstractChangeSet *changes) override
Definition untyped_map.h:515
void compact(Version v) override
Definition untyped_map.h:655
virtual SecurityDomain get_security_domain() override
Definition untyped_map.h:573
ConsensusHookPtr trigger_map_hook(Version version, const Write &writes)
Definition untyped_map.h:810
ChangeSetPtr deserialise_internal(KvStoreDeserialiser &d, Version version)
Definition untyped_map.h:467
bool operator==(const Map &that) const
Definition untyped_map.h:578
Map(AbstractStore *store_, const std::string &name_, SecurityDomain security_domain_)
Definition untyped_map.h:306
virtual AbstractMap * clone(AbstractStore *other) override
Definition untyped_map.h:320
void unlock() override
Definition untyped_map.h:754
ccf::kv::untyped::CommitHook CommitHook
Definition untyped_map.h:77
void clear() override
Definition untyped_map.h:741
void set_global_hook(const CommitHook &hook_)
Definition untyped_map.h:557
ccf::kv::untyped::SerialisedEntry V
Definition untyped_map.h:72
void serialise_changes(const AbstractChangeSet *changes, KvStoreSerialiser &s, bool include_reads) override
Definition untyped_map.h:325
ccf::kv::untyped::SerialisedKeyHasher H
Definition untyped_map.h:73
ChangeSetPtr create_change_set(Version version, bool track_deletes_on_missing_keys)
Definition untyped_map.h:769
ccf::kv::untyped::MapHook MapHook
Definition untyped_map.h:78
Roll & get_roll()
Definition untyped_map.h:805
bool operator!=(const Map &that) const
Definition untyped_map.h:628
ccf::kv::untyped::State::Snapshot StateSnapshot
Definition untyped_map.h:75
ccf::kv::untyped::SerialisedEntry K
Definition untyped_map.h:71
void unset_global_hook()
Definition untyped_map.h:564
ChangeSetPtr deserialise_snapshot_changes(KvStoreDeserialiser &d)
Definition untyped_map.h:452
void lock() override
Definition untyped_map.h:749
void set_map_hook(const MapHook &hook_)
Definition untyped_map.h:543
AbstractStore * get_store() override
Definition untyped_map.h:538
void unset_map_hook()
Definition untyped_map.h:548
void post_compact() override
Definition untyped_map.h:700
void rollback(Version v) override
Definition untyped_map.h:717
std::unique_ptr< AbstractMap::Snapshot > snapshot(Version v) override
Definition untyped_map.h:634
Snapshot< K, VersionV, H > Snapshot
Definition champ_map.h:403
void insert(T *item)
Definition dl_list.h:87
T * pop()
Definition dl_list.h:67
#define LOG_TRACE_FMT
Definition logger.h:356
#define LOG_DEBUG_FMT
Definition logger.h:357
#define LOG_FAIL_FMT
Definition logger.h:363
ccf::kv::CommitHook< Write > CommitHook
Definition untyped.h:18
ccf::ByteVector SerialisedEntry
Definition untyped_change_set.h:19
std::hash< SerialisedEntry > SerialisedKeyHasher
Definition untyped_change_set.h:20
std::map< ccf::kv::serialisers::SerialisedEntry, std::optional< ccf::kv::serialisers::SerialisedEntry > > Write
Definition untyped.h:16
std::unique_ptr< ChangeSet > ChangeSetPtr
Definition untyped_change_set.h:78
ccf::kv::MapHook< Write > MapHook
Definition untyped.h:19
champ::Map< K, VersionV, H > State
Definition untyped_change_set.h:29
SecurityDomain
Definition kv_types.h:255
std::unique_ptr< ConsensusHook > ConsensusHookPtr
Definition hooks.h:21
uint64_t Version
Definition version.h:8
std::mutex Mutex
Definition locking.h:12
Definition map_serializers.h:11
void skip(const uint8_t *&data, size_t &size, size_t skip)
Definition serialized.h:166
std::string name
Definition get_name.h:12
Definition version_v.h:11
V value
Definition version_v.h:14
Version version
Definition version_v.h:12
Definition untyped_change_set.h:43
ccf::kv::untyped::Read reads
Definition untyped_change_set.h:54
const size_t rollback_counter
Definition untyped_change_set.h:48
bool has_writes() const override
Definition untyped_change_set.h:72
Version read_version
Definition untyped_change_set.h:53
const Version start_version
Definition untyped_change_set.h:51
ccf::kv::untyped::Write writes
Definition untyped_change_set.h:55
Definition untyped_map.h:22
State state
Definition untyped_map.h:31
LocalCommit * next
Definition untyped_map.h:33
LocalCommit(Version v, State &&s, const Write &w)
Definition untyped_map.h:24
LocalCommit * prev
Definition untyped_map.h:34
Version version
Definition untyped_map.h:30
Write writes
Definition untyped_map.h:32
Definition untyped_map.h:39
LocalCommits empty_commits
Definition untyped_map.h:43
LocalCommit * create_new_local_commit(Args &&... args)
Definition untyped_map.h:52
void reset_commits()
Definition untyped_map.h:45
size_t rollback_counter
Definition untyped_map.h:41
std::unique_ptr< LocalCommits > commits
Definition untyped_map.h:40
Definition untyped_change_set.h:83
const ccf::kv::untyped::State state
Definition untyped_change_set.h:84
const Version version
Definition untyped_change_set.h:85