CCF
Loading...
Searching...
No Matches
apply_changes.h
Go to the documentation of this file.
1// Copyright (c) Microsoft Corporation. All rights reserved.
2// Licensed under the Apache 2.0 License.
3#pragma once
4
5#include "ccf/tx.h"
7#include "kv_types.h"
8
9#include <functional>
10#include <map>
11
12namespace ccf::kv
13{
14 // Every collection of maps is held in a std::map keyed by map name, so that
15 // iterating it (and hence locking the contained maps) follows a stable order.
16 using MapCollection = std::map<std::string, std::shared_ptr<AbstractMap>>;
17
18 // apply_changes() atomically checks for conflicts then applies the writes in
19 // the given change sets to their underlying Maps. It calls the supplied
20 // VersionResolver at most once, iff the writes are applied, to retrieve a
21 // unique Version for the write set together with the max version which can have a conflict with the transaction.
22
// Callback type handed to apply_changes() to obtain the commit version. It is
// invoked with a flag saying whether the transaction contains new maps, and
// returns a (Version, VersionLastNewMap) tuple.
24 using VersionResolver = std::function<std::tuple<Version, VersionLastNewMap>(
25 bool tx_contains_new_map)>;
26
// Locks every map named in `changes`, validates the transaction, and - if it
// is valid and contains writes - commits it at a freshly resolved version
// before unlocking the maps again.
//
// Steps, in order:
//  1. Create one committer per entry in `changes`, then lock each map.
//  2. If `expected_rollback_count` is set and `changes` is non-empty, compare
//     it against the store of the first map via check_rollback_count().
//  3. If still ok and there are writes, call prepare() on each committer,
//     stopping at the first one that returns false.
//  4. For each entry in `new_maps`, fail if its store already holds a map of
//     that name (this loop runs regardless of the outcome of steps 2 and 3).
//  5. If ok and there are writes: call `version_resolver_fn` once, hand new
//     maps that have writes to their store, commit() every committer, and
//     push every non-null post_commit() result onto `hooks`.
//  6. Unlock every map in `changes`.
//
// Parameters:
//  changes                       - map name -> pending changes; iterated in
//                                  key order for locking and unlocking.
//  version_resolver_fn           - called at most once, with
//                                  `!new_maps.empty()`, to obtain the version.
//  new_maps                      - maps created by this transaction.
//  new_maps_conflict_version     - if set, used instead of
//                                  store->current_version() in step 4.
//  track_deletes_on_missing_keys - forwarded unchanged to commit().
//  expected_rollback_count       - optional; see step 2.
//
// Returns std::nullopt if any check in steps 2-4 failed. Otherwise returns the
// resolved version, or NoVersion when the change sets contained no writes.
//
// NOTE(review): lock() and unlock() are paired manually; if any call between
// them can throw, the maps would remain locked - confirm none of them do.
27 static inline std::optional<Version> apply_changes(
28 OrderedChanges& changes,
29 VersionResolver version_resolver_fn,
// NOTE(review): the body appends to `hooks`, but no such parameter is visible
// here; this listing skips a line at this point, so presumably a
// ConsensusHookPtrs& hooks parameter was lost - confirm against the header.
31 const MapCollection& new_maps,
32 const std::optional<Version>& new_maps_conflict_version,
33 bool track_deletes_on_missing_keys,
34 const std::optional<Version>& expected_rollback_count = std::nullopt)
35 {
36 // All maps with pending writes are locked, transactions are prepared
37 // and possibly committed, and then all maps with pending writes are
38 // unlocked. This is to prevent transactions from being committed in an
39 // interleaved fashion.
40 Version version = NoVersion;
41 bool has_writes = false;
42
// One committer per changed map, keyed by map name.
43 std::map<std::string, std::unique_ptr<AbstractCommitter>> views;
44 for (const auto& [map_name, mc] : changes)
45 {
46 views[map_name] = mc.map->create_committer(mc.changeset.get());
47 }
48
// Lock every changed map, noting whether any change set carries writes.
49 for (auto& [map_name, mc] : changes)
50 {
51 has_writes |= mc.changeset->has_writes();
52 mc.map->lock();
53 }
54
55 bool ok = true;
56
57 if (expected_rollback_count.has_value() && !changes.empty())
58 {
59 // expected_rollback_count is only set on signature transactions
60 // which always contain some writes, and on which all the maps
61 // point to the same store.
62 auto* store = changes.begin()->second.map->get_store();
63 if (store != nullptr)
64 {
65 // Note that this is done when holding the lock on at least some maps
66 // through the combination of the changes not being empty, and the
67 // acquisition of the map locks in the loop above. This guarantees atomicity
68 // with respect to rollbacks, which would acquire the map lock on all
69 // maps at once to truncate their roll. The net result is that the
70 // transaction becomes a noop if a rollback occurred between it being
71 // committed, and the side effects being applied.
72 ok = store->check_rollback_count(expected_rollback_count.value());
73 }
74 }
75
// Prepare every committer; the first failure aborts the transaction.
76 if (ok && has_writes)
77 {
78 for (auto& [view_name, view_ptr] : views)
79 {
80 if (!view_ptr->prepare())
81 {
82 ok = false;
83 break;
84 }
85 }
86 }
87
88 for (const auto& [map_name, map_ptr] : new_maps)
89 {
90 // Check that none of these pending maps have already been created.
91 // It is possible for non-conflicting other transactions to commit here
92 // and increment the version, so we may ask this question at different
93 // versions. This is fine - none can create maps (ie - change their
94 // conflict set with this operation) while we hold the store lock. Assume
95 // that the caller is currently holding store->lock()
96 auto* store = map_ptr->get_store();
97
98 // This is to avoid recursively locking version_lock by calling
99 // current_version() in the commit_reserved case.
100 ccf::kv::Version current_v = 0;
101 if (new_maps_conflict_version.has_value())
102 {
103 current_v = *new_maps_conflict_version;
104 }
105 else
106 {
107 current_v = store->current_version();
108 }
109
// A map of this name already exists in the store: treat it as a conflict.
110 if (store->get_map_unsafe(current_v, map_name) != nullptr)
111 {
112 ok = false;
113 break;
114 }
115 }
116
117 if (ok && has_writes)
118 {
119 // Get the version number to be used for this commit.
// version_last_new_map receives the second tuple element but is not read
// again within this function.
120 ccf::kv::Version version_last_new_map = 0;
121 std::tie(version, version_last_new_map) =
122 version_resolver_fn(!new_maps.empty());
123
124 // Transfer ownership of these new maps to their target stores, iff we
125 // have writes to them
126 for (const auto& [map_name, map_ptr] : new_maps)
127 {
128 const auto it = views.find(map_name);
129 if (it != views.end() && it->second->has_writes())
130 {
131 map_ptr->get_store()->add_dynamic_map(version, map_ptr);
132 }
133 }
134
// Commit every committer at the resolved version.
135 for (auto& [view_name, view_ptr] : views)
136 {
137 view_ptr->commit(version, track_deletes_on_missing_keys);
138 }
139
140 // Collect ConsensusHooks
141 for (auto& [view_name, view_ptr] : views)
142 {
143 auto hook_ptr = view_ptr->post_commit();
144 if (hook_ptr != nullptr)
145 {
146 hooks.push_back(std::move(hook_ptr));
147 }
148 }
149 }
150
// Release the map locks taken above, whether or not the commit happened.
151 for (auto& [map_name, mc] : changes)
152 {
153 mc.map->unlock();
154 }
155
156 if (!ok)
157 {
158 return std::nullopt;
159 }
160
// NoVersion if nothing was written, otherwise the resolved commit version.
161 return version;
162 }
163}
Definition app_interface.h:19
uint64_t Version
Definition version.h:8
std::function< std::tuple< Version, VersionLastNewMap >(bool tx_contains_new_map)> VersionResolver
Definition apply_changes.h:25
Version VersionLastNewMap
Definition apply_changes.h:23
std::map< std::string, std::shared_ptr< AbstractMap > > MapCollection
Definition apply_changes.h:16
std::map< std::string, MapChanges > OrderedChanges
Definition tx.h:41
std::vector< ConsensusHookPtr > ConsensusHookPtrs
Definition hooks.h:22