CCF
Loading...
Searching...
No Matches
apply_changes.h
Go to the documentation of this file.
1// Copyright (c) Microsoft Corporation. All rights reserved.
2// Licensed under the Apache 2.0 License.
3#pragma once
4
5#include "ccf/tx.h"
7#include "kv_types.h"
8
9#include <functional>
10#include <map>
11
12namespace ccf::kv
13{
14 // All collections of Map must be ordered so that we lock their contained
15 // maps in a stable order. The order here is by map name: std::map iterates
 // in ascending key order, and the key is the map's name.
16 using MapCollection = std::map<std::string, std::shared_ptr<AbstractMap>>;
17
19 {
 // NOTE(review): the declaration line for this type (listing line 18) is
 // absent from this extract; the members below name it
 // AbstractChangeContainer.

 // Defaulted virtual destructor, so implementations can be destroyed through
 // a pointer to this base type.
20 virtual ~AbstractChangeContainer() = default;

 // Pure virtual: implementations receive an OrderedChanges by rvalue
 // reference together with a Term. What they do with it is not visible here.
21 virtual void set_change_list(OrderedChanges&& change_list, Term term) = 0;
22 };
23
24 // Atomically checks for conflicts then applies the writes in the given change
25 // sets to their underlying Maps. Calls version_resolver_fn() at most once, iff
26 // the writes are applied, to retrieve a unique Version for the write set and
27 // return the max version which can have a conflict with the transaction.
28
// Callable used by apply_changes to obtain the version for a commit. It is
// passed a flag saying whether the transaction contains a new map, and returns
// a (Version, VersionLastNewMap) tuple.
30 using VersionResolver = std::function<std::tuple<Version, VersionLastNewMap>(
31 bool tx_contains_new_map)>;
32
// Locks every map named in `changes`, validates the transaction, applies its
// writes if validation succeeds, then unlocks the maps again.
//
// Steps, in order:
//  1. Create one committer per entry in `changes` and lock each map.
//  2. If `expected_rollback_count` is set and `changes` is non-empty, ask the
//     store of the first map to check the rollback count.
//  3. If still ok and there are writes, call prepare() on each committer,
//     stopping at the first one that fails.
//  4. Fail if any map in `new_maps` already exists in its store.
//  5. If still ok and there are writes: obtain the version from
//     `version_resolver_fn`, hand new maps that have writes to their stores,
//     commit every committer, and collect non-null post-commit hooks into
//     `hooks`.
//  6. Unlock every map in `changes`.
//
// Parameters:
//  changes                       - per-map change sets, keyed by map name.
//  version_resolver_fn           - called at most once, only in step 5, with
//                                  `!new_maps.empty()`.
//  new_maps                      - maps this transaction wants to create.
//  new_maps_conflict_version     - if set, used in step 4 instead of
//                                  store->current_version().
//  track_deletes_on_missing_keys - forwarded unchanged to each commit().
//  expected_rollback_count       - optional value checked in step 2.
//
// Returns std::nullopt if any check failed. Otherwise returns the version
// obtained in step 5, or NoVersion if there were no writes to apply.
33 static inline std::optional<Version> apply_changes(
34 OrderedChanges& changes,
35 VersionResolver version_resolver_fn,
 // NOTE(review): listing line 36 is absent from this extract. The body below
 // pushes into `hooks`, which no visible line declares - presumably the missing
 // parameter is `ConsensusHookPtrs& hooks`; confirm against the original header.
37 const MapCollection& new_maps,
38 const std::optional<Version>& new_maps_conflict_version,
39 bool track_deletes_on_missing_keys,
40 const std::optional<Version>& expected_rollback_count = std::nullopt)
41 {
42 // All maps with pending writes are locked, transactions are prepared
43 // and possibly committed, and then all maps with pending writes are
44 // unlocked. This is to prevent transactions from being committed in an
45 // interleaved fashion.
 // `version` stays NoVersion unless the commit branch below runs.
46 Version version = NoVersion;
47 bool has_writes = false;
48
 // One committer per changed map, keyed by map name.
49 std::map<std::string, std::unique_ptr<AbstractCommitter>> views;
50 for (const auto& [map_name, mc] : changes)
51 {
52 views[map_name] = mc.map->create_committer(mc.changeset.get());
53 }
54
 // Lock every map in `changes`, in the container's iteration order, and note
 // whether any change set has writes.
 // NOTE(review): lock() and the unlock() loop at the end are paired manually.
 // If anything in between throws, the unlock loop is not reached. Whether
 // these calls can throw is not visible here.
55 for (auto it = changes.begin(); it != changes.end(); ++it)
56 {
57 has_writes |= it->second.changeset->has_writes();
58 it->second.map->lock();
59 }
60
61 bool ok = true;
62
63 if (expected_rollback_count.has_value() && !changes.empty())
64 {
65 // expected_rollback_count is only set on signature transactions
66 // which always contain some writes, and on which all the maps
67 // point to the same store.
68 auto store = changes.begin()->second.map->get_store();
69 if (store != nullptr)
70 {
71 // Note that this is done when holding the lock on at least some maps
72 // through the combination of the changes not being empty, and the
73 // acquisition of the map locks in the loop above. This guarantees
74 // atomicity with respect to rollbacks, which would acquire the map lock
75 // on all maps at once to truncate their roll. The net result is that the
76 // transaction becomes a noop if a rollback occurred between it being
77 // committed, and the side effects being applied.
78 ok = store->check_rollback_count(expected_rollback_count.value());
79 }
80 }
81
 // Prepare every committer; the first failure marks the transaction as failed
 // and the remaining committers are not prepared.
82 if (ok && has_writes)
83 {
84 for (auto it = views.begin(); it != views.end(); ++it)
85 {
86 if (!it->second->prepare())
87 {
88 ok = false;
89 break;
90 }
91 }
92 }
93
 // This loop runs regardless of the current value of `ok`.
94 for (const auto& [map_name, map_ptr] : new_maps)
95 {
96 // Check that none of these pending maps have already been created.
97 // It is possible for non-conflicting other transactions to commit here
98 // and increment the version, so we may ask this question at different
99 // versions. This is fine - none can create maps (ie - change their
100 // conflict set with this operation) while we hold the store lock. Assume
101 // that the caller is currently holding store->lock()
102 auto store = map_ptr->get_store();
103
104 // This is to avoid recursively locking version_lock by calling
105 // current_version() in the commit_reserved case.
106 ccf::kv::Version current_v;
107 if (new_maps_conflict_version.has_value())
108 {
109 current_v = *new_maps_conflict_version;
110 }
111 else
112 {
113 current_v = store->current_version();
114 }
115
 // A non-null result means a map with this name already exists at current_v.
116 if (store->get_map_unsafe(current_v, map_name) != nullptr)
117 {
118 ok = false;
119 break;
120 }
121 }
122
123 if (ok && has_writes)
124 {
125 // Get the version number to be used for this commit.
 // NOTE(review): version_last_new_map is received from the resolver but is
 // not read again in this function.
126 ccf::kv::Version version_last_new_map;
127 std::tie(version, version_last_new_map) =
128 version_resolver_fn(!new_maps.empty());
129
130 // Transfer ownership of these new maps to their target stores, iff we
131 // have writes to them
132 for (const auto& [map_name, map_ptr] : new_maps)
133 {
134 const auto it = views.find(map_name);
135 if (it != views.end() && it->second->has_writes())
136 {
137 map_ptr->get_store()->add_dynamic_map(version, map_ptr);
138 }
139 }
140
 // Commit every committer at the resolved version.
141 for (auto it = views.begin(); it != views.end(); ++it)
142 {
143 it->second->commit(version, track_deletes_on_missing_keys);
144 }
145
146 // Collect ConsensusHooks
 // Only non-null hooks returned by post_commit() are appended.
147 for (auto it = views.begin(); it != views.end(); ++it)
148 {
149 auto hook_ptr = it->second->post_commit();
150 if (hook_ptr != nullptr)
151 {
152 hooks.push_back(std::move(hook_ptr));
153 }
154 }
155 }
156
 // Release every map locked at the top, whether or not the commit happened.
157 for (auto it = changes.begin(); it != changes.end(); ++it)
158 {
159 it->second.map->unlock();
160 }
161
162 if (!ok)
163 {
164 return std::nullopt;
165 }
166
167 return version;
168 }
169}
Definition app_interface.h:19
uint64_t Term
Definition kv_types.h:48
uint64_t Version
Definition version.h:8
std::function< std::tuple< Version, VersionLastNewMap >(bool tx_contains_new_map)> VersionResolver
Definition apply_changes.h:31
Version VersionLastNewMap
Definition apply_changes.h:29
std::map< std::string, std::shared_ptr< AbstractMap > > MapCollection
Definition apply_changes.h:16
std::map< std::string, MapChanges > OrderedChanges
Definition tx.h:41
std::vector< ConsensusHookPtr > ConsensusHookPtrs
Definition hooks.h:22
Definition apply_changes.h:19
virtual ~AbstractChangeContainer()=default
virtual void set_change_list(OrderedChanges &&change_list, Term term)=0