CCF
Loading...
Searching...
No Matches
committable_tx.h
Go to the documentation of this file.
1// Copyright (c) Microsoft Corporation. All rights reserved.
2// Licensed under the Apache 2.0 License.
3#pragma once
4
5#include "apply_changes.h"
6#include "ccf/ds/hex.h"
7#include "ccf/tx.h"
8#include "kv/tx_pimpl.h"
9#include "kv_serialiser.h"
10#include "kv_types.h"
11#include "node/rpc/claims.h"
12
13#include <list>
14
15namespace ccf::kv
16{
18 {
19 public:
20 using TxFlags = uint8_t;
21
28
29 protected:
30 bool committed = false;
31 bool success = false;
32
33 Version version = NoVersion;
34
36
39
40 std::vector<uint8_t> serialise(
41 ccf::crypto::Sha256Hash& commit_evidence_digest,
42 std::string& commit_evidence,
43 const ccf::ClaimsDigest& claims_digest_,
44 bool include_reads = false)
45 {
46 if (!committed)
47 throw std::logic_error("Transaction not yet committed");
48
49 if (!success)
50 throw std::logic_error("Transaction aborted");
51
52 if (claims_digest_.empty())
53 throw std::logic_error("Missing claims");
54
55 // If no transactions made changes, return a zero length vector.
56 const bool any_changes =
57 std::any_of(all_changes.begin(), all_changes.end(), [](const auto& it) {
58 return it.second.changeset->has_writes();
59 });
60
61 if (!any_changes)
62 {
63 return {};
64 }
65
66 auto e = pimpl->store->get_encryptor();
67 if (e == nullptr)
68 {
69 throw KvSerialiserException("No encryptor set");
70 }
71
72 auto commit_nonce = e->get_commit_nonce({pimpl->commit_view, version});
73 commit_evidence = fmt::format(
74 "ce:{}.{}:{}",
75 pimpl->commit_view,
76 version,
77 ccf::ds::to_hex(commit_nonce));
78 LOG_TRACE_FMT("Commit evidence: {}", commit_evidence);
79 ccf::crypto::Sha256Hash tx_commit_evidence_digest(commit_evidence);
80 commit_evidence_digest = tx_commit_evidence_digest;
82
84 {
86 }
87
88 KvStoreSerialiser replicated_serialiser(
89 e,
90 {pimpl->commit_view, version},
91 entry_type,
93 tx_commit_evidence_digest,
94 claims_digest_);
95
96 // Process in security domain order
98 {
99 for (const auto& it : all_changes)
100 {
101 const auto& map = it.second.map;
102 const auto& changeset = it.second.changeset;
103 if (map->get_security_domain() == domain && changeset->has_writes())
104 {
105 map->serialise_changes(
106 changeset.get(), replicated_serialiser, include_reads);
107 }
108 }
109 }
110
111 // Return serialised Tx.
112 return replicated_serialiser.get_raw_data();
113 }
114
115 public:
116 CommittableTx(AbstractStore* _store) : Tx(_store) {}
117
132 const ccf::ClaimsDigest& claims = ccf::empty_claims(),
133 std::function<std::tuple<Version, Version>(bool has_new_map)>
134 version_resolver = nullptr,
135 std::function<void(
136 const std::vector<uint8_t>& write_set,
137 const std::string& commit_evidence)> write_set_observer = nullptr)
138 {
139 if (committed)
140 throw std::logic_error("Transaction already committed");
141
142 if (all_changes.empty())
143 {
144 committed = true;
145 success = true;
147 }
148
149 // If this transaction creates any maps, ensure that commit gets a
150 // consistent snapshot of the existing map set
151 const bool maps_created = !pimpl->created_maps.empty();
152 if (maps_created)
153 {
154 this->pimpl->store->lock_map_set();
155 }
156
158
159 std::optional<Version> new_maps_conflict_version = std::nullopt;
160
161 bool track_deletes_on_missing_keys = false;
162 auto c = apply_changes(
164 version_resolver == nullptr ?
165 [&](bool has_new_map) {
166 return pimpl->store->next_version(has_new_map);
167 } :
168 version_resolver,
169 hooks,
170 pimpl->created_maps,
171 new_maps_conflict_version,
172 track_deletes_on_missing_keys);
173
174 if (maps_created)
175 {
176 this->pimpl->store->unlock_map_set();
177 }
178
179 success = c.has_value();
180
181 if (!success)
182 {
183 // This Tx is now in a dead state. Caller should create a new Tx and try
184 // again.
185 LOG_TRACE_FMT("Could not commit transaction due to conflict");
187 }
188 else
189 {
190 committed = true;
191 version = c.value();
192
194 {
195 auto chunker = pimpl->store->get_chunker();
196 if (chunker)
197 {
198 chunker->force_end_of_chunk(version);
199 }
200 }
201
203 {
204 pimpl->store->set_flag(
207 }
208
209 if (version == NoVersion)
210 {
211 // Read-only transaction
213 }
214
215 // From here, we have received a unique commit version and made
216 // modifications to our local kv. If we fail in any way, we cannot
217 // recover.
218 try
219 {
220 ccf::crypto::Sha256Hash commit_evidence_digest;
221 std::string commit_evidence;
222 auto data =
223 serialise(commit_evidence_digest, commit_evidence, claims);
224
225 if (data.empty())
226 {
228 }
229
230 if (write_set_observer != nullptr)
231 {
232 write_set_observer(data, commit_evidence);
233 }
234
235 auto claims_ = claims;
236
237 return pimpl->store->commit(
238 {pimpl->commit_view, version},
239 std::make_unique<MovePendingTx>(
240 std::move(data),
241 std::move(claims_),
242 std::move(commit_evidence_digest),
243 std::move(hooks)),
244 false);
245 }
246 catch (const std::exception& e)
247 {
248 committed = false;
249
250 LOG_FAIL_FMT("Error during serialisation");
251 LOG_DEBUG_FMT("Error during serialisation: {}", e.what());
252
253 // Discard original exception type, throw as now fatal
254 // KvSerialiserException
255 throw KvSerialiserException(e.what());
256 }
257 }
258 }
259
268 {
269 if (!committed)
270 throw std::logic_error("Transaction not yet committed");
271
272 if (!success)
273 throw std::logic_error("Transaction aborted");
274
275 return version;
276 }
277
286 {
287 if (!committed)
288 throw std::logic_error("Transaction not yet committed");
289
290 if (!success)
291 throw std::logic_error("Transaction aborted");
292
293 return pimpl->commit_view;
294 }
295
301 {
302 return version;
303 }
304
305 std::optional<TxID> get_txid()
306 {
307 if (!committed)
308 {
309 throw std::logic_error("Transaction not yet committed");
310 }
311
312 if (!pimpl->read_txid.has_value())
313 {
314 // Transaction did not get a handle on any map.
315 return std::nullopt;
316 }
317
318 // A committed tx is read-only (i.e. no write to any map) if it was not
319 // assigned a version when it was committed
320 if (version == NoVersion)
321 {
322 // Read-only transaction
323 return pimpl->read_txid.value();
324 }
325 else
326 {
327 // Write transaction
328 return TxID(pimpl->commit_view, version);
329 }
330 }
331
332 void set_change_list(OrderedChanges&& change_list_, Term term_) override
333 {
334 // if all_changes is not empty then any coinciding keys will not be
335 // overwritten
336 all_changes.merge(change_list_);
337 pimpl->commit_view = term_;
338 }
339
340 void set_view(ccf::View view_)
341 {
342 pimpl->commit_view = view_;
343 }
344
346 {
347 req_id = req_id_;
348 }
349
351 {
352 return req_id;
353 }
354
355 void set_read_txid(const TxID& tx_id, Term commit_view_)
356 {
357 if (pimpl->read_txid.has_value())
358 {
359 throw std::logic_error("Read TxID already set");
360 }
361 pimpl->read_txid = tx_id;
362 pimpl->commit_view = commit_view_;
363 }
364
369
370 virtual void set_tx_flag(TxFlag flag)
371 {
372 flags |= static_cast<TxFlags>(flag);
373 }
374
375 virtual void unset_tx_flag(TxFlag flag)
376 {
377 flags &= ~static_cast<TxFlags>(flag);
378 }
379
380 virtual bool tx_flag_enabled(TxFlag f) const
381 {
382 return (flags & static_cast<TxFlags>(f)) != 0;
383 }
384 };
385
386 // Used by frontend for reserved transactions. These are constructed with a
387 // pre-reserved Version, and _must succeed_ to fulfil this version. Otherwise
388 // they create a hole in the transaction order, and no future transactions can
389 // complete. These transactions are used internally by CCF for the sole
390 // purpose of recording node signatures and are safe in this particular
391 // situation because they never perform any reads and therefore can
392 // never conflict.
394 {
395 private:
396 Version rollback_count = 0;
397
398 public:
400 AbstractStore* _store,
401 Term read_term,
402 const TxID& reserved_tx_id,
403 Version rollback_count_) :
404 CommittableTx(_store)
405 {
406 version = reserved_tx_id.version;
407 pimpl->commit_view = reserved_tx_id.term;
408 pimpl->read_txid = TxID(read_term, reserved_tx_id.version - 1);
409 rollback_count = rollback_count_;
410 }
411
412 // Used by frontend to commit reserved transactions
414 {
415 if (committed)
416 throw std::logic_error("Transaction already committed");
417
418 if (all_changes.empty())
419 throw std::logic_error("Reserved transaction cannot be empty");
420
421 std::vector<ConsensusHookPtr> hooks;
422 bool track_deletes_on_missing_keys = false;
423 auto c = apply_changes(
425 [this](bool) { return std::make_tuple(version, version - 1); },
426 hooks,
427 pimpl->created_maps,
428 version,
429 track_deletes_on_missing_keys,
430 rollback_count);
431 success = c.has_value();
432
433 if (!success)
434 throw std::logic_error("Failed to commit reserved transaction");
435
436 ccf::crypto::Sha256Hash commit_evidence_digest;
437 std::string commit_evidence;
438
439 // This is a signature and, if the ledger chunking or snapshot flags are
440 // enabled, we want the host to create a chunk when it sees this entry.
441 // version_lock held by Store::commit
442 if (pimpl->store->should_create_ledger_chunk_unsafe(version))
443 {
446 "Ending ledger chunk with signature at {}.{}",
447 pimpl->commit_view,
448 version);
449
450 auto chunker = pimpl->store->get_chunker();
451 if (chunker)
452 {
453 chunker->produced_chunk_at(version);
454 }
455 }
456
457 committed = true;
458 auto claims = ccf::empty_claims();
459 auto data = serialise(commit_evidence_digest, commit_evidence, claims);
460
461 return {
463 std::move(data),
464 ccf::empty_claims(),
465 std::move(commit_evidence_digest),
466 std::move(hooks)};
467 }
468 };
469}
Definition claims_digest.h:10
bool empty() const
Definition claims_digest.h:33
Definition sha256_hash.h:16
Definition kv_types.h:680
OrderedChanges all_changes
Definition tx.h:51
std::optional< ccf::crypto::Sha256Hash > root_at_read_version
Definition tx.h:53
std::unique_ptr< PrivateImpl > pimpl
Definition tx.h:49
Definition committable_tx.h:18
SerialisedEntryFlags entry_flags
Definition committable_tx.h:38
Version get_version()
Definition committable_tx.h:300
std::vector< uint8_t > serialise(ccf::crypto::Sha256Hash &commit_evidence_digest, std::string &commit_evidence, const ccf::ClaimsDigest &claims_digest_, bool include_reads=false)
Definition committable_tx.h:40
bool success
Definition committable_tx.h:31
std::optional< TxID > get_txid()
Definition committable_tx.h:305
bool committed
Definition committable_tx.h:30
Version version
Definition committable_tx.h:33
virtual void unset_tx_flag(TxFlag flag)
Definition committable_tx.h:375
void set_read_txid(const TxID &tx_id, Term commit_view_)
Definition committable_tx.h:355
TxFlag
Definition committable_tx.h:23
virtual bool tx_flag_enabled(TxFlag f) const
Definition committable_tx.h:380
const ccf::kv::TxHistory::RequestID & get_req_id()
Definition committable_tx.h:350
CommitResult commit(const ccf::ClaimsDigest &claims=ccf::empty_claims(), std::function< std::tuple< Version, Version >(bool has_new_map)> version_resolver=nullptr, std::function< void(const std::vector< uint8_t > &write_set, const std::string &commit_evidence)> write_set_observer=nullptr)
Definition committable_tx.h:131
uint8_t TxFlags
Definition committable_tx.h:20
void set_root_at_read_version(const ccf::crypto::Sha256Hash &r)
Definition committable_tx.h:365
void set_view(ccf::View view_)
Definition committable_tx.h:340
CommittableTx(AbstractStore *_store)
Definition committable_tx.h:116
TxFlags flags
Definition committable_tx.h:37
ccf::kv::TxHistory::RequestID req_id
Definition committable_tx.h:35
Version commit_version()
Definition committable_tx.h:267
void set_change_list(OrderedChanges &&change_list_, Term term_) override
Definition committable_tx.h:332
Version commit_term()
Definition committable_tx.h:285
void set_req_id(const ccf::kv::TxHistory::RequestID &req_id_)
Definition committable_tx.h:345
virtual void set_tx_flag(TxFlag flag)
Definition committable_tx.h:370
Definition generic_serialise_wrapper.h:20
Definition kv_types.h:354
Definition committable_tx.h:394
ReservedTx(AbstractStore *_store, Term read_term, const TxID &reserved_tx_id, Version rollback_count_)
Definition committable_tx.h:399
PendingTxInfo commit_reserved()
Definition committable_tx.h:413
std::tuple< size_t, size_t > RequestID
Definition kv_types.h:372
Definition tx.h:201
#define LOG_TRACE_FMT
Definition logger.h:356
#define LOG_DEBUG_FMT
Definition logger.h:357
#define LOG_FAIL_FMT
Definition logger.h:363
Definition app_interface.h:19
uint64_t Term
Definition kv_types.h:48
@ PRIVATE
Definition kv_types.h:257
@ PUBLIC
Definition kv_types.h:256
@ WriteSetWithCommitEvidenceAndClaims
uint64_t Version
Definition version.h:8
CommitResult
Definition kv_types.h:248
@ SUCCESS
Definition kv_types.h:249
@ FAIL_CONFLICT
Definition kv_types.h:250
uint8_t SerialisedEntryFlags
Definition serialised_entry_format.h:12
@ FORCE_LEDGER_CHUNK_BEFORE
Definition serialised_entry_format.h:17
@ FORCE_LEDGER_CHUNK_AFTER
Definition serialised_entry_format.h:16
std::map< std::string, MapChanges > OrderedChanges
Definition tx.h:41
std::vector< ConsensusHookPtr > ConsensusHookPtrs
Definition hooks.h:22
uint64_t View
Definition tx_id.h:23
Definition map_serializers.h:11
Definition apply_changes.h:19
Definition kv_types.h:487
Definition kv_types.h:52
Version version
Definition kv_types.h:54
Term term
Definition kv_types.h:53