CCF
Loading...
Searching...
No Matches
committable_tx.h
Go to the documentation of this file.
1// Copyright (c) Microsoft Corporation. All rights reserved.
2// Licensed under the Apache 2.0 License.
3#pragma once
4
5#include "apply_changes.h"
6#include "ccf/ds/hex.h"
7#include "ccf/tx.h"
9#include "kv/tx_pimpl.h"
10#include "kv_serialiser.h"
11#include "kv_types.h"
12#include "node/rpc/claims.h"
13
14#include <list>
15
16namespace ccf::kv
17{
18 class CommittableTx : public Tx
19 {
20 public:
21 using TxFlags = uint8_t;
22
29
30 protected:
31 bool committed = false;
32 bool success = false;
33
34 Version version = NoVersion;
35
38
// Serialise the writes of this (already committed, successful) transaction
// into a byte vector.
//
// commit_evidence_digest  [out] receives the SHA-256 digest of the commit
//                         evidence string; only assigned when there are writes.
// commit_evidence         [out] receives the "ce:<view>.<version>:<nonce-hex>"
//                         string; only assigned when there are writes.
// claims_digest_          claims digest passed to the serialiser; must not be
//                         empty.
// include_reads           forwarded to each map's serialise_changes().
//
// Returns the raw serialised data, or an empty vector when no changeset has
// writes.
// Throws std::logic_error if the transaction is not committed, was aborted, or
// the claims digest is empty; throws KvSerialiserException if the store has no
// encryptor.
39 std::vector<uint8_t> serialise(
40 ccf::crypto::Sha256Hash& commit_evidence_digest,
41 std::string& commit_evidence,
42 const ccf::ClaimsDigest& claims_digest_,
43 bool include_reads = false)
44 {
// Precondition checks: only a committed, successful transaction with a
// non-empty claims digest can be serialised.
45 if (!committed)
46 {
47 throw std::logic_error("Transaction not yet committed");
48 }
49
50 if (!success)
51 {
52 throw std::logic_error("Transaction aborted");
53 }
54
55 if (claims_digest_.empty())
56 {
57 throw std::logic_error("Missing claims");
58 }
59
60 // If no transactions made changes, return a zero length vector.
// (The check is per entry of all_changes: does any changeset have writes?)
61 const bool any_changes =
62 std::any_of(all_changes.begin(), all_changes.end(), [](const auto& it) {
63 return it.second.changeset->has_writes();
64 });
65
// Early return: neither output parameter has been touched at this point.
66 if (!any_changes)
67 {
68 return {};
69 }
70
71 auto e = pimpl->store->get_encryptor();
72 if (e == nullptr)
73 {
74 throw KvSerialiserException("No encryptor set");
75 }
76
// Build the commit evidence string from the commit view, the version and the
// hex-encoded nonce obtained from the encryptor, then hash it.
77 auto commit_nonce = e->get_commit_nonce({pimpl->commit_view, version});
78 commit_evidence = fmt::format(
79 "ce:{}.{}:{}",
80 pimpl->commit_view,
81 version,
82 ccf::ds::to_hex(commit_nonce));
83 LOG_TRACE_FMT("Commit evidence: {}", commit_evidence);
84 ccf::crypto::Sha256Hash tx_commit_evidence_digest(commit_evidence);
85 commit_evidence_digest = tx_commit_evidence_digest;
87
// NOTE(review): original lines 86-91 are only partly visible in this listing.
// `entry_type`, used below, is presumably declared/selected here - confirm
// against the real header.
89 {
91 }
92
// NOTE(review): original line 97 (one constructor argument) is not visible in
// this listing.
93 KvStoreSerialiser replicated_serialiser(
94 e,
95 {pimpl->commit_view, version},
96 entry_type,
98 tx_commit_evidence_digest,
99 claims_digest_);
100
101 // Process in security domain order
// NOTE(review): the outer loop header that introduces `domain` (original line
// 102) is not visible in this listing.
103 {
104 for (const auto& it : all_changes)
105 {
106 const auto& map = it.second.map;
107 const auto& changeset = it.second.changeset;
// Only maps in the current domain that actually have writes are serialised.
108 if (map->get_security_domain() == domain && changeset->has_writes())
109 {
110 map->serialise_changes(
111 changeset.get(), replicated_serialiser, include_reads);
112 }
113 }
114 }
115
116 // Return serialised Tx.
117 return replicated_serialiser.get_raw_data();
118 }
119
120 public:
121 CommittableTx(AbstractStore* _store) : Tx(_store) {}
122
// Commit this transaction.
//
// NOTE(review): the line carrying the return type and function name (original
// line 136) is not visible in this listing, nor are several return statements
// inside the body; the notes below only describe what is visible.
//
// claims              claims digest, passed to serialise() and copied into the
//                     pending transaction handed to the store.
// version_resolver    optional callback returning a pair of versions; when it
//                     is nullptr the store's next_version() is used instead.
// write_set_observer  optional callback; when non-null it is invoked with the
//                     serialised write set and the commit evidence string
//                     before the store's commit() is called.
//
// Throws std::logic_error if the transaction was already committed. Any
// std::exception raised during serialisation or the store commit is rethrown
// as KvSerialiserException.
137 const ccf::ClaimsDigest& claims = ccf::empty_claims(),
138 std::function<std::tuple<Version, Version>(bool has_new_map)>
139 version_resolver = nullptr,
140 std::function<void(
141 const std::vector<uint8_t>& write_set,
142 const std::string& commit_evidence)> write_set_observer = nullptr)
143 {
144 if (committed)
145 {
146 throw std::logic_error("Transaction already committed");
147 }
148
// Nothing was touched: mark the transaction committed and successful.
// NOTE(review): the statement at original line 153 (presumably the early
// return) is not visible in this listing.
149 if (all_changes.empty())
150 {
151 committed = true;
152 success = true;
154 }
155
156 // If this transaction creates any maps, ensure that commit gets a
157 // consistent snapshot of the existing map set
158 const bool maps_created = !pimpl->created_maps.empty();
159 if (maps_created)
160 {
161 this->pimpl->store->lock_map_set();
162 }
163
// NOTE(review): original line 164 is not visible; `hooks`, used below, is
// presumably declared there.
165
166 std::optional<Version> new_maps_conflict_version = std::nullopt;
167
168 bool track_deletes_on_missing_keys = false;
// NOTE(review): the first argument of apply_changes (original line 170) is not
// visible in this listing.
// NOTE(review): lock_map_set()/unlock_map_set() are paired manually around this
// call; if apply_changes throws, the unlock below is skipped - confirm whether
// that can happen.
169 auto c = apply_changes(
171 version_resolver == nullptr ?
172 [&](bool has_new_map) {
173 return pimpl->store->next_version(has_new_map);
174 } :
175 version_resolver,
176 hooks,
177 pimpl->created_maps,
178 new_maps_conflict_version,
179 track_deletes_on_missing_keys);
180
181 if (maps_created)
182 {
183 this->pimpl->store->unlock_map_set();
184 }
185
// apply_changes yields a value only when the changes could be applied.
186 success = c.has_value();
187
188 if (!success)
189 {
190 // This Tx is now in a dead state. Caller should create a new Tx and try
191 // again.
192 LOG_TRACE_FMT("Could not commit transaction due to conflict");
// NOTE(review): the statement at original line 193 (presumably the conflict
// return) is not visible in this listing; c.value() below relies on it.
194 }
195
196 committed = true;
197 version = c.value();
198
// NOTE(review): the condition guarding this block (original line 199) is not
// visible. When taken, the store's chunker, if any, is told to end the chunk at
// this version.
200 {
201 auto chunker = pimpl->store->get_chunker();
202 if (chunker)
203 {
204 chunker->force_end_of_chunk(version);
205 }
206 }
207
// NOTE(review): the condition (original line 208) and the arguments of
// set_flag (original lines 211-212) are not visible in this listing.
209 {
210 pimpl->store->set_flag(
213 }
214
215 if (version == NoVersion)
216 {
217 // Read-only transaction
// NOTE(review): the statement at original line 218 is not visible.
219 }
220
221 // From here, we have received a unique commit version and made
222 // modifications to our local kv. If we fail in any way, we cannot
223 // recover.
224 try
225 {
226 ccf::crypto::Sha256Hash commit_evidence_digest;
227 std::string commit_evidence;
228 auto data = serialise(commit_evidence_digest, commit_evidence, claims);
229
// serialise() returns an empty vector when no changeset has writes.
// NOTE(review): the statement at original line 232 is not visible.
230 if (data.empty())
231 {
233 }
234
235 if (write_set_observer != nullptr)
236 {
237 write_set_observer(data, commit_evidence);
238 }
239
// `claims` is a const reference, so take a local copy that can be moved into
// the pending transaction.
240 auto claims_ = claims;
241
242 return pimpl->store->commit(
243 {pimpl->commit_view, version},
244 std::make_unique<MovePendingTx>(
245 std::move(data),
246 std::move(claims_),
247 std::move(commit_evidence_digest),
248 std::move(hooks)),
249 false);
250 }
251 catch (const std::exception& e)
252 {
// Reset the committed flag before propagating the failure.
253 committed = false;
254
// The exception text is only logged at debug level; the fail-level message
// carries no detail.
255 LOG_FAIL_FMT("Error during serialisation");
256 LOG_DEBUG_FMT("Error during serialisation: {}", e.what());
257
258 // Discard original exception type, throw as now fatal
259 // KvSerialiserException
260 throw KvSerialiserException(e.what());
261 }
262 }
263
271 [[nodiscard]] Version commit_version() const
272 {
273 if (!committed)
274 {
275 throw std::logic_error("Transaction not yet committed");
276 }
277
278 if (!success)
279 {
280 throw std::logic_error("Transaction aborted");
281 }
282
283 return version;
284 }
285
293 [[nodiscard]] Version commit_term() const
294 {
295 if (!committed)
296 {
297 throw std::logic_error("Transaction not yet committed");
298 }
299
300 if (!success)
301 {
302 throw std::logic_error("Transaction aborted");
303 }
304
305 return pimpl->commit_view;
306 }
307
308 [[nodiscard]] std::optional<TxID> get_txid() const
309 {
310 if (!committed)
311 {
312 throw std::logic_error("Transaction not yet committed");
313 }
314
315 if (!pimpl->read_txid.has_value())
316 {
317 // Transaction did not get a handle on any map.
318 return std::nullopt;
319 }
320
321 // A committed tx is read-only (i.e. no write to any map) if it was not
322 // assigned a version when it was committed
323 if (version == NoVersion)
324 {
325 // Read-only transaction
326 return pimpl->read_txid;
327 }
328
329 // Write transaction
330 return TxID(pimpl->commit_view, version);
331 }
332
333 void set_read_txid(const TxID& tx_id, Term commit_view_)
334 {
335 if (pimpl->read_txid.has_value())
336 {
337 throw std::logic_error("Read TxID already set");
338 }
339 pimpl->read_txid = tx_id;
340 pimpl->commit_view = commit_view_;
341 }
342
347
348 virtual void set_tx_flag(TxFlag flag)
349 {
350 flags |= static_cast<TxFlags>(flag);
351 }
352
353 virtual void unset_tx_flag(TxFlag flag)
354 {
355 flags &= ~static_cast<TxFlags>(flag);
356 }
357
358 [[nodiscard]] virtual bool tx_flag_enabled(TxFlag f) const
359 {
360 return (flags & static_cast<TxFlags>(f)) != 0;
361 }
362 };
363
364 // Used by frontend for reserved transactions. These are constructed with a
365 // pre-reserved Version, and _must succeed_ to fulfil this version. Otherwise
366 // they create a hole in the transaction order, and no future transactions can
367 // complete. These transactions are used internally by CCF for the sole
368 // purpose of recording node signatures and are safe in this particular
369 // situation because they never perform any reads and therefore can
370 // never conflict.
372 {
// NOTE(review): the class header (original line 371) is not visible in this
// listing; the constructor below delegates to CommittableTx, so this class
// presumably derives from it - confirm against the real header.
373 private:
// Passed as the last argument to apply_changes() in commit_reserved().
374 Version rollback_count = 0;
375
376 public:
// Constructor. NOTE(review): the line carrying the constructor name (original
// line 377) is not visible in this listing.
//
// _store           store pointer, forwarded to CommittableTx.
// read_term        term used for the read TxID.
// reserved_tx_id   the pre-reserved TxID: its seqno becomes this transaction's
//                  version and its view becomes the commit view.
// rollback_count_  stored in rollback_count.
378 AbstractStore* _store,
379 Term read_term,
380 const TxID& reserved_tx_id,
381 Version rollback_count_) :
382 CommittableTx(_store),
383 rollback_count(rollback_count_)
384 {
385 version = reserved_tx_id.seqno;
386 pimpl->commit_view = reserved_tx_id.view;
// The read TxID is placed at the sequence number immediately before the
// reserved one.
387 pimpl->read_txid = TxID(read_term, reserved_tx_id.seqno - 1);
388 }
389
390 // Used by frontend to commit reserved transactions
// NOTE(review): the line carrying the return type and function name (original
// line 391) is not visible in this listing.
// Throws std::logic_error if the transaction was already committed, has no
// changes, or apply_changes() yields no value.
392 {
393 if (committed)
394 {
395 throw std::logic_error("Transaction already committed");
396 }
397
398 if (all_changes.empty())
399 {
400 throw std::logic_error("Reserved transaction cannot be empty");
401 }
402
403 std::vector<ConsensusHookPtr> hooks;
404 bool track_deletes_on_missing_keys = false;
// NOTE(review): the first argument of apply_changes (original line 406) is not
// visible in this listing.
// The version resolver always answers with the pre-reserved version and the
// version before it, ignoring its bool argument.
405 auto c = apply_changes(
407 [this](bool) { return std::make_tuple(version, version - 1); },
408 hooks,
409 pimpl->created_maps,
410 version,
411 track_deletes_on_missing_keys,
412 rollback_count);
413 success = c.has_value();
414
415 if (!success)
416 {
417 throw std::logic_error("Failed to commit reserved transaction");
418 }
419
420 ccf::crypto::Sha256Hash commit_evidence_digest;
421 std::string commit_evidence;
422
423 // This is a signature and, if the ledger chunking or snapshot flags are
424 // enabled, we want the host to create a chunk when it sees this entry.
425 // version_lock held by Store::commit
426 if (pimpl->store->should_create_ledger_chunk_unsafe(version))
427 {
// NOTE(review): original lines 428-429 are not visible; the format string and
// arguments below presumably belong to a logging call - confirm.
430 "Ending ledger chunk with signature at {}.{}",
431 pimpl->commit_view,
432 version);
433
434 auto chunker = pimpl->store->get_chunker();
435 if (chunker)
436 {
437 chunker->produced_chunk_at(version);
438 }
439 }
440
// serialise() throws unless `committed` is set, so the flag is raised first.
441 committed = true;
442 auto claims = ccf::empty_claims();
443 auto data = serialise(commit_evidence_digest, commit_evidence, claims);
444
// NOTE(review): the first element of the returned aggregate (original line 446)
// is not visible in this listing.
445 return {
447 std::move(data),
448 ccf::empty_claims(),
449 std::move(commit_evidence_digest),
450 std::move(hooks)};
451 }
452 };
453}
Definition claims_digest.h:10
bool empty() const
Definition claims_digest.h:33
Definition sha256_hash.h:16
Definition kv_types.h:610
OrderedChanges all_changes
Definition tx.h:51
std::optional< ccf::crypto::Sha256Hash > root_at_read_version
Definition tx.h:53
std::unique_ptr< PrivateImpl > pimpl
Definition tx.h:49
Definition committable_tx.h:19
SerialisedEntryFlags entry_flags
Definition committable_tx.h:37
std::vector< uint8_t > serialise(ccf::crypto::Sha256Hash &commit_evidence_digest, std::string &commit_evidence, const ccf::ClaimsDigest &claims_digest_, bool include_reads=false)
Definition committable_tx.h:39
bool success
Definition committable_tx.h:32
bool committed
Definition committable_tx.h:31
Version version
Definition committable_tx.h:34
virtual void unset_tx_flag(TxFlag flag)
Definition committable_tx.h:353
Version commit_term() const
Definition committable_tx.h:293
void set_read_txid(const TxID &tx_id, Term commit_view_)
Definition committable_tx.h:333
TxFlag
Definition committable_tx.h:24
virtual bool tx_flag_enabled(TxFlag f) const
Definition committable_tx.h:358
CommitResult commit(const ccf::ClaimsDigest &claims=ccf::empty_claims(), std::function< std::tuple< Version, Version >(bool has_new_map)> version_resolver=nullptr, std::function< void(const std::vector< uint8_t > &write_set, const std::string &commit_evidence)> write_set_observer=nullptr)
Definition committable_tx.h:136
Version commit_version() const
Definition committable_tx.h:271
uint8_t TxFlags
Definition committable_tx.h:21
void set_root_at_read_version(const ccf::crypto::Sha256Hash &r)
Definition committable_tx.h:343
CommittableTx(AbstractStore *_store)
Definition committable_tx.h:121
TxFlags flags
Definition committable_tx.h:36
std::optional< TxID > get_txid() const
Definition committable_tx.h:308
virtual void set_tx_flag(TxFlag flag)
Definition committable_tx.h:348
Definition generic_serialise_wrapper.h:20
Definition kv_types.h:315
Definition committable_tx.h:372
ReservedTx(AbstractStore *_store, Term read_term, const TxID &reserved_tx_id, Version rollback_count_)
Definition committable_tx.h:377
PendingTxInfo commit_reserved()
Definition committable_tx.h:391
Definition tx.h:200
#define LOG_TRACE_FMT
Definition internal_logger.h:13
#define LOG_DEBUG_FMT
Definition internal_logger.h:14
#define LOG_FAIL_FMT
Definition internal_logger.h:16
Definition app_interface.h:19
uint64_t Term
Definition kv_types.h:43
@ WriteSetWithCommitEvidenceAndClaims
CommitResult
Definition kv_types.h:209
@ SUCCESS
Definition kv_types.h:210
@ FAIL_CONFLICT
Definition kv_types.h:211
@ PRIVATE
Definition kv_types.h:218
@ PUBLIC
Definition kv_types.h:217
uint64_t Version
Definition version.h:8
uint8_t SerialisedEntryFlags
Definition serialised_entry_format.h:12
@ FORCE_LEDGER_CHUNK_BEFORE
Definition serialised_entry_format.h:17
@ FORCE_LEDGER_CHUNK_AFTER
Definition serialised_entry_format.h:16
std::vector< ConsensusHookPtr > ConsensusHookPtrs
Definition hooks.h:22
Definition map_serializers.h:11
Definition tx_id.h:44
SeqNo seqno
Definition tx_id.h:46
View view
Definition tx_id.h:45
Definition kv_types.h:417