CCF
Loading...
Searching...
No Matches
store.h
Go to the documentation of this file.
1// Copyright (c) Microsoft Corporation. All rights reserved.
2// Licensed under the Apache 2.0 License.
3#pragma once
4
5#include "apply_changes.h"
8#include "ccf/pal/locking.h"
9#include "deserialise.h"
10#include "ds/internal_logger.h"
11#include "kv/committable_tx.h"
13#include "kv/snapshot.h"
14#include "kv/untyped_map.h"
15#include "kv_serialiser.h"
16#include "kv_types.h"
17
18#define FMT_HEADER_ONLY
19#include <atomic>
20#include <fmt/format.h>
21#include <memory>
22
23namespace ccf::kv
24{
26 {
27 protected:
28 // All collections of Map must be ordered so that we lock their contained
29 // maps in a stable order. The order here is by map name. The version
30 // indicates the version at which the Map was created.
31 using Maps = std::map<
32 std::string,
33 std::pair<ccf::kv::Version, std::shared_ptr<untyped::Map>>>;
36
38 std::atomic<Version> version = 0;
39 Version last_new_map = ccf::kv::NoVersion;
40 std::atomic<Version> compacted = 0;
41
42 // Calls to Store::commit are made atomic by taking this lock.
44
45 // Term at which future write transactions should be committed.
46 std::atomic<Term> term_of_next_version = 0;
47
48 // Term at which the last entry was committed. Further transactions
49 // should read in that term. Note that it is assumed that the history of
50 // terms of past transactions is kept track of by and specified by the
51 // caller on rollback
53
55 // Version of the latest committable entry committed in this term and by
56 // _this_ store.
58
60
61 std::unordered_map<Version, std::tuple<std::unique_ptr<PendingTx>, bool>>
63
64 public:
65 void clear()
66 {
67 std::scoped_lock<ccf::pal::Mutex, ccf::pal::Mutex> mguard(
69
70 maps.clear();
71 pending_txs.clear();
72
73 version = 0;
74 last_new_map = ccf::kv::NoVersion;
75 compacted = 0;
78
82 }
83 };
84
85 class Store : public AbstractStore,
86 public StoreState,
88 public ReadOnlyStore
89 {
90 private:
91 using Hooks = std::map<std::string, ccf::kv::untyped::Map::CommitHook>;
92 using MapHooks = std::map<std::string, ccf::kv::untyped::Map::MapHook>;
93 Hooks global_hooks;
94 MapHooks map_hooks;
95
96 std::shared_ptr<Consensus> consensus = nullptr;
97 std::shared_ptr<TxHistory> history = nullptr;
98 std::shared_ptr<ILedgerChunker> chunker = nullptr;
99 EncryptorPtr encryptor = nullptr;
100 SnapshotterPtr snapshotter = nullptr;
101
102 // Generally we will only accept deserialised views if they are contiguous -
103 // at Version N we reject everything but N+1. The exception is when a Store
104 // is used for historical queries, where it may deserialise arbitrary
105 // transactions. In this case the Store is a useful container for a set of
106 // Tables, but its versioning invariants are ignored.
107 const bool strict_versions = true;
108
109 // If true, use historical ledger secrets to deserialise entries
110 const bool is_historical = false;
111
112 // Ledger entry header flags
113 uint8_t flags = 0;
114
// Applies a set of already-deserialised changes to the store at version v.
//
// The changes are handed to apply_changes() together with a callback that
// yields the tuple (v, v - 1). If apply_changes() returns an empty optional
// the failure is logged and false is returned; otherwise the store's
// `version` is set to v while holding version_lock and true is returned.
//
// NOTE(review): original lines 120 and 138-139 are not visible in this
// listing. `hooks` (passed to apply_changes below) is presumably the elided
// parameter, and `term` is presumably consumed by the elided statements -
// confirm against the full source.
115 bool commit_deserialised(
116 OrderedChanges& changes,
117 Version v,
118 Term term,
119 const MapCollection& new_maps,
121 bool track_deletes_on_missing_keys) override
122 {
123 auto c = apply_changes(
124 changes,
125 [v](bool) { return std::make_tuple(v, v - 1); },
126 hooks,
127 new_maps,
128 std::nullopt,
129 track_deletes_on_missing_keys);
130 if (!c.has_value())
131 {
132 LOG_FAIL_FMT("Failed to commit deserialised Tx at version {}", v);
133 return false;
134 }
135 {
// Version updates are serialised through version_lock.
136 std::lock_guard<ccf::pal::Mutex> vguard(version_lock);
137 version = v;
140 }
141 return true;
142 }
143
144 bool has_map_internal(const std::string& name)
145 {
146 return maps.contains(name);
147 }
148
// Increments the store's global version and returns the new value.
//
// If the incremented version exceeds the int64_t maximum, a failure is logged
// and the version is reset to 0.
//
// This function takes no lock itself; the callers visible in this file
// (next_version, next_txid) acquire version_lock before calling it.
//
// NOTE(review): original line 163 is not visible in this listing; the comment
// at line 162 suggests it updates the term in which later transactions read -
// confirm against the full source.
149 Version next_version_unsafe()
150 {
151 // Get the next global version
152 ++version;
153
154 // Version was previously signed, with negative values representing
155 // deletions. Maintain this restriction for compatibility with old code.
156 if (version > std::numeric_limits<int64_t>::max())
157 {
158 LOG_FAIL_FMT("KV version too large - wrapping to 0");
159 version = 0;
160 }
161
162 // Further transactions should read in the commit term
164
165 return version;
166 }
167
168 TxID current_txid_unsafe()
169 {
170 // version_lock should be first acquired
172 }
173
174 public:
175 Store(bool strict_versions_ = true, bool is_historical_ = false) :
176 strict_versions(strict_versions_),
177 is_historical(is_historical_)
178 {}
179
180 Store(const Store& that) = delete;
181
182 std::shared_ptr<Consensus> get_consensus() override
183 {
184 // We need to use std::atomic_load<std::shared_ptr<T>>
185 // after clang supports it.
186 // https://en.cppreference.com/w/Template:cpp/compiler_support/20
187 return std::atomic_load(&consensus);
188 }
189
190 void set_consensus(const std::shared_ptr<Consensus>& consensus_)
191 {
192 std::atomic_store(&consensus, consensus_);
193 }
194
195 std::shared_ptr<TxHistory> get_history() override
196 {
197 return history;
198 }
199
200 void set_history(const std::shared_ptr<TxHistory>& history_)
201 {
202 history = history_;
203 }
204
205 std::shared_ptr<ILedgerChunker> get_chunker() override
206 {
207 return chunker;
208 }
209
210 void set_chunker(const std::shared_ptr<ILedgerChunker>& chunker_)
211 {
212 chunker = chunker_;
213 }
214
215 void set_encryptor(const EncryptorPtr& encryptor_)
216 {
217 encryptor = encryptor_;
218 }
219
221 {
222 return encryptor;
223 }
224
225 void set_snapshotter(const SnapshotterPtr& snapshotter_)
226 {
227 snapshotter = snapshotter_;
228 }
229
243 std::shared_ptr<AbstractMap> get_map(
244 ccf::kv::Version v, const std::string& map_name) override
245 {
246 std::lock_guard<ccf::pal::Mutex> mguard(maps_lock);
247 return get_map_internal(v, map_name);
248 }
249
250 std::shared_ptr<AbstractMap> get_map_unsafe(
251 ccf::kv::Version v, const std::string& map_name) override
252 {
253 return get_map_internal(v, map_name);
254 }
255
256 std::shared_ptr<ccf::kv::untyped::Map> get_map_internal(
257 ccf::kv::Version v, const std::string& map_name)
258 {
259 auto search = maps.find(map_name);
260 if (search != maps.end())
261 {
262 const auto& [map_creation_version, map_ptr] = search->second;
263 if (v >= map_creation_version || map_creation_version == NoVersion)
264 {
265 return map_ptr;
266 }
267 }
268
269 return nullptr;
270 }
271
283 ccf::kv::Version v, const std::shared_ptr<AbstractMap>& map_) override
284 {
285 auto map = std::dynamic_pointer_cast<ccf::kv::untyped::Map>(map_);
286 if (map == nullptr)
287 {
288 throw std::logic_error(fmt::format(
289 "Can't add dynamic map - {} is not of expected type",
290 map_->get_name()));
291 }
292
293 const auto map_name = map->get_name();
294 if (get_map_unsafe(v, map_name) != nullptr)
295 {
296 throw std::logic_error(fmt::format(
297 "Can't add dynamic map - already have a map named {}", map_name));
298 }
299
300 LOG_DEBUG_FMT("Adding newly created map '{}' at version {}", map_name, v);
301 maps[map_name] = std::make_pair(v, map);
302
303 {
304 // If we have any hooks for the given map name, set them on this new map
305 const auto global_it = global_hooks.find(map_name);
306 if (global_it != global_hooks.end())
307 {
308 map->set_global_hook(global_it->second);
309 }
310
311 const auto map_it = map_hooks.find(map_name);
312 if (map_it != map_hooks.end())
313 {
314 map->set_map_hook(map_it->second);
315 }
316 }
317 }
318
319 std::unique_ptr<AbstractSnapshot> snapshot_unsafe_maps(Version v) override
320 {
321 auto cv = compacted_version();
322 if (v < cv)
323 {
324 throw std::logic_error(fmt::format(
325 "Cannot snapshot at version {} which is earlier than last "
326 "compacted version {} ",
327 v,
328 cv));
329 }
330
331 if (v > current_version())
332 {
333 throw std::logic_error(fmt::format(
334 "Cannot snapshot at version {} which is later than current "
335 "version {} ",
336 v,
337 current_version()));
338 }
339
340 auto snapshot = std::make_unique<StoreSnapshot>(v);
341
342 {
343 for (auto& it : maps)
344 {
345 auto& [_, map] = it.second;
346 snapshot->add_map_snapshot(map->snapshot(v));
347 }
348
349 auto h = get_history();
350 if (h)
351 {
352 snapshot->add_hash_at_snapshot(h->get_raw_leaf(v));
353 }
354
355 auto c = get_consensus();
356 if (c)
357 {
358 snapshot->add_view_history(c->get_view_history(v));
359 }
360 }
361
362 return snapshot;
363 }
364
365 void lock_maps() override
366 {
367 maps_lock.lock();
368 for (auto& it : maps)
369 {
370 auto& [_, map] = it.second;
371 map->lock();
372 }
373 }
374
375 void unlock_maps() override
376 {
377 for (auto& it : maps)
378 {
379 auto& [_, map] = it.second;
380 map->unlock();
381 }
382 maps_lock.unlock();
383 }
384
385 std::vector<uint8_t> serialise_snapshot(
386 std::unique_ptr<AbstractSnapshot> snapshot) override
387 {
388 auto e = get_encryptor();
389 return snapshot->serialise(e);
390 }
391
393 const uint8_t* data,
394 size_t size,
396 std::vector<Version>* view_history = nullptr,
397 bool public_only = false) override
398 {
399 auto e = get_encryptor();
400 auto d = KvStoreDeserialiser(
401 e,
402 public_only ? ccf::kv::SecurityDomain::PUBLIC :
403 std::optional<ccf::kv::SecurityDomain>());
404
405 ccf::kv::Term term = 0;
406 ccf::kv::EntryFlags entry_flags = {};
407 auto v_ = d.init(data, size, term, entry_flags, is_historical);
408 if (!v_.has_value())
409 {
410 LOG_FAIL_FMT("Initialisation of deserialise object failed");
411 return ApplyResult::FAIL;
412 }
413 auto v = v_.value();
414 std::shared_ptr<TxHistory> h = nullptr;
415 std::vector<uint8_t> hash_at_snapshot;
416 std::vector<Version> view_history_;
417 {
418 std::lock_guard<ccf::pal::Mutex> mguard(maps_lock);
419
420 for (auto& it : maps)
421 {
422 auto& [_, map] = it.second;
423 map->lock();
424 }
425
426 h = get_history();
427 if (h)
428 {
429 hash_at_snapshot = d.deserialise_raw();
430 }
431
432 if (view_history != nullptr)
433 {
434 view_history_ = d.deserialise_view_history();
435 }
436
437 OrderedChanges changes;
438 MapCollection new_maps;
439
440 for (auto r = d.start_map(); r.has_value(); r = d.start_map())
441 {
442 const auto map_name = r.value();
443
444 std::shared_ptr<ccf::kv::untyped::Map> map = nullptr;
445
446 auto search = maps.find(map_name);
447 if (search == maps.end())
448 {
449 map = std::make_shared<ccf::kv::untyped::Map>(
450 this, map_name, get_security_domain(map_name));
451 new_maps[map_name] = map;
453 "Creating map {} while deserialising snapshot at version {}",
454 map_name,
455 v);
456 }
457 else
458 {
459 map = search->second.second;
460 }
461
462 auto changes_search = changes.find(map_name);
463 if (changes_search != changes.end())
464 {
465 LOG_FAIL_FMT("Failed to deserialise snapshot at version {}", v);
466 LOG_DEBUG_FMT("Multiple writes on map {}", map_name);
467 return ApplyResult::FAIL;
468 }
469
470 auto deserialised_snapshot_changes =
471 map->deserialise_snapshot_changes(d);
472
473 // Take ownership of the produced change set, store it to be committed
474 // later
475 changes.emplace_hint(
476 changes_search,
477 std::piecewise_construct,
478 std::forward_as_tuple(map_name),
479 std::forward_as_tuple(
480 map, std::move(deserialised_snapshot_changes)));
481 }
482
483 for (auto& it : maps)
484 {
485 auto& [_, map] = it.second;
486 map->unlock();
487 }
488
489 if (!d.end())
490 {
491 LOG_FAIL_FMT("Unexpected content in snapshot at version {}", v);
492 return ApplyResult::FAIL;
493 }
494
495 // Each map is committed at a different version, independently of the
496 // overall snapshot version. The commit versions for each map are
497 // contained in the snapshot and applied when the snapshot is committed.
498 bool track_deletes_on_missing_keys = false;
499 auto r = apply_changes(
500 changes,
501 [](bool) { return std::make_tuple(NoVersion, NoVersion); },
502 hooks,
503 new_maps,
504 std::nullopt,
505 false,
506 track_deletes_on_missing_keys);
507 if (!r.has_value())
508 {
510 "Failed to commit deserialised snapshot at version {}", v);
511 return ApplyResult::FAIL;
512 }
513
514 {
515 std::lock_guard<ccf::pal::Mutex> vguard(version_lock);
516 version = v;
517 last_replicated = v;
518 }
519 }
520
521 if (h)
522 {
523 if (!h->init_from_snapshot(hash_at_snapshot))
524 {
525 return ApplyResult::FAIL;
526 }
527 }
528
529 if (view_history != nullptr)
530 {
531 *view_history = std::move(view_history_);
532 }
533
534 return ApplyResult::PASS;
535 }
536
537 void compact(Version v) override
538 {
539 // This is called when the store will never be rolled back to any
540 // state before the specified version.
541 // No transactions can be prepared or committed during compaction.
542
543 if (snapshotter)
544 {
545 auto c = get_consensus();
546 bool generate_snapshot = c && c->is_primary();
547 snapshotter->commit(v, generate_snapshot);
548 }
549
550 if (chunker)
551 {
552 chunker->compacted_to(v);
553 }
554
555 std::lock_guard<ccf::pal::Mutex> mguard(maps_lock);
556
557 if (v > current_version())
558 {
559 return;
560 }
561
562 for (auto& it : maps)
563 {
564 auto& [_, map] = it.second;
565 map->lock();
566 }
567
568 for (auto& it : maps)
569 {
570 auto& [_, map] = it.second;
571 map->compact(v);
572 }
573
574 for (auto& it : maps)
575 {
576 auto& [_, map] = it.second;
577 map->unlock();
578 }
579
580 {
581 std::lock_guard<ccf::pal::Mutex> vguard(version_lock);
582 compacted = v;
583
584 auto h = get_history();
585 if (h)
586 {
587 h->compact(v);
588 }
589 }
590
591 for (auto& it : maps)
592 {
593 auto& [_, map] = it.second;
594 map->post_compact();
595 }
596 }
597
// Rolls the store back to tx_id.seqno and records term_of_next_version_ as
// the term for subsequent versions.
//
// Sequence, as visible here:
//  1. The snapshotter and chunker (when set) are told about the rollback.
//  2. maps_lock is taken for the rest of the call.
//  3. Under version_lock: throws std::logic_error if tx_id.seqno is below
//     `compacted`; updates term_of_next_version; informs the history; and, if
//     tx_id.seqno is older than the current version, resets version and
//     last_replicated, clears pending_txs and rolls back the encryptor.
//  4. Every map is locked, rolled back, and - if it was created after
//     tx_id.seqno - removed from `maps`; the remaining maps are then unlocked.
//
// NOTE(review): original lines 629 and 646-647 are not visible in this
// listing, so additional state may be updated under version_lock - confirm
// against the full source.
598 void rollback(const TxID& tx_id, Term term_of_next_version_) override
599 {
600 // This is called to roll the store back to the state it was in
601 // at the specified version.
602 // No transactions can be prepared or committed during rollback.
603
604 if (snapshotter)
605 {
606 snapshotter->rollback(tx_id.seqno);
607 }
608
609 if (chunker)
610 {
611 chunker->rolled_back_to(tx_id.seqno);
612 }
613
614 std::lock_guard<ccf::pal::Mutex> mguard(maps_lock);
615
616 {
617 std::lock_guard<ccf::pal::Mutex> vguard(version_lock);
// NOTE(review): the message below says "commit version" but the value
// compared and printed is `compacted`.
618 if (tx_id.seqno < compacted)
619 {
620 throw std::logic_error(fmt::format(
621 "Attempting rollback to {}, earlier than commit version {}",
622 tx_id.seqno,
623 compacted));
624 }
625
626 // The term should always be updated on rollback() when passed
627 // regardless of whether version needs to be updated or not
628 term_of_next_version = term_of_next_version_;
630
631 // History must be informed of the term_of_last_version change, even if
632 // no actual rollback is required
633 auto h = get_history();
634 if (h)
635 {
636 h->rollback(tx_id, term_of_next_version);
637 }
638
// Nothing newer than the target exists: return from the whole function, so
// the per-map rollback below is skipped as well.
639 if (tx_id.seqno >= version)
640 {
641 return;
642 }
643
644 version = tx_id.seqno;
645 last_replicated = tx_id.seqno;
648 pending_txs.clear();
649 auto e = get_encryptor();
650 if (e)
651 {
652 e->rollback(tx_id.seqno);
653 }
654 }
655
// Lock every map before rolling any of them back.
656 for (auto& it : maps)
657 {
658 auto& [_, map] = it.second;
659 map->lock();
660 }
661
// Explicit iterator loop because entries may be erased while iterating.
662 auto it = maps.begin();
663 while (it != maps.end())
664 {
665 auto& [map_creation_version, map] = it->second;
666 // Rollback this map whether we're forgetting about it or not. Anyone
667 // else still holding it should see it has rolled back
668 map->rollback(tx_id.seqno);
669 if (map_creation_version > tx_id.seqno)
670 {
671 // Map was created more recently; its creation is being forgotten.
672 // Erase our knowledge of it
// Unlock here: once erased, the final unlock loop no longer visits this map.
673 map->unlock();
674 it = maps.erase(it);
675 }
676 else
677 {
678 ++it;
679 }
680 }
681
// Unlock the maps that survived the rollback.
682 for (auto& map_it : maps)
683 {
684 auto& [_, map] = map_it.second;
685 map->unlock();
686 }
687 }
688
// One-off initialisation of the store's term.
//
// Holds version_lock for the whole call. Throws std::logic_error if
// term_of_next_version is already non-zero. If a history is set, it is told
// the value of term_of_next_version via set_term().
//
// NOTE(review): original line 699 is not visible in this listing; it
// presumably assigns `t` to term_of_next_version before the history is
// informed - confirm against the full source.
689 void initialise_term(Term t) override
690 {
691 // Note: This should only be called once, when the store is first
692 // initialised. term_of_next_version is later updated via rollback.
693 std::lock_guard<ccf::pal::Mutex> vguard(version_lock);
694 if (term_of_next_version != 0)
695 {
696 throw std::logic_error("term_of_next_version is already initialised");
697 }
698
700 auto h = get_history();
701 if (h)
702 {
703 h->set_term(term_of_next_version);
704 }
705 }
706
708 const std::vector<uint8_t>& data,
709 bool public_only,
712 ccf::kv::EntryFlags& entry_flags,
713 OrderedChanges& changes,
714 MapCollection& new_maps,
715 ccf::ClaimsDigest& claims_digest,
716 std::optional<ccf::crypto::Sha256Hash>& commit_evidence_digest,
717 bool ignore_strict_versions = false) override
718 {
719 // This will return FAILED if the serialised transaction is being
720 // applied out of order.
721 // Processing transactions locally and also deserialising to the
722 // same store will result in a store version mismatch and
723 // deserialisation will then fail.
724 auto e = get_encryptor();
725
726 auto d = KvStoreDeserialiser(
727 e,
728 public_only ? ccf::kv::SecurityDomain::PUBLIC :
729 std::optional<ccf::kv::SecurityDomain>());
730
731 auto v_ =
732 d.init(data.data(), data.size(), view, entry_flags, is_historical);
733 if (!v_.has_value())
734 {
735 LOG_FAIL_FMT("Initialisation of deserialise object failed");
736 return false;
737 }
738 v = v_.value();
739
740 claims_digest = std::move(d.consume_claims_digest());
742 "Deserialised claim digest {} {}",
743 claims_digest.value(),
744 claims_digest.empty());
745
746 commit_evidence_digest = std::move(d.consume_commit_evidence_digest());
747 if (commit_evidence_digest.has_value())
748 {
750 "Deserialised commit evidence digest {}",
751 commit_evidence_digest.value());
752 }
753
754 // Throw away any local commits that have not propagated via the
755 // consensus.
757
758 if (strict_versions && !ignore_strict_versions)
759 {
760 // Make sure this is the next transaction.
761 auto cv = current_version();
762 if (cv != (v - 1))
763 {
765 "Tried to deserialise {} but current_version is {}", v, cv);
766 return false;
767 }
768 }
769
770 // Deserialised transactions express read dependencies as versions,
771 // rather than with the actual value read. As a result, they don't
772 // need snapshot isolation on the map state, and so do not need to
773 // lock each of the maps before creating the transaction.
774 std::lock_guard<ccf::pal::Mutex> mguard(maps_lock);
775
776 for (auto r = d.start_map(); r.has_value(); r = d.start_map())
777 {
778 const auto map_name = r.value();
779
780 auto map = get_map_internal(v, map_name);
781 if (map == nullptr)
782 {
783 auto new_map = std::make_shared<ccf::kv::untyped::Map>(
784 this, map_name, get_security_domain(map_name));
785 map = new_map;
786 new_maps[map_name] = new_map;
788 "Creating map '{}' while deserialising transaction at version {}",
789 map_name,
790 v);
791 }
792
793 auto change_search = changes.find(map_name);
794 if (change_search != changes.end())
795 {
796 LOG_FAIL_FMT("Failed to deserialise transaction at version {}", v);
797 LOG_DEBUG_FMT("Multiple writes on map {}", map_name);
798 return false;
799 }
800
801 auto deserialised_changes = map->deserialise_changes(d, v);
802
803 // Take ownership of the produced change set, store it to be applied
804 // later
805 changes.emplace_hint(
806 change_search,
807 std::piecewise_construct,
808 std::forward_as_tuple(map_name),
809 std::forward_as_tuple(map, std::move(deserialised_changes)));
810 }
811
812 if (!d.end())
813 {
814 LOG_FAIL_FMT("Unexpected content in transaction at version {}", v);
815 return false;
816 }
817
818 return true;
819 }
820
821 std::unique_ptr<ccf::kv::AbstractExecutionWrapper> deserialize(
822 const std::vector<uint8_t>& data,
823 bool public_only = false,
824 const std::optional<TxID>& expected_txid = std::nullopt) override
825 {
826 auto exec = std::make_unique<CFTExecutionWrapper>(
827 this, get_history(), get_chunker(), data, public_only, expected_txid);
828 return exec;
829 }
830
831 bool operator==(const Store& that) const
832 {
833 // Only used for debugging, not thread safe.
834 if (version != that.version)
835 {
836 return false;
837 }
838
839 if (maps.size() != that.maps.size())
840 {
841 return false;
842 }
843
844 return std::ranges::all_of(maps, [&that](const auto& entry) {
845 const auto& [map_name, map_pair] = entry;
846 auto search = that.maps.find(map_name);
847
848 if (search == that.maps.end())
849 {
850 return false;
851 }
852
853 const auto& [this_v, this_map] = map_pair;
854 const auto& [that_v, that_map] = search->second;
855
856 if (this_v != that_v)
857 {
858 return false;
859 }
860
861 if (*this_map != *that_map)
862 {
863 return false;
864 }
865 return true;
866 });
867 }
868
870 {
871 return version;
872 }
873
875 {
876 // Must lock in case the version or read term is being incremented.
877 std::lock_guard<ccf::pal::Mutex> vguard(version_lock);
878 return current_txid_unsafe();
879 }
880
881 std::pair<TxID, Term> current_txid_and_commit_term() override
882 {
883 // Must lock in case the version or commit term is being incremented.
884 std::lock_guard<ccf::pal::Mutex> vguard(version_lock);
885 return {current_txid_unsafe(), term_of_next_version};
886 }
887
889 {
890 return compacted;
891 }
892
893 Term commit_view() override
894 {
895 // Must lock in case the commit_view is being incremented.
897 }
898
900 const TxID& txid,
901 std::unique_ptr<PendingTx> pending_tx,
902 bool globally_committable) override
903 {
904 auto c = get_consensus();
905 if (!c)
906 {
908 }
909
910 std::lock_guard<ccf::pal::Mutex> cguard(commit_lock);
911
913 "Store::commit {}{}",
914 txid.seqno,
915 (globally_committable ? " globally_committable" : ""));
916
917 BatchVector batch;
918 Version previous_last_replicated = 0;
919 Version next_last_replicated = 0;
920 Version previous_rollback_count = 0;
921 ccf::View replication_view = 0;
922
923 std::vector<std::tuple<std::unique_ptr<PendingTx>, bool>>
924 contiguous_pending_txs;
925 auto h = get_history();
926
927 {
928 std::lock_guard<ccf::pal::Mutex> vguard(version_lock);
929 if (txid.view != term_of_next_version && get_consensus()->is_primary())
930 {
931 // This can happen when a transaction started before a view change,
932 // but tries to commit after the view change is complete.
934 "Want to commit for term {} but term is {}",
935 txid.view,
937
939 }
940
941 if (globally_committable && txid.seqno > last_committable)
942 {
943 last_committable = txid.seqno;
944 }
945
946 pending_txs.insert(
947 {txid.seqno,
948 std::make_tuple(std::move(pending_tx), globally_committable)});
949
950 LOG_TRACE_FMT("Inserting pending tx at {}", txid.seqno);
951
952 for (Version offset = 1; true; ++offset)
953 {
954 auto search = pending_txs.find(last_replicated + offset);
955 if (search == pending_txs.end())
956 {
958 "Couldn't find {} = {} + {}, giving up on batch while committing "
959 "{}.{}",
960 last_replicated + offset,
962 offset,
963 txid.view,
964 txid.seqno);
965 break;
966 }
967
968 contiguous_pending_txs.emplace_back(std::move(search->second));
969 pending_txs.erase(search);
970 }
971
972 previous_rollback_count = rollback_count;
973 previous_last_replicated = last_replicated;
974 next_last_replicated = last_replicated + contiguous_pending_txs.size();
975
976 replication_view = term_of_next_version;
977 }
978 // Release version lock
979
980 if (contiguous_pending_txs.empty())
981 {
983 }
984
985 size_t offset = 1;
986 for (auto& [pending_tx_, committable_] : contiguous_pending_txs)
987 {
988 auto
989 [success_, data_, claims_digest_, commit_evidence_digest_, hooks_] =
990 pending_tx_->call();
991 auto data_shared =
992 std::make_shared<std::vector<uint8_t>>(std::move(data_));
993 auto hooks_shared =
994 std::make_shared<ccf::kv::ConsensusHookPtrs>(std::move(hooks_));
995
996 // NB: this cannot happen currently. Regular Tx only make it here if
997 // they did succeed, and signatures cannot conflict because they
998 // execute in order with a read_version that's version - 1, so even
999 // two contiguous signatures are fine
1000 if (success_ != CommitResult::SUCCESS)
1001 {
1003 "Failed Tx commit {}", previous_last_replicated + offset);
1004 }
1005
1006 if (h)
1007 {
1008 h->append_entry(
1009 ccf::entry_leaf(
1010 *data_shared, commit_evidence_digest_, claims_digest_),
1011 replication_view);
1012 }
1013
1014 if (chunker)
1015 {
1016 chunker->append_entry_size(data_shared->size());
1017 }
1018
1020 "Batching {} ({}) during commit of {}.{}",
1021 previous_last_replicated + offset,
1022 data_shared->size(),
1023 txid.view,
1024 txid.seqno);
1025
1026 batch.emplace_back(
1027 previous_last_replicated + offset,
1028 data_shared,
1029 committable_,
1030 hooks_shared);
1031
1032 offset++;
1033 }
1034
1035 if (c->replicate(batch, replication_view))
1036 {
1037 std::lock_guard<ccf::pal::Mutex> vguard(version_lock);
1038 if (
1039 last_replicated == previous_last_replicated &&
1040 previous_rollback_count == rollback_count)
1041 {
1042 last_replicated = next_last_replicated;
1043 }
1044 return CommitResult::SUCCESS;
1045 }
1046
1047 LOG_DEBUG_FMT("Failed to replicate");
1049 }
1050
1052 {
1053 std::lock_guard<ccf::pal::Mutex> vguard(version_lock);
1055 }
1056
1058 {
1059 // Note that snapshotter->record_committable, and therefore this function,
1060 // assumes that `version` is a committable entry/signature.
1061
1063
1064 if (chunker)
1065 {
1066 r |= chunker->is_chunk_end_requested(version);
1067 }
1068
1069 if (snapshotter)
1070 {
1071 r |= snapshotter->record_committable(version);
1072 }
1073
1074 return r;
1075 }
1076
1077 void lock_map_set() override
1078 {
1079 maps_lock.lock();
1080 }
1081
1082 void unlock_map_set() override
1083 {
1084 maps_lock.unlock();
1085 }
1086
1087 bool check_rollback_count(Version count) override
1088 {
1089 std::lock_guard<ccf::pal::Mutex> vguard(version_lock);
1090 return rollback_count == count;
1091 }
1092
1093 std::tuple<Version, Version> next_version(bool commit_new_map) override
1094 {
1095 std::lock_guard<ccf::pal::Mutex> vguard(version_lock);
1096 Version v = next_version_unsafe();
1097
1098 auto previous_last_new_map = last_new_map;
1099 if (commit_new_map)
1100 {
1101 last_new_map = v;
1102 }
1103
1104 return std::make_tuple(v, previous_last_new_map);
1105 }
1106
1108 {
1109 std::lock_guard<ccf::pal::Mutex> vguard(version_lock);
1110 return next_version_unsafe();
1111 }
1112
1113 TxID next_txid() override
1114 {
1115 std::lock_guard<ccf::pal::Mutex> vguard(version_lock);
1116 next_version_unsafe();
1117
1118 return {term_of_next_version, version};
1119 }
1120
1121 size_t committable_gap() override
1122 {
1123 std::lock_guard<ccf::pal::Mutex> vguard(version_lock);
1124 return version - last_committable;
1125 }
1126
1140 {
1141 {
1142 const auto source_version = store.current_version();
1143 const auto target_version = current_version();
1144 if (source_version > target_version)
1145 {
1146 throw std::runtime_error(fmt::format(
1147 "Invalid call to swap_private_maps. Source is at version {} while "
1148 "target is at {}",
1149 source_version,
1150 target_version));
1151 }
1152 }
1153
1154 std::scoped_lock<ccf::pal::Mutex, ccf::pal::Mutex> guard_both_store_maps(
1155 maps_lock, store.maps_lock);
1156
1157 // Each entry is (Name, MyMap, TheirMap)
1158 using MapEntry = std::tuple<std::string, AbstractMap*, AbstractMap*>;
1159 std::vector<MapEntry> entries;
1160
1161 // Get the list of private maps from the source store
1162 for (auto& [name, pair] : store.maps)
1163 {
1164 auto& [_, map] = pair;
1165 if (map->get_security_domain() == SecurityDomain::PRIVATE)
1166 {
1167 map->lock();
1168 entries.emplace_back(name, nullptr, map.get());
1169 }
1170 }
1171
1172 // For each source map, either create it or, where it already exists,
1173 // confirm it is PRIVATE. Lock it and store it in entries
1174 auto entry = entries.begin();
1175 while (entry != entries.end())
1176 {
1177 const auto& [name, _, their_map] = *entry;
1178 std::shared_ptr<AbstractMap> map = nullptr;
1179 const auto it = maps.find(name);
1180 if (it == maps.end())
1181 {
1182 // NB: We lose the creation version from the original map, but assume
1183 // it is irrelevant - its creation should no longer be at risk of
1184 // rollback
1185 auto new_map = std::make_pair(
1186 NoVersion,
1187 std::make_shared<ccf::kv::untyped::Map>(
1188 this, name, SecurityDomain::PRIVATE));
1189 maps[name] = new_map;
1190 map = new_map.second;
1191 }
1192 else
1193 {
1194 map = it->second.second;
1195 if (map->get_security_domain() != SecurityDomain::PRIVATE)
1196 {
1197 throw std::logic_error(fmt::format(
1198 "Swap mismatch - map {} is private in source but not in target",
1199 name));
1200 }
1201 }
1202
1203 std::get<1>(*entry) = map.get();
1204 map->lock();
1205 ++entry;
1206 }
1207
1208 for (auto& [name, lhs, rhs] : entries)
1209 {
1210 lhs->swap(rhs);
1211 }
1212
1213 for (auto& [name, lhs, rhs] : entries)
1214 {
1215 lhs->unlock();
1216 rhs->unlock();
1217 }
1218 }
1219
1221 const std::string& map_name, const ccf::kv::untyped::Map::MapHook& hook)
1222 {
1223 map_hooks[map_name] = hook;
1224
1225 const auto it = maps.find(map_name);
1226 if (it != maps.end())
1227 {
1228 it->second.second->set_map_hook(hook);
1229 }
1230 }
1231
1232 void unset_map_hook(const std::string& map_name)
1233 {
1234 map_hooks.erase(map_name);
1235
1236 const auto it = maps.find(map_name);
1237 if (it != maps.end())
1238 {
1239 it->second.second->unset_map_hook();
1240 }
1241 }
1242
1244 const std::string& map_name,
1246 {
1247 global_hooks[map_name] = hook;
1248
1249 const auto it = maps.find(map_name);
1250 if (it != maps.end())
1251 {
1252 it->second.second->set_global_hook(hook);
1253 }
1254 }
1255
1256 void unset_global_hook(const std::string& map_name)
1257 {
1258 global_hooks.erase(map_name);
1259
1260 const auto it = maps.find(map_name);
1261 if (it != maps.end())
1262 {
1263 it->second.second->unset_global_hook();
1264 }
1265 }
1266
1268 {
1269 return {this};
1270 }
1271
1272 std::unique_ptr<ReadOnlyTx> create_read_only_tx_ptr() override
1273 {
1274 return std::make_unique<ReadOnlyTx>(this);
1275 }
1276
1278 {
1279 return {this};
1280 }
1281
1283 {
1284 return {this};
1285 }
1286
1287 std::unique_ptr<CommittableTx> create_tx_ptr()
1288 {
1289 return std::make_unique<CommittableTx>(this);
1290 }
1291
1293 {
1294 std::lock_guard<ccf::pal::Mutex> vguard(version_lock);
1295 return {this, term_of_last_version, tx_id, rollback_count};
1296 }
1297
1298 void set_flag(StoreFlag f) override
1299 {
1300 std::lock_guard<ccf::pal::Mutex> vguard(version_lock);
1301 set_flag_unsafe(f);
1302 }
1303
1304 void unset_flag(StoreFlag f) override
1305 {
1306 std::lock_guard<ccf::pal::Mutex> vguard(version_lock);
1308 }
1309
1310 bool flag_enabled(StoreFlag f) override
1311 {
1312 std::lock_guard<ccf::pal::Mutex> vguard(version_lock);
1313 return flag_enabled_unsafe(f);
1314 }
1315
1317 {
1318 this->flags |= static_cast<uint8_t>(f);
1319 }
1320
1322 {
1323 this->flags &= ~static_cast<uint8_t>(f);
1324 }
1325
1326 [[nodiscard]] bool flag_enabled_unsafe(StoreFlag f) const override
1327 {
1328 return (flags & static_cast<uint8_t>(f)) != 0;
1329 }
1330 };
1331
1332 using StorePtr = std::shared_ptr<ccf::kv::Store>;
1333}
Definition claims_digest.h:10
const Digest & value() const
Definition claims_digest.h:38
bool empty() const
Definition claims_digest.h:33
Definition kv_types.h:610
StoreFlag
Definition kv_types.h:679
Definition committable_tx.h:19
Definition deserialise.h:19
Definition read_only_store.h:13
Definition tx.h:159
Definition committable_tx.h:372
Definition store.h:26
Term term_of_last_version
Definition store.h:52
std::atomic< Version > compacted
Definition store.h:40
std::atomic< Version > version
Definition store.h:38
ccf::pal::Mutex maps_lock
Definition store.h:34
Version last_committable
Definition store.h:57
Version rollback_count
Definition store.h:59
Version last_new_map
Definition store.h:39
Maps maps
Definition store.h:35
std::atomic< Term > term_of_next_version
Definition store.h:46
void clear()
Definition store.h:65
ccf::pal::Mutex version_lock
Definition store.h:37
std::map< std::string, std::pair< ccf::kv::Version, std::shared_ptr< untyped::Map > > > Maps
Definition store.h:33
std::unordered_map< Version, std::tuple< std::unique_ptr< PendingTx >, bool > > pending_txs
Definition store.h:62
Version last_replicated
Definition store.h:54
ccf::pal::Mutex commit_lock
Definition store.h:43
Definition store.h:89
bool should_create_ledger_chunk_unsafe(Version version) override
Definition store.h:1057
ApplyResult deserialise_snapshot(const uint8_t *data, size_t size, ccf::kv::ConsensusHookPtrs &hooks, std::vector< Version > *view_history=nullptr, bool public_only=false) override
Definition store.h:392
ReservedTx create_reserved_tx(const TxID &tx_id)
Definition store.h:1292
ReadOnlyTx create_read_only_tx() override
Definition store.h:1267
void set_flag_unsafe(StoreFlag f) override
Definition store.h:1316
std::shared_ptr< AbstractMap > get_map_unsafe(ccf::kv::Version v, const std::string &map_name) override
Definition store.h:250
size_t committable_gap() override
Definition store.h:1121
TxID next_txid() override
Definition store.h:1113
void unset_global_hook(const std::string &map_name)
Definition store.h:1256
void initialise_term(Term t) override
Definition store.h:689
Store(const Store &that)=delete
void unset_flag_unsafe(StoreFlag f) override
Definition store.h:1321
std::shared_ptr< TxHistory > get_history() override
Definition store.h:195
void set_map_hook(const std::string &map_name, const ccf::kv::untyped::Map::MapHook &hook)
Definition store.h:1220
void unlock_map_set() override
Definition store.h:1082
void set_snapshotter(const SnapshotterPtr &snapshotter_)
Definition store.h:225
void unset_map_hook(const std::string &map_name)
Definition store.h:1232
void add_dynamic_map(ccf::kv::Version v, const std::shared_ptr< AbstractMap > &map_) override
Definition store.h:282
Version current_version() override
Definition store.h:869
void swap_private_maps(Store &store)
Definition store.h:1139
std::shared_ptr< AbstractMap > get_map(ccf::kv::Version v, const std::string &map_name) override
Definition store.h:243
std::tuple< Version, Version > next_version(bool commit_new_map) override
Definition store.h:1093
bool flag_enabled(StoreFlag f) override
Definition store.h:1310
bool operator==(const Store &that) const
Definition store.h:831
std::shared_ptr< ILedgerChunker > get_chunker() override
Definition store.h:205
EncryptorPtr get_encryptor() override
Definition store.h:220
std::vector< uint8_t > serialise_snapshot(std::unique_ptr< AbstractSnapshot > snapshot) override
Definition store.h:385
bool check_rollback_count(Version count) override
Definition store.h:1087
std::unique_ptr< CommittableTx > create_tx_ptr()
Definition store.h:1287
void set_consensus(const std::shared_ptr< Consensus > &consensus_)
Definition store.h:190
void rollback(const TxID &tx_id, Term term_of_next_version_) override
Definition store.h:598
void unset_flag(StoreFlag f) override
Definition store.h:1304
std::unique_ptr< AbstractSnapshot > snapshot_unsafe_maps(Version v) override
Definition store.h:319
void lock_map_set() override
Definition store.h:1077
Term commit_view() override
Definition store.h:893
void unlock_maps() override
Definition store.h:375
std::unique_ptr< ReadOnlyTx > create_read_only_tx_ptr() override
Definition store.h:1272
void lock_maps() override
Definition store.h:365
void set_encryptor(const EncryptorPtr &encryptor_)
Definition store.h:215
CommittableTx create_tx()
Definition store.h:1282
bool should_create_ledger_chunk(Version version) override
Definition store.h:1051
Version next_version() override
Definition store.h:1107
Version compacted_version() override
Definition store.h:888
Store(bool strict_versions_=true, bool is_historical_=false)
Definition store.h:175
void set_global_hook(const std::string &map_name, const ccf::kv::untyped::Map::CommitHook &hook)
Definition store.h:1243
std::pair< TxID, Term > current_txid_and_commit_term() override
Definition store.h:881
std::shared_ptr< Consensus > get_consensus() override
Definition store.h:182
void set_flag(StoreFlag f) override
Definition store.h:1298
std::unique_ptr< ccf::kv::AbstractExecutionWrapper > deserialize(const std::vector< uint8_t > &data, bool public_only=false, const std::optional< TxID > &expected_txid=std::nullopt) override
Definition store.h:821
void set_chunker(const std::shared_ptr< ILedgerChunker > &chunker_)
Definition store.h:210
void set_history(const std::shared_ptr< TxHistory > &history_)
Definition store.h:200
CommitResult commit(const TxID &txid, std::unique_ptr< PendingTx > pending_tx, bool globally_committable) override
Definition store.h:899
bool fill_maps(const std::vector< uint8_t > &data, bool public_only, ccf::kv::Version &v, ccf::kv::Term &view, ccf::kv::EntryFlags &entry_flags, OrderedChanges &changes, MapCollection &new_maps, ccf::ClaimsDigest &claims_digest, std::optional< ccf::crypto::Sha256Hash > &commit_evidence_digest, bool ignore_strict_versions=false) override
Definition store.h:707
ccf::TxID current_txid() override
Definition store.h:874
void compact(Version v) override
Definition store.h:537
std::shared_ptr< ccf::kv::untyped::Map > get_map_internal(ccf::kv::Version v, const std::string &map_name)
Definition store.h:256
TxDiff create_tx_diff() override
Definition store.h:1277
bool flag_enabled_unsafe(StoreFlag f) const override
Definition store.h:1326
Definition tx.h:130
MapHook< Write > MapHook
Definition map.h:42
CommitHook< Write > CommitHook
Definition map.h:41
#define LOG_TRACE_FMT
Definition internal_logger.h:13
#define LOG_DEBUG_FMT
Definition internal_logger.h:14
#define LOG_FAIL_FMT
Definition internal_logger.h:16
Definition app_interface.h:19
std::shared_ptr< AbstractTxEncryptor > EncryptorPtr
Definition kv_types.h:506
uint64_t Term
Definition kv_types.h:43
std::vector< std::tuple< Version, std::shared_ptr< std::vector< uint8_t > >, bool, std::shared_ptr< ConsensusHookPtrs > > > BatchVector
Definition kv_types.h:206
CommitResult
Definition kv_types.h:209
@ FAIL_NO_REPLICATE
Definition kv_types.h:212
@ SUCCESS
Definition kv_types.h:210
@ PRIVATE
Definition kv_types.h:218
@ PUBLIC
Definition kv_types.h:217
uint64_t Version
Definition version.h:8
std::shared_ptr< AbstractSnapshotter > SnapshotterPtr
Definition kv_types.h:517
GenericDeserialiseWrapper< RawReader > KvStoreDeserialiser
Definition serialiser_declare.h:21
std::map< std::string, std::shared_ptr< AbstractMap > > MapCollection
Definition apply_changes.h:16
EntryFlags
Definition serialised_entry_format.h:15
std::map< std::string, MapChanges > OrderedChanges
Definition tx.h:41
std::shared_ptr< ccf::kv::Store > StorePtr
Definition store.h:1332
ApplyResult
Definition kv_types.h:302
@ FAIL
Definition kv_types.h:311
@ PASS
Definition kv_types.h:303
std::vector< ConsensusHookPtr > ConsensusHookPtrs
Definition hooks.h:22
std::mutex Mutex
Definition locking.h:12
view
Definition signatures.h:54
uint64_t View
Definition tx_id.h:23
Definition consensus_types.h:23
Definition map_serializers.h:11
Definition tx_id.h:44
SeqNo seqno
Definition tx_id.h:46
View view
Definition tx_id.h:45