CCF
Loading...
Searching...
No Matches
store.h
Go to the documentation of this file.
1// Copyright (c) Microsoft Corporation. All rights reserved.
2// Licensed under the Apache 2.0 License.
3#pragma once
4
5#include "apply_changes.h"
8#include "ccf/pal/locking.h"
9#include "deserialise.h"
10#include "kv/committable_tx.h"
12#include "kv/snapshot.h"
13#include "kv/untyped_map.h"
14#include "kv_serialiser.h"
15#include "kv_types.h"
16
17#define FMT_HEADER_ONLY
18#include <atomic>
19#include <fmt/format.h>
20#include <memory>
21
22namespace ccf::kv
23{
25 {
26 protected:
27 // All collections of Map must be ordered so that we lock their contained
28 // maps in a stable order. The order here is by map name. The version
29 // indicates the version at which the Map was created.
30 using Maps = std::map<
31 std::string,
32 std::pair<ccf::kv::Version, std::shared_ptr<untyped::Map>>>;
35
37 std::atomic<Version> version = 0;
38 Version last_new_map = ccf::kv::NoVersion;
39 std::atomic<Version> compacted = 0;
40
41 // Calls to Store::commit are made atomic by taking this lock.
43
44 // Term at which future write transactions should be committed.
45 std::atomic<Term> term_of_next_version = 0;
46
47 // Term at which the last entry was committed. Further transactions
48 // should read in that term. Note that it is assumed that the history of
49 // terms of past transactions is kept track of by and specified by the
50 // caller on rollback
52
54 // Version of the latest committable entry committed in this term and by
55 // _this_ store.
57
59
60 std::unordered_map<Version, std::tuple<std::unique_ptr<PendingTx>, bool>>
62
63 public:
64 void clear()
65 {
66 std::scoped_lock<ccf::pal::Mutex, ccf::pal::Mutex> mguard(
68
69 maps.clear();
70 pending_txs.clear();
71
72 version = 0;
73 last_new_map = ccf::kv::NoVersion;
74 compacted = 0;
77
81 }
82 };
83
84 class Store : public AbstractStore,
85 public StoreState,
87 public ReadOnlyStore
88 {
89 private:
90 using Hooks = std::map<std::string, ccf::kv::untyped::Map::CommitHook>;
91 using MapHooks = std::map<std::string, ccf::kv::untyped::Map::MapHook>;
92 Hooks global_hooks;
93 MapHooks map_hooks;
94
95 std::shared_ptr<Consensus> consensus = nullptr;
96 std::shared_ptr<TxHistory> history = nullptr;
97 std::shared_ptr<ILedgerChunker> chunker = nullptr;
98 EncryptorPtr encryptor = nullptr;
99 SnapshotterPtr snapshotter = nullptr;
100
101 // Generally we will only accept deserialised views if they are contiguous -
102 // at Version N we reject everything but N+1. The exception is when a Store
103 // is used for historical queries, where it may deserialise arbitrary
104 // transactions. In this case the Store is a useful container for a set of
105 // Tables, but its versioning invariants are ignored.
106 const bool strict_versions = true;
107
108 // If true, use historical ledger secrets to deserialise entries
109 const bool is_historical = false;
110
111 // Ledger entry header flags
112 uint8_t flags = 0;
113
114 size_t size_since_chunk = 0;
115 size_t chunk_threshold = 0;
116
117 bool commit_deserialised(
118 OrderedChanges& changes,
119 Version v,
120 Term term,
121 const MapCollection& new_maps,
123 bool track_deletes_on_missing_keys) override
124 {
125 auto c = apply_changes(
126 changes,
127 [v](bool) { return std::make_tuple(v, v - 1); },
128 hooks,
129 new_maps,
130 std::nullopt,
131 track_deletes_on_missing_keys);
132 if (!c.has_value())
133 {
134 LOG_FAIL_FMT("Failed to commit deserialised Tx at version {}", v);
135 return false;
136 }
137 {
138 std::lock_guard<ccf::pal::Mutex> vguard(version_lock);
139 version = v;
142 }
143 return true;
144 }
145
146 bool has_map_internal(const std::string& name)
147 {
148 auto search = maps.find(name);
149 if (search != maps.end())
150 return true;
151
152 return false;
153 }
154
155 Version next_version_unsafe()
156 {
157 // Get the next global version
158 ++version;
159
160 // Version was previously signed, with negative values representing
161 // deletions. Maintain this restriction for compatibility with old code.
162 if (version > std::numeric_limits<int64_t>::max())
163 {
164 LOG_FAIL_FMT("KV version too large - wrapping to 0");
165 version = 0;
166 }
167
168 // Further transactions should read in the commit term
170
171 return version;
172 }
173
174 TxID current_txid_unsafe()
175 {
176 // version_lock should be first acquired
178 }
179
180 public:
181 Store(bool strict_versions_ = true, bool is_historical_ = false) :
182 strict_versions(strict_versions_),
183 is_historical(is_historical_)
184 {}
185
186 Store(const Store& that) = delete;
187
188 std::shared_ptr<Consensus> get_consensus() override
189 {
190 // We need to use std::atomic_load<std::shared_ptr<T>>
191 // after clang supports it.
192 // https://en.cppreference.com/w/Template:cpp/compiler_support/20
193 return std::atomic_load(&consensus);
194 }
195
196 void set_consensus(const std::shared_ptr<Consensus>& consensus_)
197 {
198 std::atomic_store(&consensus, consensus_);
199 }
200
201 std::shared_ptr<TxHistory> get_history() override
202 {
203 return history;
204 }
205
206 void set_history(const std::shared_ptr<TxHistory>& history_)
207 {
208 history = history_;
209 }
210
211 std::shared_ptr<ILedgerChunker> get_chunker() override
212 {
213 return chunker;
214 }
215
216 void set_chunker(const std::shared_ptr<ILedgerChunker>& chunker_)
217 {
218 chunker = chunker_;
219 }
220
221 void set_encryptor(const EncryptorPtr& encryptor_)
222 {
223 encryptor = encryptor_;
224 }
225
227 {
228 return encryptor;
229 }
230
231 void set_snapshotter(const SnapshotterPtr& snapshotter_)
232 {
233 snapshotter = snapshotter_;
234 }
235
249 std::shared_ptr<AbstractMap> get_map(
250 ccf::kv::Version v, const std::string& map_name) override
251 {
252 std::lock_guard<ccf::pal::Mutex> mguard(maps_lock);
253 return get_map_internal(v, map_name);
254 }
255
256 std::shared_ptr<AbstractMap> get_map_unsafe(
257 ccf::kv::Version v, const std::string& map_name) override
258 {
259 return get_map_internal(v, map_name);
260 }
261
262 std::shared_ptr<ccf::kv::untyped::Map> get_map_internal(
263 ccf::kv::Version v, const std::string& map_name)
264 {
265 auto search = maps.find(map_name);
266 if (search != maps.end())
267 {
268 const auto& [map_creation_version, map_ptr] = search->second;
269 if (v >= map_creation_version || map_creation_version == NoVersion)
270 {
271 return map_ptr;
272 }
273 }
274
275 return nullptr;
276 }
277
289 ccf::kv::Version v, const std::shared_ptr<AbstractMap>& map_) override
290 {
291 auto map = std::dynamic_pointer_cast<ccf::kv::untyped::Map>(map_);
292 if (map == nullptr)
293 {
294 throw std::logic_error(fmt::format(
295 "Can't add dynamic map - {} is not of expected type",
296 map_->get_name()));
297 }
298
299 const auto map_name = map->get_name();
300 if (get_map_unsafe(v, map_name) != nullptr)
301 {
302 throw std::logic_error(fmt::format(
303 "Can't add dynamic map - already have a map named {}", map_name));
304 }
305
306 LOG_DEBUG_FMT("Adding newly created map '{}' at version {}", map_name, v);
307 maps[map_name] = std::make_pair(v, map);
308
309 {
310 // If we have any hooks for the given map name, set them on this new map
311 const auto global_it = global_hooks.find(map_name);
312 if (global_it != global_hooks.end())
313 {
314 map->set_global_hook(global_it->second);
315 }
316
317 const auto map_it = map_hooks.find(map_name);
318 if (map_it != map_hooks.end())
319 {
320 map->set_map_hook(map_it->second);
321 }
322 }
323 }
324
325 std::unique_ptr<AbstractSnapshot> snapshot_unsafe_maps(Version v) override
326 {
327 auto cv = compacted_version();
328 if (v < cv)
329 {
330 throw std::logic_error(fmt::format(
331 "Cannot snapshot at version {} which is earlier than last "
332 "compacted version {} ",
333 v,
334 cv));
335 }
336
337 if (v > current_version())
338 {
339 throw std::logic_error(fmt::format(
340 "Cannot snapshot at version {} which is later than current "
341 "version {} ",
342 v,
343 current_version()));
344 }
345
346 auto snapshot = std::make_unique<StoreSnapshot>(v);
347
348 {
349 for (auto& it : maps)
350 {
351 auto& [_, map] = it.second;
352 snapshot->add_map_snapshot(map->snapshot(v));
353 }
354
355 auto h = get_history();
356 if (h)
357 {
358 snapshot->add_hash_at_snapshot(h->get_raw_leaf(v));
359 }
360
361 auto c = get_consensus();
362 if (c)
363 {
364 snapshot->add_view_history(c->get_view_history(v));
365 }
366 }
367
368 return snapshot;
369 }
370
371 void lock_maps() override
372 {
373 maps_lock.lock();
374 for (auto& it : maps)
375 {
376 auto& [_, map] = it.second;
377 map->lock();
378 }
379 }
380
381 void unlock_maps() override
382 {
383 for (auto& it : maps)
384 {
385 auto& [_, map] = it.second;
386 map->unlock();
387 }
388 maps_lock.unlock();
389 }
390
391 std::vector<uint8_t> serialise_snapshot(
392 std::unique_ptr<AbstractSnapshot> snapshot) override
393 {
394 auto e = get_encryptor();
395 return snapshot->serialise(e);
396 }
397
399 const uint8_t* data,
400 size_t size,
402 std::vector<Version>* view_history = nullptr,
403 bool public_only = false) override
404 {
405 auto e = get_encryptor();
406 auto d = KvStoreDeserialiser(
407 e,
408 public_only ? ccf::kv::SecurityDomain::PUBLIC :
409 std::optional<ccf::kv::SecurityDomain>());
410
411 ccf::kv::Term term;
412 ccf::kv::EntryFlags entry_flags;
413 auto v_ = d.init(data, size, term, entry_flags, is_historical);
414 if (!v_.has_value())
415 {
416 LOG_FAIL_FMT("Initialisation of deserialise object failed");
417 return ApplyResult::FAIL;
418 }
419 auto v = v_.value();
420 std::shared_ptr<TxHistory> h = nullptr;
421 std::vector<uint8_t> hash_at_snapshot;
422 std::vector<Version> view_history_;
423 {
424 std::lock_guard<ccf::pal::Mutex> mguard(maps_lock);
425
426 for (auto& it : maps)
427 {
428 auto& [_, map] = it.second;
429 map->lock();
430 }
431
432 h = get_history();
433 if (h)
434 {
435 hash_at_snapshot = d.deserialise_raw();
436 }
437
438 if (view_history)
439 {
440 view_history_ = d.deserialise_view_history();
441 }
442
443 OrderedChanges changes;
444 MapCollection new_maps;
445
446 for (auto r = d.start_map(); r.has_value(); r = d.start_map())
447 {
448 const auto map_name = r.value();
449
450 std::shared_ptr<ccf::kv::untyped::Map> map = nullptr;
451
452 auto search = maps.find(map_name);
453 if (search == maps.end())
454 {
455 map = std::make_shared<ccf::kv::untyped::Map>(
456 this, map_name, get_security_domain(map_name));
457 new_maps[map_name] = map;
459 "Creating map {} while deserialising snapshot at version {}",
460 map_name,
461 v);
462 }
463 else
464 {
465 map = search->second.second;
466 }
467
468 auto changes_search = changes.find(map_name);
469 if (changes_search != changes.end())
470 {
471 LOG_FAIL_FMT("Failed to deserialise snapshot at version {}", v);
472 LOG_DEBUG_FMT("Multiple writes on map {}", map_name);
473 return ApplyResult::FAIL;
474 }
475
476 auto deserialised_snapshot_changes =
477 map->deserialise_snapshot_changes(d);
478
479 // Take ownership of the produced change set, store it to be committed
480 // later
481 changes.emplace_hint(
482 changes_search,
483 std::piecewise_construct,
484 std::forward_as_tuple(map_name),
485 std::forward_as_tuple(
486 map, std::move(deserialised_snapshot_changes)));
487 }
488
489 for (auto& it : maps)
490 {
491 auto& [_, map] = it.second;
492 map->unlock();
493 }
494
495 if (!d.end())
496 {
497 LOG_FAIL_FMT("Unexpected content in snapshot at version {}", v);
498 return ApplyResult::FAIL;
499 }
500
501 // Each map is committed at a different version, independently of the
502 // overall snapshot version. The commit versions for each map are
503 // contained in the snapshot and applied when the snapshot is committed.
504 bool track_deletes_on_missing_keys = false;
505 auto r = apply_changes(
506 changes,
507 [](bool) { return std::make_tuple(NoVersion, NoVersion); },
508 hooks,
509 new_maps,
510 std::nullopt,
511 false,
512 track_deletes_on_missing_keys);
513 if (!r.has_value())
514 {
516 "Failed to commit deserialised snapshot at version {}", v);
517 return ApplyResult::FAIL;
518 }
519
520 {
521 std::lock_guard<ccf::pal::Mutex> vguard(version_lock);
522 version = v;
523 last_replicated = v;
524 }
525 }
526
527 if (h)
528 {
529 if (!h->init_from_snapshot(hash_at_snapshot))
530 {
531 return ApplyResult::FAIL;
532 }
533 }
534
535 if (view_history)
536 {
537 *view_history = std::move(view_history_);
538 }
539
540 return ApplyResult::PASS;
541 }
542
543 void compact(Version v) override
544 {
545 // This is called when the store will never be rolled back to any
546 // state before the specified version.
547 // No transactions can be prepared or committed during compaction.
548
549 if (snapshotter)
550 {
551 auto c = get_consensus();
552 bool generate_snapshot = c && c->is_primary();
553 snapshotter->commit(v, generate_snapshot);
554 }
555
556 if (chunker)
557 {
558 chunker->compacted_to(v);
559 }
560
561 std::lock_guard<ccf::pal::Mutex> mguard(maps_lock);
562
563 if (v > current_version())
564 {
565 return;
566 }
567
568 for (auto& it : maps)
569 {
570 auto& [_, map] = it.second;
571 map->lock();
572 }
573
574 for (auto& it : maps)
575 {
576 auto& [_, map] = it.second;
577 map->compact(v);
578 }
579
580 for (auto& it : maps)
581 {
582 auto& [_, map] = it.second;
583 map->unlock();
584 }
585
586 {
587 std::lock_guard<ccf::pal::Mutex> vguard(version_lock);
588 compacted = v;
589
590 auto h = get_history();
591 if (h)
592 {
593 h->compact(v);
594 }
595 }
596
597 for (auto& it : maps)
598 {
599 auto& [_, map] = it.second;
600 map->post_compact();
601 }
602 }
603
604 void rollback(const TxID& tx_id, Term term_of_next_version_) override
605 {
606 // This is called to roll the store back to the state it was in
607 // at the specified version.
608 // No transactions can be prepared or committed during rollback.
609
610 if (snapshotter)
611 {
612 snapshotter->rollback(tx_id.version);
613 }
614
615 if (chunker)
616 {
617 chunker->rolled_back_to(tx_id.version);
618 }
619
620 std::lock_guard<ccf::pal::Mutex> mguard(maps_lock);
621
622 {
623 std::lock_guard<ccf::pal::Mutex> vguard(version_lock);
624 if (tx_id.version < compacted)
625 {
626 throw std::logic_error(fmt::format(
627 "Attempting rollback to {}, earlier than commit version {}",
628 tx_id.version,
629 compacted));
630 }
631
632 // The term should always be updated on rollback() when passed
633 // regardless of whether version needs to be updated or not
634 term_of_next_version = term_of_next_version_;
636
637 // History must be informed of the term_of_last_version change, even if
638 // no actual rollback is required
639 auto h = get_history();
640 if (h)
641 {
642 h->rollback(tx_id, term_of_next_version);
643 }
644
645 if (tx_id.version >= version)
646 {
647 return;
648 }
649
650 version = tx_id.version;
651 last_replicated = tx_id.version;
654 pending_txs.clear();
655 auto e = get_encryptor();
656 if (e)
657 {
658 e->rollback(tx_id.version);
659 }
660 }
661
662 for (auto& it : maps)
663 {
664 auto& [_, map] = it.second;
665 map->lock();
666 }
667
668 auto it = maps.begin();
669 while (it != maps.end())
670 {
671 auto& [map_creation_version, map] = it->second;
672 // Rollback this map whether we're forgetting about it or not. Anyone
673 // else still holding it should see it has rolled back
674 map->rollback(tx_id.version);
675 if (map_creation_version > tx_id.version)
676 {
677 // Map was created more recently; its creation is being forgotten.
678 // Erase our knowledge of it
679 map->unlock();
680 it = maps.erase(it);
681 }
682 else
683 {
684 ++it;
685 }
686 }
687
688 for (auto& map_it : maps)
689 {
690 auto& [_, map] = map_it.second;
691 map->unlock();
692 }
693 }
694
695 void initialise_term(Term t) override
696 {
697 // Note: This should only be called once, when the store is first
698 // initialised. term_of_next_version is later updated via rollback.
699 std::lock_guard<ccf::pal::Mutex> vguard(version_lock);
700 if (term_of_next_version != 0)
701 {
702 throw std::logic_error("term_of_next_version is already initialised");
703 }
704
706 auto h = get_history();
707 if (h)
708 {
709 h->set_term(term_of_next_version);
710 }
711 }
712
714 const std::vector<uint8_t>& data,
715 bool public_only,
718 ccf::kv::EntryFlags& entry_flags,
719 OrderedChanges& changes,
720 MapCollection& new_maps,
721 ccf::ClaimsDigest& claims_digest,
722 std::optional<ccf::crypto::Sha256Hash>& commit_evidence_digest,
723 bool ignore_strict_versions = false) override
724 {
725 // This will return FAILED if the serialised transaction is being
726 // applied out of order.
727 // Processing transactions locally and also deserialising to the
728 // same store will result in a store version mismatch and
729 // deserialisation will then fail.
730 auto e = get_encryptor();
731
732 auto d = KvStoreDeserialiser(
733 e,
734 public_only ? ccf::kv::SecurityDomain::PUBLIC :
735 std::optional<ccf::kv::SecurityDomain>());
736
737 auto v_ =
738 d.init(data.data(), data.size(), view, entry_flags, is_historical);
739 if (!v_.has_value())
740 {
741 LOG_FAIL_FMT("Initialisation of deserialise object failed");
742 return false;
743 }
744 v = v_.value();
745
746 claims_digest = std::move(d.consume_claims_digest());
748 "Deserialised claim digest {} {}",
749 claims_digest.value(),
750 claims_digest.empty());
751
752 commit_evidence_digest = std::move(d.consume_commit_evidence_digest());
753 if (commit_evidence_digest.has_value())
755 "Deserialised commit evidence digest {}",
756 commit_evidence_digest.value());
757
758 // Throw away any local commits that have not propagated via the
759 // consensus.
761
762 if (strict_versions && !ignore_strict_versions)
763 {
764 // Make sure this is the next transaction.
765 auto cv = current_version();
766 if (cv != (v - 1))
767 {
769 "Tried to deserialise {} but current_version is {}", v, cv);
770 return false;
771 }
772 }
773
774 // Deserialised transactions express read dependencies as versions,
775 // rather than with the actual value read. As a result, they don't
776 // need snapshot isolation on the map state, and so do not need to
777 // lock each of the maps before creating the transaction.
778 std::lock_guard<ccf::pal::Mutex> mguard(maps_lock);
779
780 for (auto r = d.start_map(); r.has_value(); r = d.start_map())
781 {
782 const auto map_name = r.value();
783
784 auto map = get_map_internal(v, map_name);
785 if (map == nullptr)
786 {
787 auto new_map = std::make_shared<ccf::kv::untyped::Map>(
788 this, map_name, get_security_domain(map_name));
789 map = new_map;
790 new_maps[map_name] = new_map;
792 "Creating map '{}' while deserialising transaction at version {}",
793 map_name,
794 v);
795 }
796
797 auto change_search = changes.find(map_name);
798 if (change_search != changes.end())
799 {
800 LOG_FAIL_FMT("Failed to deserialise transaction at version {}", v);
801 LOG_DEBUG_FMT("Multiple writes on map {}", map_name);
802 return false;
803 }
804
805 auto deserialised_changes = map->deserialise_changes(d, v);
806
807 // Take ownership of the produced change set, store it to be applied
808 // later
809 changes.emplace_hint(
810 change_search,
811 std::piecewise_construct,
812 std::forward_as_tuple(map_name),
813 std::forward_as_tuple(map, std::move(deserialised_changes)));
814 }
815
816 if (!d.end())
817 {
818 LOG_FAIL_FMT("Unexpected content in transaction at version {}", v);
819 return false;
820 }
821
822 return true;
823 }
824
825 std::unique_ptr<ccf::kv::AbstractExecutionWrapper> deserialize(
826 const std::vector<uint8_t>& data,
827 bool public_only = false,
828 const std::optional<TxID>& expected_txid = std::nullopt) override
829 {
830 auto exec = std::make_unique<CFTExecutionWrapper>(
831 this,
832 get_history(),
833 get_chunker(),
834 std::move(data),
835 public_only,
836 expected_txid);
837 return exec;
838 }
839
840 bool operator==(const Store& that) const
841 {
842 // Only used for debugging, not thread safe.
843 if (version != that.version)
844 return false;
845
846 if (maps.size() != that.maps.size())
847 return false;
848
849 for (auto it = maps.begin(); it != maps.end(); ++it)
850 {
851 auto search = that.maps.find(it->first);
852
853 if (search == that.maps.end())
854 return false;
855
856 auto& [this_v, this_map] = it->second;
857 auto& [that_v, that_map] = search->second;
858
859 if (this_v != that_v)
860 return false;
861
862 if (*this_map != *that_map)
863 return false;
864 }
865
866 return true;
867 }
868
870 {
871 return version;
872 }
873
875 {
876 // Must lock in case the version or read term is being incremented.
877 std::lock_guard<ccf::pal::Mutex> vguard(version_lock);
878 return current_txid_unsafe();
879 }
880
882 {
883 const auto kv_id = current_txid();
884 return {kv_id.term, kv_id.version};
885 }
886
887 std::pair<TxID, Term> current_txid_and_commit_term() override
888 {
889 // Must lock in case the version or commit term is being incremented.
890 std::lock_guard<ccf::pal::Mutex> vguard(version_lock);
891 return {current_txid_unsafe(), term_of_next_version};
892 }
893
895 {
896 return compacted;
897 }
898
899 Term commit_view() override
900 {
901 // Must lock in case the commit_view is being incremented.
903 }
904
906 const TxID& txid,
907 std::unique_ptr<PendingTx> pending_tx,
908 bool globally_committable) override
909 {
910 auto c = get_consensus();
911 if (!c)
912 {
914 }
915
916 std::lock_guard<ccf::pal::Mutex> cguard(commit_lock);
917
919 "Store::commit {}{}",
920 txid.version,
921 (globally_committable ? " globally_committable" : ""));
922
923 BatchVector batch;
924 Version previous_last_replicated = 0;
925 Version next_last_replicated = 0;
926 Version previous_rollback_count = 0;
927 ccf::View replication_view = 0;
928
929 std::vector<std::tuple<std::unique_ptr<PendingTx>, bool>>
930 contiguous_pending_txs;
931 auto h = get_history();
932
933 {
934 std::lock_guard<ccf::pal::Mutex> vguard(version_lock);
935 if (txid.term != term_of_next_version && get_consensus()->is_primary())
936 {
937 // This can happen when a transaction started before a view change,
938 // but tries to commit after the view change is complete.
940 "Want to commit for term {} but term is {}",
941 txid.term,
943
945 }
946
947 if (globally_committable && txid.version > last_committable)
948 {
950 }
951
952 pending_txs.insert(
953 {txid.version,
954 std::make_tuple(std::move(pending_tx), globally_committable)});
955
956 LOG_TRACE_FMT("Inserting pending tx at {}", txid.version);
957
958 for (Version offset = 1; true; ++offset)
959 {
960 auto search = pending_txs.find(last_replicated + offset);
961 if (search == pending_txs.end())
962 {
964 "Couldn't find {} = {} + {}, giving up on batch while committing "
965 "{}.{}",
966 last_replicated + offset,
968 offset,
969 txid.term,
970 txid.version);
971 break;
972 }
973
974 contiguous_pending_txs.emplace_back(std::move(search->second));
975 pending_txs.erase(search);
976 }
977
978 previous_rollback_count = rollback_count;
979 previous_last_replicated = last_replicated;
980 next_last_replicated = last_replicated + contiguous_pending_txs.size();
981
982 replication_view = term_of_next_version;
983 }
984 // Release version lock
985
986 if (contiguous_pending_txs.size() == 0)
987 {
989 }
990
991 size_t offset = 1;
992 for (auto& [pending_tx_, committable_] : contiguous_pending_txs)
993 {
994 auto
995 [success_, data_, claims_digest_, commit_evidence_digest_, hooks_] =
996 pending_tx_->call();
997 auto data_shared =
998 std::make_shared<std::vector<uint8_t>>(std::move(data_));
999 auto hooks_shared =
1000 std::make_shared<ccf::kv::ConsensusHookPtrs>(std::move(hooks_));
1001
1002 // NB: this cannot happen currently. Regular Tx only make it here if
1003 // they did succeed, and signatures cannot conflict because they
1004 // execute in order with a read_version that's version - 1, so even
1005 // two contiguous signatures are fine
1006 if (success_ != CommitResult::SUCCESS)
1007 {
1009 "Failed Tx commit {}", previous_last_replicated + offset);
1010 }
1011
1012 if (h)
1013 {
1014 h->append_entry(
1015 ccf::entry_leaf(
1016 *data_shared, commit_evidence_digest_, claims_digest_),
1017 replication_view);
1018 }
1019
1020 if (chunker)
1021 {
1022 chunker->append_entry_size(data_shared->size());
1023 }
1024
1026 "Batching {} ({}) during commit of {}.{}",
1027 previous_last_replicated + offset,
1028 data_shared->size(),
1029 txid.term,
1030 txid.version);
1031
1032 batch.emplace_back(
1033 previous_last_replicated + offset,
1034 data_shared,
1035 committable_,
1036 hooks_shared);
1037
1038 offset++;
1039 }
1040
1041 if (c->replicate(batch, replication_view))
1042 {
1043 std::lock_guard<ccf::pal::Mutex> vguard(version_lock);
1044 if (
1045 last_replicated == previous_last_replicated &&
1046 previous_rollback_count == rollback_count)
1047 {
1048 last_replicated = next_last_replicated;
1049 }
1050 return CommitResult::SUCCESS;
1051 }
1052 else
1053 {
1054 LOG_DEBUG_FMT("Failed to replicate");
1056 }
1057 }
1058
1060 {
1061 std::lock_guard<ccf::pal::Mutex> vguard(version_lock);
1063 }
1064
1066 {
1067 // Note that snapshotter->record_committable, and therefore this function,
1068 // assumes that `version` is a committable entry/signature.
1069
1071
1072 if (chunker)
1073 {
1074 r |= chunker->is_chunk_end_requested(version);
1075 }
1076
1077 if (snapshotter)
1078 {
1079 r |= snapshotter->record_committable(version);
1080 }
1081
1082 return r;
1083 }
1084
1085 void lock_map_set() override
1086 {
1087 maps_lock.lock();
1088 }
1089
1090 void unlock_map_set() override
1091 {
1092 maps_lock.unlock();
1093 }
1094
1095 bool check_rollback_count(Version count) override
1096 {
1097 std::lock_guard<ccf::pal::Mutex> vguard(version_lock);
1098 return rollback_count == count;
1099 }
1100
1101 std::tuple<Version, Version> next_version(bool commit_new_map) override
1102 {
1103 std::lock_guard<ccf::pal::Mutex> vguard(version_lock);
1104 Version v = next_version_unsafe();
1105
1106 auto previous_last_new_map = last_new_map;
1107 if (commit_new_map)
1108 {
1109 last_new_map = v;
1110 }
1111
1112 return std::make_tuple(v, previous_last_new_map);
1113 }
1114
1116 {
1117 std::lock_guard<ccf::pal::Mutex> vguard(version_lock);
1118 return next_version_unsafe();
1119 }
1120
1121 TxID next_txid() override
1122 {
1123 std::lock_guard<ccf::pal::Mutex> vguard(version_lock);
1124 next_version_unsafe();
1125
1126 return {term_of_next_version, version};
1127 }
1128
1129 size_t committable_gap() override
1130 {
1131 std::lock_guard<ccf::pal::Mutex> vguard(version_lock);
1132 return version - last_committable;
1133 }
1134
1148 {
1149 {
1150 const auto source_version = store.current_version();
1151 const auto target_version = current_version();
1152 if (source_version > target_version)
1153 {
1154 throw std::runtime_error(fmt::format(
1155 "Invalid call to swap_private_maps. Source is at version {} while "
1156 "target is at {}",
1157 source_version,
1158 target_version));
1159 }
1160 }
1161
1162 std::scoped_lock<ccf::pal::Mutex, ccf::pal::Mutex> guard_both_store_maps(
1163 maps_lock, store.maps_lock);
1164
1165 // Each entry is (Name, MyMap, TheirMap)
1166 using MapEntry = std::tuple<std::string, AbstractMap*, AbstractMap*>;
1167 std::vector<MapEntry> entries;
1168
1169 // Get the list of private maps from the source store
1170 for (auto& [name, pair] : store.maps)
1171 {
1172 auto& [_, map] = pair;
1173 if (map->get_security_domain() == SecurityDomain::PRIVATE)
1174 {
1175 map->lock();
1176 entries.emplace_back(name, nullptr, map.get());
1177 }
1178 }
1179
1180 // For each source map, either create it or, where it already exists,
1181 // confirm it is PRIVATE. Lock it and store it in entries
1182 auto entry = entries.begin();
1183 while (entry != entries.end())
1184 {
1185 const auto& [name, _, their_map] = *entry;
1186 std::shared_ptr<AbstractMap> map = nullptr;
1187 const auto it = maps.find(name);
1188 if (it == maps.end())
1189 {
1190 // NB: We lose the creation version from the original map, but assume
1191 // it is irrelevant - its creation should no longer be at risk of
1192 // rollback
1193 auto new_map = std::make_pair(
1194 NoVersion,
1195 std::make_shared<ccf::kv::untyped::Map>(
1196 this, name, SecurityDomain::PRIVATE));
1197 maps[name] = new_map;
1198 map = new_map.second;
1199 }
1200 else
1201 {
1202 map = it->second.second;
1203 if (map->get_security_domain() != SecurityDomain::PRIVATE)
1204 {
1205 throw std::logic_error(fmt::format(
1206 "Swap mismatch - map {} is private in source but not in target",
1207 name));
1208 }
1209 }
1210
1211 std::get<1>(*entry) = map.get();
1212 map->lock();
1213 ++entry;
1214 }
1215
1216 for (auto& [name, lhs, rhs] : entries)
1217 {
1218 lhs->swap(rhs);
1219 }
1220
1221 for (auto& [name, lhs, rhs] : entries)
1222 {
1223 lhs->unlock();
1224 rhs->unlock();
1225 }
1226 }
1227
1229 const std::string& map_name, const ccf::kv::untyped::Map::MapHook& hook)
1230 {
1231 map_hooks[map_name] = hook;
1232
1233 const auto it = maps.find(map_name);
1234 if (it != maps.end())
1235 {
1236 it->second.second->set_map_hook(hook);
1237 }
1238 }
1239
1240 void unset_map_hook(const std::string& map_name)
1241 {
1242 map_hooks.erase(map_name);
1243
1244 const auto it = maps.find(map_name);
1245 if (it != maps.end())
1246 {
1247 it->second.second->unset_map_hook();
1248 }
1249 }
1250
1252 const std::string& map_name,
1254 {
1255 global_hooks[map_name] = hook;
1256
1257 const auto it = maps.find(map_name);
1258 if (it != maps.end())
1259 {
1260 it->second.second->set_global_hook(hook);
1261 }
1262 }
1263
1264 void unset_global_hook(const std::string& map_name)
1265 {
1266 global_hooks.erase(map_name);
1267
1268 const auto it = maps.find(map_name);
1269 if (it != maps.end())
1270 {
1271 it->second.second->unset_global_hook();
1272 }
1273 }
1274
1276 {
1277 return ReadOnlyTx(this);
1278 }
1279
1280 std::unique_ptr<ReadOnlyTx> create_read_only_tx_ptr() override
1281 {
1282 return std::make_unique<ReadOnlyTx>(this);
1283 }
1284
1286 {
1287 return TxDiff(this);
1288 }
1289
1291 {
1292 return CommittableTx(this);
1293 }
1294
1295 std::unique_ptr<CommittableTx> create_tx_ptr()
1296 {
1297 return std::make_unique<CommittableTx>(this);
1298 }
1299
1301 {
1302 std::lock_guard<ccf::pal::Mutex> vguard(version_lock);
1303 return ReservedTx(this, term_of_last_version, tx_id, rollback_count);
1304 }
1305
1306 virtual void set_flag(StoreFlag f) override
1307 {
1308 std::lock_guard<ccf::pal::Mutex> vguard(version_lock);
1309 set_flag_unsafe(f);
1310 }
1311
1312 virtual void unset_flag(StoreFlag f) override
1313 {
1314 std::lock_guard<ccf::pal::Mutex> vguard(version_lock);
1316 }
1317
1318 virtual bool flag_enabled(StoreFlag f) override
1319 {
1320 std::lock_guard<ccf::pal::Mutex> vguard(version_lock);
1321 return flag_enabled_unsafe(f);
1322 }
1323
1324 virtual void set_flag_unsafe(StoreFlag f) override
1325 {
1326 this->flags |= static_cast<uint8_t>(f);
1327 }
1328
1329 virtual void unset_flag_unsafe(StoreFlag f) override
1330 {
1331 this->flags &= ~static_cast<uint8_t>(f);
1332 }
1333
1334 virtual bool flag_enabled_unsafe(StoreFlag f) const override
1335 {
1336 return (flags & static_cast<uint8_t>(f)) != 0;
1337 }
1338 };
1339
1340 using StorePtr = std::shared_ptr<ccf::kv::Store>;
1341}
Definition claims_digest.h:10
const Digest & value() const
Definition claims_digest.h:38
bool empty() const
Definition claims_digest.h:33
Definition kv_types.h:680
StoreFlag
Definition kv_types.h:749
Definition committable_tx.h:18
Definition deserialise.h:18
Definition read_only_store.h:13
Definition tx.h:160
Definition committable_tx.h:394
Definition store.h:25
Term term_of_last_version
Definition store.h:51
std::atomic< Version > compacted
Definition store.h:39
std::atomic< Version > version
Definition store.h:37
ccf::pal::Mutex maps_lock
Definition store.h:33
Version last_committable
Definition store.h:56
Version rollback_count
Definition store.h:58
Version last_new_map
Definition store.h:38
Maps maps
Definition store.h:34
std::atomic< Term > term_of_next_version
Definition store.h:45
void clear()
Definition store.h:64
ccf::pal::Mutex version_lock
Definition store.h:36
std::map< std::string, std::pair< ccf::kv::Version, std::shared_ptr< untyped::Map > > > Maps
Definition store.h:32
std::unordered_map< Version, std::tuple< std::unique_ptr< PendingTx >, bool > > pending_txs
Definition store.h:61
Version last_replicated
Definition store.h:53
ccf::pal::Mutex commit_lock
Definition store.h:42
Definition store.h:88
virtual void unset_flag(StoreFlag f) override
Definition store.h:1312
virtual bool flag_enabled_unsafe(StoreFlag f) const override
Definition store.h:1334
bool should_create_ledger_chunk_unsafe(Version version) override
Definition store.h:1065
ApplyResult deserialise_snapshot(const uint8_t *data, size_t size, ccf::kv::ConsensusHookPtrs &hooks, std::vector< Version > *view_history=nullptr, bool public_only=false) override
Definition store.h:398
ReservedTx create_reserved_tx(const TxID &tx_id)
Definition store.h:1300
ReadOnlyTx create_read_only_tx() override
Definition store.h:1275
std::shared_ptr< AbstractMap > get_map_unsafe(ccf::kv::Version v, const std::string &map_name) override
Definition store.h:256
size_t committable_gap() override
Definition store.h:1129
TxID next_txid() override
Definition store.h:1121
void unset_global_hook(const std::string &map_name)
Definition store.h:1264
void initialise_term(Term t) override
Definition store.h:695
Store(const Store &that)=delete
std::shared_ptr< TxHistory > get_history() override
Definition store.h:201
void set_map_hook(const std::string &map_name, const ccf::kv::untyped::Map::MapHook &hook)
Definition store.h:1228
void unlock_map_set() override
Definition store.h:1090
void set_snapshotter(const SnapshotterPtr &snapshotter_)
Definition store.h:231
void unset_map_hook(const std::string &map_name)
Definition store.h:1240
void add_dynamic_map(ccf::kv::Version v, const std::shared_ptr< AbstractMap > &map_) override
Definition store.h:288
Version current_version() override
Definition store.h:869
void swap_private_maps(Store &store)
Definition store.h:1147
std::shared_ptr< AbstractMap > get_map(ccf::kv::Version v, const std::string &map_name) override
Definition store.h:249
std::tuple< Version, Version > next_version(bool commit_new_map) override
Definition store.h:1101
bool operator==(const Store &that) const
Definition store.h:840
std::shared_ptr< ILedgerChunker > get_chunker() override
Definition store.h:211
EncryptorPtr get_encryptor() override
Definition store.h:226
std::vector< uint8_t > serialise_snapshot(std::unique_ptr< AbstractSnapshot > snapshot) override
Definition store.h:391
bool check_rollback_count(Version count) override
Definition store.h:1095
std::unique_ptr< CommittableTx > create_tx_ptr()
Definition store.h:1295
void set_consensus(const std::shared_ptr< Consensus > &consensus_)
Definition store.h:196
ccf::kv::TxID current_txid() override
Definition store.h:874
void rollback(const TxID &tx_id, Term term_of_next_version_) override
Definition store.h:604
std::unique_ptr< AbstractSnapshot > snapshot_unsafe_maps(Version v) override
Definition store.h:325
void lock_map_set() override
Definition store.h:1085
Term commit_view() override
Definition store.h:899
void unlock_maps() override
Definition store.h:381
std::unique_ptr< ReadOnlyTx > create_read_only_tx_ptr() override
Definition store.h:1280
void lock_maps() override
Definition store.h:371
void set_encryptor(const EncryptorPtr &encryptor_)
Definition store.h:221
CommittableTx create_tx()
Definition store.h:1290
virtual bool flag_enabled(StoreFlag f) override
Definition store.h:1318
bool should_create_ledger_chunk(Version version) override
Definition store.h:1059
Version next_version() override
Definition store.h:1115
Version compacted_version() override
Definition store.h:894
Store(bool strict_versions_=true, bool is_historical_=false)
Definition store.h:181
void set_global_hook(const std::string &map_name, const ccf::kv::untyped::Map::CommitHook &hook)
Definition store.h:1251
virtual void set_flag_unsafe(StoreFlag f) override
Definition store.h:1324
std::pair< TxID, Term > current_txid_and_commit_term() override
Definition store.h:887
std::shared_ptr< Consensus > get_consensus() override
Definition store.h:188
std::unique_ptr< ccf::kv::AbstractExecutionWrapper > deserialize(const std::vector< uint8_t > &data, bool public_only=false, const std::optional< TxID > &expected_txid=std::nullopt) override
Definition store.h:825
virtual void set_flag(StoreFlag f) override
Definition store.h:1306
void set_chunker(const std::shared_ptr< ILedgerChunker > &chunker_)
Definition store.h:216
void set_history(const std::shared_ptr< TxHistory > &history_)
Definition store.h:206
CommitResult commit(const TxID &txid, std::unique_ptr< PendingTx > pending_tx, bool globally_committable) override
Definition store.h:905
bool fill_maps(const std::vector< uint8_t > &data, bool public_only, ccf::kv::Version &v, ccf::kv::Term &view, ccf::kv::EntryFlags &entry_flags, OrderedChanges &changes, MapCollection &new_maps, ccf::ClaimsDigest &claims_digest, std::optional< ccf::crypto::Sha256Hash > &commit_evidence_digest, bool ignore_strict_versions=false) override
Definition store.h:713
ccf::TxID get_txid() override
Definition store.h:881
void compact(Version v) override
Definition store.h:543
std::shared_ptr< ccf::kv::untyped::Map > get_map_internal(ccf::kv::Version v, const std::string &map_name)
Definition store.h:262
virtual void unset_flag_unsafe(StoreFlag f) override
Definition store.h:1329
TxDiff create_tx_diff() override
Definition store.h:1285
Definition tx.h:131
MapHook< Write > MapHook
Definition map.h:42
CommitHook< Write > CommitHook
Definition map.h:41
#define LOG_TRACE_FMT
Definition logger.h:356
#define LOG_DEBUG_FMT
Definition logger.h:357
#define LOG_FAIL_FMT
Definition logger.h:363
Definition app_interface.h:19
std::shared_ptr< AbstractTxEncryptor > EncryptorPtr
Definition kv_types.h:576
uint64_t Term
Definition kv_types.h:48
@ PRIVATE
Definition kv_types.h:257
@ PUBLIC
Definition kv_types.h:256
std::vector< std::tuple< Version, std::shared_ptr< std::vector< uint8_t > >, bool, std::shared_ptr< ConsensusHookPtrs > > > BatchVector
Definition kv_types.h:245
uint64_t Version
Definition version.h:8
CommitResult
Definition kv_types.h:248
@ FAIL_NO_REPLICATE
Definition kv_types.h:251
@ SUCCESS
Definition kv_types.h:249
std::shared_ptr< AbstractSnapshotter > SnapshotterPtr
Definition kv_types.h:587
GenericDeserialiseWrapper< RawReader > KvStoreDeserialiser
Definition serialiser_declare.h:21
std::map< std::string, std::shared_ptr< AbstractMap > > MapCollection
Definition apply_changes.h:16
EntryFlags
Definition serialised_entry_format.h:15
ApplyResult
Definition kv_types.h:341
@ FAIL
Definition kv_types.h:350
@ PASS
Definition kv_types.h:342
std::map< std::string, MapChanges > OrderedChanges
Definition tx.h:41
std::shared_ptr< ccf::kv::Store > StorePtr
Definition store.h:1340
std::vector< ConsensusHookPtr > ConsensusHookPtrs
Definition hooks.h:22
std::mutex Mutex
Definition locking.h:12
view
Definition signatures.h:54
uint64_t View
Definition tx_id.h:23
Definition consensus_types.h:23
Definition map_serializers.h:11
Definition tx_id.h:44
Definition kv_types.h:52
Version version
Definition kv_types.h:54
Term term
Definition kv_types.h:53