CCF
Loading...
Searching...
No Matches
historical_queries.h
Go to the documentation of this file.
1// Copyright (c) Microsoft Corporation. All rights reserved.
2// Licensed under the Apache 2.0 License.
3#pragma once
4
6#include "ccf/pal/locking.h"
8#include "ds/ccf_assert.h"
9#include "kv/store.h"
10#include "node/encryptor.h"
11#include "node/history.h"
12#include "node/ledger_secrets.h"
16
17#include <list>
18#include <map>
19#include <memory>
20#include <set>
21
22#ifdef ENABLE_HISTORICAL_VERBOSE_LOGGING
23# define HISTORICAL_LOG(...) LOG_INFO_FMT(__VA_ARGS__)
24#else
25# define HISTORICAL_LOG(...)
26#endif
27
28namespace ccf::historical
29{
30 enum class RequestNamespace : uint8_t
31 {
33 System,
34 };
35
36 using CompoundHandle = std::pair<RequestNamespace, RequestHandle>;
37
38};
39
40FMT_BEGIN_NAMESPACE
41template <>
43{
44 template <typename ParseContext>
45 constexpr auto parse(ParseContext& ctx)
46 {
47 return ctx.begin();
48 }
49
50 template <typename FormatContext>
51 auto format(
52 const ccf::historical::CompoundHandle& p, FormatContext& ctx) const
53 {
54 return format_to(
55 ctx.out(),
56 "[{}|{}]",
57 std::get<0>(p) == ccf::historical::RequestNamespace::Application ? "APP" :
58 "SYS",
59 std::get<1>(p));
60 }
61};
62FMT_END_NAMESPACE
63
64namespace ccf::historical
65{
66 static constexpr auto slow_fetch_threshold = std::chrono::milliseconds(1000);
67 static constexpr size_t soft_to_raw_ratio{5};
68
69 static std::optional<ccf::PrimarySignature> get_signature(
70 const ccf::kv::StorePtr& sig_store)
71 {
72 auto tx = sig_store->create_read_only_tx();
73 auto signatures = tx.ro<ccf::Signatures>(ccf::Tables::SIGNATURES);
74 return signatures->get();
75 }
76
77 static std::optional<ccf::CoseSignature> get_cose_signature(
78 const ccf::kv::StorePtr& sig_store)
79 {
80 auto tx = sig_store->create_read_only_tx();
81 auto signatures = tx.ro<ccf::CoseSignatures>(ccf::Tables::COSE_SIGNATURES);
82 return signatures->get();
83 }
84
85 static std::optional<std::vector<uint8_t>> get_tree(
86 const ccf::kv::StorePtr& sig_store)
87 {
88 auto tx = sig_store->create_read_only_tx();
89 auto tree =
90 tx.ro<ccf::SerialisedMerkleTree>(ccf::Tables::SERIALISED_MERKLE_TREE);
91 return tree->get();
92 }
93
95 {
96 protected:
98 std::shared_ptr<ccf::LedgerSecrets> source_ledger_secrets;
100
101 std::shared_ptr<ccf::LedgerSecrets> historical_ledger_secrets;
102 std::shared_ptr<ccf::NodeEncryptor> historical_encryptor;
103
104 // whether to keep all the writes so that we can build a diff later
106
107 enum class StoreStage
108 {
109 Fetching,
110 Trusted,
111 };
112
113 using LedgerEntry = std::vector<uint8_t>;
114
116 {
117 if (earliest_secret_.secret == nullptr)
118 {
119 // Haven't worked out earliest known secret yet - work it out now
120 if (historical_ledger_secrets->is_empty())
121 {
123 !source_ledger_secrets->is_empty(),
124 "Source ledger secrets are empty");
126 }
127 else
128 {
131 historical_ledger_secrets->get_latest(tx).first <
132 source_ledger_secrets->get_first().first,
133 "Historical ledger secrets are not older than main ledger secrets");
134
136 }
137 }
138 }
139
141 {
142 std::chrono::milliseconds time_until_fetch = {};
147 bool is_signature = false;
151
153 {
154 if (store != nullptr)
155 {
156 auto e = store->get_encryptor();
157 return e->get_commit_nonce(
159 }
160 else
161 {
162 throw std::logic_error("Store pointer not set");
163 }
164 }
165
166 std::optional<std::string> get_commit_evidence()
167 {
169 {
170 return fmt::format(
171 "ce:{}.{}:{}",
174 ds::to_hex(get_commit_nonce()));
175 }
176 else
177 {
178 return std::nullopt;
179 }
180 }
181 };
182 using StoreDetailsPtr = std::shared_ptr<StoreDetails>;
183 using RequestedStores = std::map<ccf::SeqNo, StoreDetailsPtr>;
184
185 using WeakStoreDetailsPtr = std::weak_ptr<StoreDetails>;
186 using AllRequestedStores = std::map<ccf::SeqNo, WeakStoreDetailsPtr>;
187
189 {
192
193 VersionedSecret() = default;
194 // NB: Can't use VersionedLedgerSecret directly because the first element
195 // is const, so the whole thing is non-copyable
197 valid_from(vls.first),
198 secret(vls.second)
199 {}
200 };
201
204
205 struct Request
206 {
208
210 std::chrono::milliseconds time_to_expiry;
211
212 bool include_receipts = false;
213
214 // Entries from outside the requested range (such as the next signature)
215 // may be needed to produce receipts. They are stored here, distinct from
216 // user-requested stores.
218
219 // Only set when recovering ledger secrets
220 std::optional<ccf::SeqNo> awaiting_ledger_secrets = std::nullopt;
221
222 Request(AllRequestedStores& all_stores_) : all_stores(all_stores_) {}
223
225 {
226 auto it = all_stores.find(seqno);
227 if (it != all_stores.end())
228 {
229 return it->second.lock();
230 }
231
232 return nullptr;
233 }
234
236 {
237 if (!my_stores.empty())
238 {
239 return my_stores.begin()->first;
240 }
241
242 return {};
243 }
244
245 std::pair<std::vector<SeqNo>, std::vector<SeqNo>> adjust_ranges(
246 const SeqNoCollection& new_seqnos,
247 bool should_include_receipts,
248 SeqNo earliest_ledger_secret_seqno)
249 {
250 std::vector<SeqNo> removed{}, added{};
251
252 bool any_diff = false;
253
254 // If a seqno is earlier than the earliest known ledger secret, we will
255 // store that it was requested with a nullptr in `my_stores`, but not
256 // add it to `all_stores` to begin fetching until a sufficiently early
257 // secret has been retrieved. To avoid awkwardly sharding requests (and
258 // delaying the secret-fetch with a large request for a later range), we
259 // extend that to say that if _any_ seqno is too early, then _all_
260 // subsequent seqnos will be pending. This bool tracks that behaviour.
261 bool any_too_early = false;
262
263 {
264 auto prev_it = my_stores.begin();
265 auto new_it = new_seqnos.begin();
266 while (new_it != new_seqnos.end())
267 {
268 if (prev_it != my_stores.end() && *new_it == prev_it->first)
269 {
270 // Asking for a seqno which was also requested previously - do
271 // nothing and advance to compare next entries
272 ++new_it;
273 ++prev_it;
274 }
275 else if (prev_it != my_stores.end() && *new_it > prev_it->first)
276 {
277 // No longer looking for a seqno which was previously requested.
278 // Remove it from my_stores
279 removed.push_back(prev_it->first);
280 prev_it = my_stores.erase(prev_it);
281 any_diff |= true;
282 }
283 else
284 {
285 // *new_it < prev_it->first
286 // Asking for a seqno which was not previously being fetched =>
287 // check if another request was fetching it, else create new
288 // details to track it
289 if (*new_it < earliest_ledger_secret_seqno || any_too_early)
290 {
291 // If this is too early for known secrets, just record that it
292 // was requested but don't add it to all_stores yet
293 added.push_back(*new_it);
294 prev_it = my_stores.insert_or_assign(prev_it, *new_it, nullptr);
295 any_too_early = true;
296 }
297 else
298 {
299 auto all_it = all_stores.find(*new_it);
300 auto details =
301 all_it == all_stores.end() ? nullptr : all_it->second.lock();
302 if (details == nullptr)
303 {
304 HISTORICAL_LOG("{} is newly requested", *new_it);
305 details = std::make_shared<StoreDetails>();
306 all_stores.insert_or_assign(all_it, *new_it, details);
307 }
308 added.push_back(*new_it);
309 prev_it = my_stores.insert_or_assign(prev_it, *new_it, details);
310 }
311 any_diff |= true;
312 }
313 }
314
315 if (prev_it != my_stores.end())
316 {
317 // If we have a suffix of seqnos previously requested, now
318 // unrequested, purge them
319 my_stores.erase(prev_it, my_stores.end());
320 any_diff |= true;
321 }
322 }
323
324 if (!any_diff && (should_include_receipts == include_receipts))
325 {
326 HISTORICAL_LOG("Identical to previous request");
327 return {removed, added};
328 }
329
330 include_receipts = should_include_receipts;
331
333 "Clearing {} supporting signatures", supporting_signatures.size());
334 supporting_signatures.clear();
335 if (should_include_receipts)
336 {
337 // If requesting signatures, populate receipts for each entry that we
338 // already have. Normally this would be done when each entry was
339 // received, but in the case that we have the entries already and only
340 // request signatures now, we delay that work to now.
341
342 for (auto seqno : new_seqnos)
343 {
345 }
346 }
347 return {removed, added};
348 }
349
351 {
353 "Looking at {}, and populating receipts from it", new_seqno);
354 auto new_details = get_store_details(new_seqno);
355 if (new_details != nullptr && new_details->store != nullptr)
356 {
357 if (new_details->is_signature)
358 {
359 HISTORICAL_LOG("{} is a signature", new_seqno);
360
361 fill_receipts_from_signature(new_details);
362 }
363 else
364 {
365 // This isn't a signature. To find the signature for this, we look
366 // through every subsequent transaction, until we find either a gap
367 // (a seqno that hasn't been fetched yet), or a signature. If it is
368 // a signature, and we've found a contiguous range of seqnos to it,
369 // then it must be a signature over this seqno. Else we find a gap
370 // first, and fetch it in case it is the signature. It's possible
371 // that we already have the later signature, and wastefully fill in
372 // the gaps, but this reduces the cases we have to consider so makes
373 // the code much simpler.
374
375 HISTORICAL_LOG("{} is not a signature", new_seqno);
376 supporting_signatures.erase(new_seqno);
377
378 auto next_seqno = new_seqno + 1;
379 while (true)
380 {
381 auto all_it = all_stores.find(next_seqno);
382 auto details =
383 all_it == all_stores.end() ? nullptr : all_it->second.lock();
384 if (details == nullptr)
385 {
387 "Looking for new supporting signature at {}", next_seqno);
388 details = std::make_shared<StoreDetails>();
389 all_stores.insert_or_assign(all_it, next_seqno, details);
390 }
391
392 if (details->store == nullptr)
393 {
394 // Whether we just started fetching or someone else was already
395 // looking for this, it's the first gap we've found so _may_ be
396 // our signature
398 "Assigning {} as potential signature for {}",
399 next_seqno,
400 new_seqno);
401 supporting_signatures[next_seqno] = details;
402 return;
403 }
404 else if (details->is_signature)
405 {
406 const auto filled_this =
407 fill_receipts_from_signature(details, new_seqno);
408
409 if (
410 !filled_this && my_stores.find(new_seqno) != my_stores.end())
411 {
412 throw std::logic_error(fmt::format(
413 "Unexpected: Found a signature at {}, and contiguous range "
414 "of transactions from {}, yet signature does not cover "
415 "this seqno!",
416 next_seqno,
417 new_seqno));
418 }
419
420 return;
421 }
422 else
423 {
424 // This is a normal transaction, and its already fetched.
425 // Nothing to do, consider the next.
426 ++next_seqno;
427 }
428 }
429 }
430 }
431 }
432
433 private:
434 bool fill_receipts_from_signature(
435 const std::shared_ptr<StoreDetails>& sig_details,
436 std::optional<ccf::SeqNo> should_fill = std::nullopt)
437 {
438 // Iterate through earlier indices. If this signature covers them
439 // then create a receipt for them
440 const auto sig = get_signature(sig_details->store);
441 const auto cose_sig = get_cose_signature(sig_details->store);
442 ccf::MerkleTreeHistory tree(get_tree(sig_details->store).value());
443
444 // This is either pointing at the sig itself, or the closest larger
445 // seqno we're holding
446 auto sig_lower_bound_it =
447 my_stores.lower_bound(sig_details->transaction_id.seqno);
448
449 if (sig_lower_bound_it != my_stores.begin()) // Skip empty map edge case
450 {
451 // Construct reverse iterator to search backwards from here
452 auto search_rit = std::reverse_iterator(sig_lower_bound_it);
453 while (search_rit != my_stores.rend())
454 {
455 auto seqno = search_rit->first;
456 if (tree.in_range(seqno))
457 {
458 auto details = search_rit->second;
459 if (details != nullptr && details->store != nullptr)
460 {
461 auto proof = tree.get_proof(seqno);
462 details->transaction_id = {sig->view, seqno};
463 details->receipt = std::make_shared<TxReceiptImpl>(
464 sig->sig,
465 cose_sig,
466 proof.get_root(),
467 proof.get_path(),
468 sig->node,
469 sig->cert,
470 details->entry_digest,
471 details->get_commit_evidence(),
472 details->claims_digest);
474 "Assigned a receipt for {} after given signature at {}",
475 seqno,
476 sig_details->transaction_id.to_str());
477
478 if (should_fill.has_value() && seqno == *should_fill)
479 {
480 should_fill.reset();
481 }
482 }
483
484 ++search_rit;
485 }
486 else
487 {
488 // Found a seqno which this signature doesn't cover. It can't
489 // cover anything else, so break here
490 break;
491 }
492 }
493 }
494
495 return !should_fill.has_value();
496 }
497 };
498
499 // Guard all access to internal state with this lock
501
502 // Track all things currently requested by external callers
503 std::map<CompoundHandle, Request> requests;
504
505 // A map containing (weak pointers to) _all_ of the stores for active
506 // requests, allowing distinct requests for the same seqnos to share the
507 // same underlying state (and benefit from faster lookup)
509
510 ExpiryDuration default_expiry_duration = std::chrono::seconds(1800);
511
512 // These two combine into an effective O(log(N)) lookup/add/remove by
513 // handle.
514 std::list<CompoundHandle> lru_requests;
515 std::map<CompoundHandle, std::list<CompoundHandle>::iterator> lru_lookup;
516
517 // To maintain the estimated size consumed by all requests. Gets updated
518 // when ledger entries are fetched, and when requests are dropped.
519 std::unordered_map<SeqNo, std::set<CompoundHandle>> store_to_requests;
520 std::unordered_map<ccf::SeqNo, size_t> raw_store_sizes{};
521
522 CacheSize soft_store_cache_limit{std::numeric_limits<size_t>::max()};
524 soft_store_cache_limit / soft_to_raw_ratio;
526
528 {
529 auto it = store_to_requests.find(seq);
530
531 if (it == store_to_requests.end())
532 {
533 store_to_requests.insert({seq, {handle}});
534 auto size = raw_store_sizes.find(seq);
535 if (size != raw_store_sizes.end())
536 {
537 estimated_store_cache_size += size->second;
538 }
539 }
540 else
541 {
542 it->second.insert(handle);
543 }
544 }
545
547 {
548 for (const auto& [seq, _] : requests.at(handle).my_stores)
549 {
550 add_request_ref(seq, handle);
551 }
552 }
553
555 {
556 auto it = store_to_requests.find(seq);
557 assert(it != store_to_requests.end());
558
559 it->second.erase(handle);
560 if (it->second.empty())
561 {
562 store_to_requests.erase(it);
563 auto size = raw_store_sizes.find(seq);
564 if (size != raw_store_sizes.end())
565 {
566 estimated_store_cache_size -= size->second;
567 raw_store_sizes.erase(size);
568 }
569 }
570 }
571
573 {
574 for (const auto& [seq, _] : requests.at(handle).my_stores)
575 {
576 remove_request_ref(seq, handle);
577 }
578 }
579
581 {
582 auto it = lru_lookup.find(handle);
583 if (it != lru_lookup.end())
584 {
585 lru_requests.erase(it->second);
586 it->second = lru_requests.insert(lru_requests.begin(), handle);
587 }
588 else
589 {
590 lru_lookup[handle] = lru_requests.insert(lru_requests.begin(), handle);
591 add_request_refs(handle);
592 }
593 }
594
595 void lru_shrink_to_fit(size_t threshold)
596 {
597 while (estimated_store_cache_size > threshold)
598 {
599 if (lru_requests.empty())
600 {
602 "LRU shrink to {} requested but cache is already empty", threshold);
603 return;
604 }
605
606 const auto handle = lru_requests.back();
608 "Cache size shrinking (reached {} / {}). Dropping {}",
610 threshold,
611 handle);
612
613 remove_request_refs(handle);
614 lru_lookup.erase(handle);
615
616 requests.erase(handle);
617 lru_requests.pop_back();
618 }
619 }
620
622 {
623 auto it = lru_lookup.find(handle);
624 if (it != lru_lookup.end())
625 {
626 remove_request_refs(handle);
627 lru_requests.erase(it->second);
628 lru_lookup.erase(it);
629 }
630 }
631
632 void update_store_raw_size(SeqNo seq, size_t new_size)
633 {
634 auto& stored_size = raw_store_sizes[seq];
635 assert(!stored_size || stored_size == new_size);
636
637 estimated_store_cache_size -= stored_size;
638 estimated_store_cache_size += new_size;
639 stored_size = new_size;
640 }
641
646
648 {
649 LOG_TRACE_FMT("fetch_entries_range({}, {})", from, to);
650
652 ::consensus::ledger_get_range,
653 to_host,
654 static_cast<::consensus::Index>(from),
655 static_cast<::consensus::Index>(to),
657 }
658
659 std::optional<ccf::SeqNo> fetch_supporting_secret_if_needed(
661 {
662 auto [earliest_ledger_secret_seqno, earliest_ledger_secret] =
665 earliest_ledger_secret != nullptr,
666 "Can't fetch without knowing earliest");
667
668 const auto too_early = seqno < earliest_ledger_secret_seqno;
669
670 auto previous_secret_stored_version =
671 earliest_ledger_secret->previous_secret_stored_version;
672 const auto is_next_secret =
673 previous_secret_stored_version.value_or(0) == seqno;
674
675 if (too_early || is_next_secret)
676 {
677 // Still need more secrets, fetch the next
678 if (!previous_secret_stored_version.has_value())
679 {
680 throw std::logic_error(fmt::format(
681 "Earliest known ledger secret at {} has no earlier secret stored "
682 "version ({})",
683 earliest_ledger_secret_seqno,
684 seqno));
685 }
686
687 const auto seqno_to_fetch = previous_secret_stored_version.value();
689 "Requesting historical entry at {} but first known ledger "
690 "secret is applicable from {}",
691 seqno,
692 earliest_ledger_secret_seqno);
693
694 auto it = all_stores.find(seqno_to_fetch);
695 auto details = it == all_stores.end() ? nullptr : it->second.lock();
696 if (details == nullptr)
697 {
698 LOG_TRACE_FMT("Requesting older secret at {} now", seqno_to_fetch);
699 details = std::make_shared<StoreDetails>();
700 all_stores.insert_or_assign(it, seqno_to_fetch, details);
701 fetch_entry_at(seqno_to_fetch);
702 }
703
704 next_secret_fetch_handle = details;
705
706 if (too_early)
707 {
708 return seqno_to_fetch;
709 }
710 }
711
712 return std::nullopt;
713 }
714
716 const StoreDetailsPtr& details,
717 const ccf::kv::StorePtr& store,
718 const ccf::crypto::Sha256Hash& entry_digest,
720 bool is_signature,
721 ccf::ClaimsDigest&& claims_digest,
722 bool has_commit_evidence)
723 {
724 // Deserialisation includes a GCM integrity check, so all entries
725 // have been verified by the time we get here.
726 details->current_stage = StoreStage::Trusted;
727 details->has_commit_evidence = has_commit_evidence;
728
729 details->entry_digest = entry_digest;
730 if (!claims_digest.empty())
731 details->claims_digest = std::move(claims_digest);
732
734 details->store == nullptr,
735 "Cache already has store for seqno {}",
736 seqno);
737 details->store = store;
738
739 details->is_signature = is_signature;
740 if (is_signature)
741 {
742 // Construct a signature receipt.
743 // We do this whether it was requested or not, because we have all
744 // the state to do so already, and it's simpler than constructing
745 // the receipt _later_ for an already-fetched signature
746 // transaction.
747 const auto sig = get_signature(details->store);
748 const auto cose_sig = get_cose_signature(details->store);
749 assert(sig.has_value());
750 details->transaction_id = {sig->view, sig->seqno};
751 details->receipt = std::make_shared<TxReceiptImpl>(
752 sig->sig, cose_sig, sig->root.h, nullptr, sig->node, sig->cert);
753 }
754
755 auto request_it = requests.begin();
756 while (request_it != requests.end())
757 {
758 auto& [handle, request] = *request_it;
759
760 // If this request was still waiting for a ledger secret, and this is
761 // that secret
762 if (
763 request.awaiting_ledger_secrets.has_value() &&
764 request.awaiting_ledger_secrets.value() == seqno)
765 {
767 "{} is a ledger secret seqno this request was waiting for", seqno);
768
769 request.awaiting_ledger_secrets =
770 fetch_supporting_secret_if_needed(request.first_requested_seqno());
771 if (!request.awaiting_ledger_secrets.has_value())
772 {
773 // Newly have all required secrets - begin fetching the actual
774 // entries. Note this is adding them to `all_stores`, from where
775 // they'll be requested on the next tick.
776 auto my_stores_it = request.my_stores.begin();
777 while (my_stores_it != request.my_stores.end())
778 {
779 auto [store_seqno, _] = *my_stores_it;
780 auto it = all_stores.find(store_seqno);
781 auto store_details =
782 it == all_stores.end() ? nullptr : it->second.lock();
783
784 if (store_details == nullptr)
785 {
786 store_details = std::make_shared<StoreDetails>();
787 all_stores.insert_or_assign(it, store_seqno, store_details);
788 }
789
790 my_stores_it->second = store_details;
791 ++my_stores_it;
792 }
793 }
794
795 // In either case, done with this request, try the next
796 ++request_it;
797 continue;
798 }
799
800 if (request.include_receipts)
801 {
802 const bool seqno_in_this_request =
803 (request.my_stores.find(seqno) != request.my_stores.end() ||
804 request.supporting_signatures.find(seqno) !=
805 request.supporting_signatures.end());
806 if (seqno_in_this_request)
807 {
808 request.populate_receipts(seqno);
809 }
810 }
811
812 ++request_it;
813 }
814 }
815
817 const ccf::kv::StorePtr& store, LedgerSecretPtr encrypting_secret)
818 {
819 // Read encrypted secrets from store
820 auto tx = store->create_read_only_tx();
821 auto encrypted_past_ledger_secret_handle =
823 ccf::Tables::ENCRYPTED_PAST_LEDGER_SECRET);
824 if (!encrypted_past_ledger_secret_handle)
825 {
826 return false;
827 }
828
829 auto encrypted_past_ledger_secret =
830 encrypted_past_ledger_secret_handle->get();
831 if (!encrypted_past_ledger_secret.has_value())
832 {
833 return false;
834 }
835
836 // Construct description and decrypted secret
837 auto previous_ledger_secret =
838 encrypted_past_ledger_secret->previous_ledger_secret;
839 if (!previous_ledger_secret.has_value())
840 {
841 // The only write to this table that should not contain a previous
842 // secret is the initial service open
844 encrypted_past_ledger_secret->next_version.has_value() &&
845 encrypted_past_ledger_secret->next_version.value() == 1,
846 "Write to ledger secrets table at {} should contain a next_version "
847 "of 1",
848 store->current_version());
849 return true;
850 }
851
852 auto recovered_ledger_secret = std::make_shared<LedgerSecret>(
854 encrypting_secret, std::move(previous_ledger_secret->encrypted_data)),
855 previous_ledger_secret->previous_secret_stored_version);
856
857 // Add recovered secret to historical secrets
858 historical_ledger_secrets->set_secret(
859 previous_ledger_secret->version, std::move(recovered_ledger_secret));
860
861 // Update earliest_secret
863 previous_ledger_secret->version < earliest_secret_.valid_from, "");
865
866 return true;
867 }
868
870 ccf::SeqNo start_seqno, ccf::SeqNo end_seqno)
871 {
872 if (end_seqno < start_seqno)
873 {
874 throw std::logic_error(fmt::format(
875 "Invalid range for historical query: end {} is before start {}",
876 end_seqno,
877 start_seqno));
878 }
879
880 SeqNoCollection c(start_seqno, end_seqno - start_seqno);
881 return c;
882 }
883
884 std::vector<StatePtr> get_states_internal(
885 const CompoundHandle& handle,
886 const SeqNoCollection& seqnos,
887 ExpiryDuration seconds_until_expiry,
888 bool include_receipts)
889 {
890 if (seqnos.empty())
891 {
892 throw std::logic_error(
893 "Invalid range for historical query: Cannot request empty range");
894 }
895
896 std::lock_guard<ccf::pal::Mutex> guard(requests_lock);
897
898 const auto ms_until_expiry =
899 std::chrono::duration_cast<std::chrono::milliseconds>(
900 seconds_until_expiry);
901
902 auto it = requests.find(handle);
903 if (it == requests.end())
904 {
905 // This is a new handle - insert a newly created Request for it
906 it = requests.emplace_hint(it, handle, Request(all_stores));
907 HISTORICAL_LOG("First time I've seen handle {}", handle);
908 }
909
910 lru_promote(handle);
911
912 Request& request = it->second;
913
915
916 // Update this Request to represent the currently requested ranges
918 "Adjusting handle {} to cover {} seqnos starting at {} "
919 "(include_receipts={})",
920 handle,
921 seqnos.size(),
922 *seqnos.begin(),
923 include_receipts);
924 auto [removed, added] = request.adjust_ranges(
925 seqnos, include_receipts, earliest_secret_.valid_from);
926
927 for (auto seq : removed)
928 {
929 remove_request_ref(seq, handle);
930 }
931 for (auto seq : added)
932 {
933 add_request_ref(seq, handle);
934 }
935
936 // If the earliest target entry cannot be deserialised with the earliest
937 // known ledger secret, record the target seqno and begin fetching the
938 // previous historical ledger secret.
941
942 // Reset the expiry timer as this has just been requested
943 request.time_to_expiry = ms_until_expiry;
944
945 std::vector<StatePtr> trusted_states;
946
947 for (auto seqno : seqnos)
948 {
949 auto target_details = request.get_store_details(seqno);
950 if (
951 target_details != nullptr &&
952 target_details->current_stage == StoreStage::Trusted &&
953 (!request.include_receipts || target_details->receipt != nullptr))
954 {
955 // Have this store, associated txid and receipt and trust it - add
956 // it to return list
957 StatePtr state = std::make_shared<State>(
958 target_details->store,
959 target_details->receipt,
960 target_details->transaction_id);
961 trusted_states.push_back(state);
962 }
963 else
964 {
965 // Still fetching this store or don't trust it yet, so range is
966 // incomplete - return empty vector
967 return {};
968 }
969 }
970
971 return trusted_states;
972 }
973
974 // Used when we received an invalid entry, to drop any requests which were
975 // asking for it
977 {
978 auto request_it = requests.begin();
979 while (request_it != requests.end())
980 {
981 if (request_it->second.get_store_details(seqno) != nullptr)
982 {
983 lru_evict(request_it->first);
984 request_it = requests.erase(request_it);
985 }
986 else
987 {
988 ++request_it;
989 }
990 }
991 }
992
993 std::vector<ccf::kv::ReadOnlyStorePtr> states_to_stores(
994 const std::vector<StatePtr>& states)
995 {
996 std::vector<ccf::kv::ReadOnlyStorePtr> stores;
997 for (size_t i = 0; i < states.size(); i++)
998 {
999 stores.push_back(states[i]->store);
1000 }
1001 return stores;
1002 }
1003
1004 public:
1006 ccf::kv::Store& store,
1007 const std::shared_ptr<ccf::LedgerSecrets>& secrets,
1008 const ringbuffer::WriterPtr& host_writer) :
1009 source_store(store),
1010 source_ledger_secrets(secrets),
1011 to_host(host_writer),
1015 {}
1016
1018 const CompoundHandle& handle,
1020 ExpiryDuration seconds_until_expiry)
1021 {
1022 auto range = get_store_range(handle, seqno, seqno, seconds_until_expiry);
1023 if (range.empty())
1024 {
1025 return nullptr;
1026 }
1027
1028 return range[0];
1029 }
1030
1036
1038 const CompoundHandle& handle,
1040 ExpiryDuration seconds_until_expiry)
1041 {
1042 auto range = get_state_range(handle, seqno, seqno, seconds_until_expiry);
1043 if (range.empty())
1044 {
1045 return nullptr;
1046 }
1047
1048 return range[0];
1049 }
1050
1055
1056 std::vector<ccf::kv::ReadOnlyStorePtr> get_store_range(
1057 const CompoundHandle& handle,
1058 ccf::SeqNo start_seqno,
1059 ccf::SeqNo end_seqno,
1060 ExpiryDuration seconds_until_expiry)
1061 {
1063 handle,
1064 collection_from_single_range(start_seqno, end_seqno),
1065 seconds_until_expiry,
1066 false));
1067 }
1068
1069 std::vector<ccf::kv::ReadOnlyStorePtr> get_store_range(
1070 const CompoundHandle& handle,
1071 ccf::SeqNo start_seqno,
1072 ccf::SeqNo end_seqno)
1073 {
1074 return get_store_range(
1075 handle, start_seqno, end_seqno, default_expiry_duration);
1076 }
1077
1078 std::vector<StatePtr> get_state_range(
1079 const CompoundHandle& handle,
1080 ccf::SeqNo start_seqno,
1081 ccf::SeqNo end_seqno,
1082 ExpiryDuration seconds_until_expiry)
1083 {
1084 return get_states_internal(
1085 handle,
1086 collection_from_single_range(start_seqno, end_seqno),
1087 seconds_until_expiry,
1088 true);
1089 }
1090
1091 std::vector<StatePtr> get_state_range(
1092 const CompoundHandle& handle,
1093 ccf::SeqNo start_seqno,
1094 ccf::SeqNo end_seqno)
1095 {
1096 return get_state_range(
1097 handle, start_seqno, end_seqno, default_expiry_duration);
1098 }
1099
1100 std::vector<ccf::kv::ReadOnlyStorePtr> get_stores_for(
1101 const CompoundHandle& handle,
1102 const SeqNoCollection& seqnos,
1103 ExpiryDuration seconds_until_expiry)
1104 {
1105 return states_to_stores(
1106 get_states_internal(handle, seqnos, seconds_until_expiry, false));
1107 }
1108
1109 std::vector<ccf::kv::ReadOnlyStorePtr> get_stores_for(
1110 const CompoundHandle& handle, const SeqNoCollection& seqnos)
1111 {
1112 return get_stores_for(handle, seqnos, default_expiry_duration);
1113 }
1114
1115 std::vector<StatePtr> get_states_for(
1116 const CompoundHandle& handle,
1117 const SeqNoCollection& seqnos,
1118 ExpiryDuration seconds_until_expiry)
1119 {
1120 if (seqnos.empty())
1121 {
1122 throw std::runtime_error("Cannot request empty range");
1123 }
1124 return get_states_internal(handle, seqnos, seconds_until_expiry, true);
1125 }
1126
1127 std::vector<StatePtr> get_states_for(
1128 const CompoundHandle& handle, const SeqNoCollection& seqnos)
1129 {
1130 return get_states_for(handle, seqnos, default_expiry_duration);
1131 }
1132
1134 {
1135 default_expiry_duration = duration;
1136 }
1137
1139 {
1140 soft_store_cache_limit = cache_limit;
1142 }
1143
1145 {
1147 }
1148
1150 {
1151 std::lock_guard<ccf::pal::Mutex> guard(requests_lock);
1152 lru_evict(handle);
1153 const auto erased_count = requests.erase(handle);
1154 return erased_count > 0;
1155 }
1156
1157 bool handle_ledger_entry(ccf::SeqNo seqno, const std::vector<uint8_t>& data)
1158 {
1159 return handle_ledger_entry(seqno, data.data(), data.size());
1160 }
1161
1162 bool handle_ledger_entry(ccf::SeqNo seqno, const uint8_t* data, size_t size)
1163 {
1164 std::lock_guard<ccf::pal::Mutex> guard(requests_lock);
1165 const auto it = all_stores.find(seqno);
1166 auto details = it == all_stores.end() ? nullptr : it->second.lock();
1167 if (details == nullptr || details->current_stage != StoreStage::Fetching)
1168 {
1169 // Unexpected entry, we already have it or weren't asking for it -
1170 // ignore this resubmission
1171 return false;
1172 }
1173
1174 ccf::kv::ApplyResult deserialise_result;
1175 ccf::ClaimsDigest claims_digest;
1176 bool has_commit_evidence;
1177 auto store = deserialise_ledger_entry(
1178 seqno,
1179 data,
1180 size,
1181 deserialise_result,
1182 claims_digest,
1183 has_commit_evidence);
1184
1185 if (deserialise_result == ccf::kv::ApplyResult::FAIL)
1186 {
1187 return false;
1188 }
1189
1190 {
1191 // Confirm this entry is from a precursor of the current state, and not
1192 // a fork
1193 const auto tx_id = store->current_txid();
1194 if (tx_id.version != seqno)
1195 {
1197 "Corrupt ledger entry received - claims to be {} but is actually "
1198 "{}.{}",
1199 seqno,
1200 tx_id.term,
1201 tx_id.version);
1202 return false;
1203 }
1204
1206 if (consensus == nullptr)
1207 {
1208 LOG_FAIL_FMT("No consensus on source store");
1209 return false;
1210 }
1211
1212 const auto actual_view = consensus->get_view(seqno);
1213 if (actual_view != tx_id.term)
1214 {
1216 "Ledger entry comes from fork - contains {}.{} but this service "
1217 "expected {}.{}",
1218 tx_id.term,
1219 tx_id.version,
1220 actual_view,
1221 seqno);
1222 return false;
1223 }
1224 }
1225
1226 const auto is_signature =
1227 deserialise_result == ccf::kv::ApplyResult::PASS_SIGNATURE;
1228
1230
1231 auto [valid_from, secret] = earliest_secret_;
1232
1233 if (
1234 secret->previous_secret_stored_version.has_value() &&
1235 secret->previous_secret_stored_version.value() == seqno)
1236 {
1238 "Handling past ledger secret. Current earliest is valid from {}, now "
1239 "processing secret stored at {}",
1240 valid_from,
1241 seqno);
1243 next_secret_fetch_handle = nullptr;
1244 }
1245
1247 "Processing historical store at {} ({})",
1248 seqno,
1249 (size_t)deserialise_result);
1250 const auto entry_digest = ccf::crypto::Sha256Hash({data, size});
1252 details,
1253 store,
1254 entry_digest,
1255 seqno,
1256 is_signature,
1257 std::move(claims_digest),
1258 has_commit_evidence);
1259
1261 return true;
1262 }
1263
1265 ccf::SeqNo from_seqno, ccf::SeqNo to_seqno, const LedgerEntry& data)
1266 {
1267 return handle_ledger_entries(
1268 from_seqno, to_seqno, data.data(), data.size());
1269 }
1270
1272 ccf::SeqNo from_seqno,
1273 ccf::SeqNo to_seqno,
1274 const uint8_t* data,
1275 size_t size)
1276 {
1277 LOG_TRACE_FMT("handle_ledger_entries({}, {})", from_seqno, to_seqno);
1278
1279 auto seqno = from_seqno;
1280 bool all_accepted = true;
1281 while (size > 0)
1282 {
1283 const auto header =
1284 serialized::peek<ccf::kv::SerialisedEntryHeader>(data, size);
1285 const auto whole_size =
1286 header.size + ccf::kv::serialised_entry_header_size;
1287 all_accepted &= handle_ledger_entry(seqno, data, whole_size);
1288 data += whole_size;
1289 size -= whole_size;
1290 ++seqno;
1291 }
1292
1293 if (seqno != to_seqno + 1)
1294 {
1296 "Claimed ledger entries: [{}, {}), actual [{}, {}]",
1297 from_seqno,
1298 to_seqno,
1299 from_seqno,
1300 seqno);
1301 }
1302
1303 return all_accepted;
1304 }
1305
1310
1312 {
1313 std::lock_guard<ccf::pal::Mutex> guard(requests_lock);
1314
1315 LOG_TRACE_FMT("handle_no_entry_range({}, {})", from_seqno, to_seqno);
1316
1317 for (auto seqno = from_seqno; seqno <= to_seqno; ++seqno)
1318 {
1319 // The host failed or refused to give this entry. Currently just
1320 // forget about it and drop any requests which were looking for it -
1321 // don't have a mechanism for remembering this failure and reporting it
1322 // to users.
1323 const auto fetches_it = all_stores.find(seqno);
1324 if (fetches_it != all_stores.end())
1325 {
1327
1328 all_stores.erase(fetches_it);
1329 }
1330 }
1331 }
1332
1335 const uint8_t* data,
1336 size_t size,
1337 ccf::kv::ApplyResult& result,
1338 ccf::ClaimsDigest& claims_digest,
1339 bool& has_commit_evidence)
1340 {
1341 // Create a new store and try to deserialise this entry into it
1342 ccf::kv::StorePtr store = std::make_shared<ccf::kv::Store>(
1343 false /* Do not start from very first seqno */,
1344 true /* Make use of historical secrets */);
1345
1346 // If this is older than the node's currently known ledger secrets, use
1347 // the historical encryptor (which should have older secrets)
1348 if (seqno < source_ledger_secrets->get_first().first)
1349 {
1350 store->set_encryptor(historical_encryptor);
1351 }
1352 else
1353 {
1354 store->set_encryptor(source_store.get_encryptor());
1355 }
1356
1357 try
1358 {
1359 // Encrypted ledger secrets are deserialised in public-only mode. Their
1360 // Merkle tree integrity is not verified: even if the recovered ledger
1361 // secret was bogus, the deserialisation of subsequent ledger entries
1362 // would fail.
1363 bool public_only = false;
1364 for (const auto& [_, request] : requests)
1365 {
1366 const auto& als = request.awaiting_ledger_secrets;
1367 if (als.has_value() && als.value() == seqno)
1368 {
1369 public_only = true;
1370 break;
1371 }
1372 }
1373
1374 auto exec = store->deserialize({data, data + size}, public_only);
1375 if (exec == nullptr)
1376 {
1378 return nullptr;
1379 }
1380
1381 result = exec->apply(track_deletes_on_missing_keys_v);
1382 claims_digest = std::move(exec->consume_claims_digest());
1383
1384 auto commit_evidence_digest =
1385 std::move(exec->consume_commit_evidence_digest());
1386 has_commit_evidence = commit_evidence_digest.has_value();
1387 }
1388 catch (const std::exception& e)
1389 {
1391 "Exception while attempting to deserialise entry {}: {}",
1392 seqno,
1393 e.what());
1395 }
1396
1397 return store;
1398 }
1399
1400 void tick(const std::chrono::milliseconds& elapsed_ms)
1401 {
1402 std::lock_guard<ccf::pal::Mutex> guard(requests_lock);
1403
1404 {
1405 auto it = requests.begin();
1406 while (it != requests.end())
1407 {
1408 auto& request = it->second;
1409 if (elapsed_ms >= request.time_to_expiry)
1410 {
1412 "Dropping expired historical query with handle {}", it->first);
1413 lru_evict(it->first);
1414 it = requests.erase(it);
1415 }
1416 else
1417 {
1418 request.time_to_expiry -= elapsed_ms;
1419 ++it;
1420 }
1421 }
1422 }
1423
1425
1426 {
1427 auto it = all_stores.begin();
1428 std::optional<std::pair<ccf::SeqNo, ccf::SeqNo>> range_to_request =
1429 std::nullopt;
1430 while (it != all_stores.end())
1431 {
1432 auto details = it->second.lock();
1433 if (details == nullptr)
1434 {
1435 it = all_stores.erase(it);
1436 }
1437 else
1438 {
1439 if (details->current_stage == StoreStage::Fetching)
1440 {
1441 details->time_until_fetch -= elapsed_ms;
1442 if (details->time_until_fetch.count() <= 0)
1443 {
1444 details->time_until_fetch = slow_fetch_threshold;
1445
1446 const auto seqno = it->first;
1447 if (
1448 range_to_request.has_value() &&
1449 range_to_request->second + 1 == seqno)
1450 {
1451 range_to_request->second = seqno;
1452 }
1453 else
1454 {
1455 if (range_to_request.has_value())
1456 {
1457 // Submit fetch for previously tracked range
1459 range_to_request->first, range_to_request->second);
1460 }
1461
1462 // Track new range
1463 range_to_request = std::make_pair(seqno, seqno);
1464 }
1465 }
1466 }
1467
1468 ++it;
1469 }
1470 }
1471
1472 if (range_to_request.has_value())
1473 {
1474 // Submit fetch for final tracked range
1476 range_to_request->first, range_to_request->second);
1477 }
1478 }
1479 }
1480 };
1481
1483 {
1484 protected:
1489
1490 public:
1491 template <typename... Ts>
1492 StateCache(Ts&&... ts) : StateCacheImpl(std::forward<Ts>(ts)...)
1493 {}
1494
1496 RequestHandle handle,
1498 ExpiryDuration seconds_until_expiry) override
1499 {
1501 make_compound_handle(handle), seqno, seconds_until_expiry);
1502 }
1503
1509
1511 RequestHandle handle,
1513 ExpiryDuration seconds_until_expiry) override
1514 {
1516 make_compound_handle(handle), seqno, seconds_until_expiry);
1517 }
1518
1523
1524 std::vector<ccf::kv::ReadOnlyStorePtr> get_store_range(
1525 RequestHandle handle,
1526 ccf::SeqNo start_seqno,
1527 ccf::SeqNo end_seqno,
1528 ExpiryDuration seconds_until_expiry) override
1529 {
1531 make_compound_handle(handle),
1532 start_seqno,
1533 end_seqno,
1534 seconds_until_expiry);
1535 }
1536
1537 std::vector<ccf::kv::ReadOnlyStorePtr> get_store_range(
1538 RequestHandle handle,
1539 ccf::SeqNo start_seqno,
1540 ccf::SeqNo end_seqno) override
1541 {
1543 make_compound_handle(handle), start_seqno, end_seqno);
1544 }
1545
1546 std::vector<StatePtr> get_state_range(
1547 RequestHandle handle,
1548 ccf::SeqNo start_seqno,
1549 ccf::SeqNo end_seqno,
1550 ExpiryDuration seconds_until_expiry) override
1551 {
1553 make_compound_handle(handle),
1554 start_seqno,
1555 end_seqno,
1556 seconds_until_expiry);
1557 }
1558
1559 std::vector<StatePtr> get_state_range(
1560 RequestHandle handle,
1561 ccf::SeqNo start_seqno,
1562 ccf::SeqNo end_seqno) override
1563 {
1565 make_compound_handle(handle), start_seqno, end_seqno);
1566 }
1567
1568 std::vector<ccf::kv::ReadOnlyStorePtr> get_stores_for(
1569 RequestHandle handle,
1570 const SeqNoCollection& seqnos,
1571 ExpiryDuration seconds_until_expiry) override
1572 {
1574 make_compound_handle(handle), seqnos, seconds_until_expiry);
1575 }
1576
1577 std::vector<ccf::kv::ReadOnlyStorePtr> get_stores_for(
1578 RequestHandle handle, const SeqNoCollection& seqnos) override
1579 {
1581 make_compound_handle(handle), seqnos);
1582 }
1583
1584 std::vector<StatePtr> get_states_for(
1585 RequestHandle handle,
1586 const SeqNoCollection& seqnos,
1587 ExpiryDuration seconds_until_expiry) override
1588 {
1590 make_compound_handle(handle), seqnos, seconds_until_expiry);
1591 }
1592
1593 std::vector<StatePtr> get_states_for(
1594 RequestHandle handle, const SeqNoCollection& seqnos) override
1595 {
1597 make_compound_handle(handle), seqnos);
1598 }
1599
1604
1605 void set_soft_cache_limit(CacheSize cache_limit) override
1606 {
1608 }
1609
1610 void track_deletes_on_missing_keys(bool track) override
1611 {
1613 }
1614
1615 bool drop_cached_states(RequestHandle handle) override
1616 {
1618 }
1619 };
1620}
#define CCF_ASSERT_FMT(expr,...)
Definition ccf_assert.h:10
#define CCF_ASSERT(expr, msg)
Definition ccf_assert.h:14
Definition claims_digest.h:10
Definition ledger_secrets.h:24
Definition history.h:436
Definition sha256_hash.h:16
Definition contiguous_set.h:18
ConstIterator end() const
Definition contiguous_set.h:495
bool empty() const
Definition contiguous_set.h:320
ConstIterator begin() const
Definition contiguous_set.h:490
size_t size() const
Definition contiguous_set.h:312
Definition historical_queries_interface.h:67
Definition historical_queries.h:95
std::map< CompoundHandle, std::list< CompoundHandle >::iterator > lru_lookup
Definition historical_queries.h:515
void update_store_raw_size(SeqNo seq, size_t new_size)
Definition historical_queries.h:632
void process_deserialised_store(const StoreDetailsPtr &details, const ccf::kv::StorePtr &store, const ccf::crypto::Sha256Hash &entry_digest, ccf::SeqNo seqno, bool is_signature, ccf::ClaimsDigest &&claims_digest, bool has_commit_evidence)
Definition historical_queries.h:715
std::vector< StatePtr > get_states_for(const CompoundHandle &handle, const SeqNoCollection &seqnos)
Definition historical_queries.h:1127
std::map< CompoundHandle, Request > requests
Definition historical_queries.h:503
std::list< CompoundHandle > lru_requests
Definition historical_queries.h:514
std::vector< ccf::kv::ReadOnlyStorePtr > states_to_stores(const std::vector< StatePtr > &states)
Definition historical_queries.h:993
ExpiryDuration default_expiry_duration
Definition historical_queries.h:510
bool track_deletes_on_missing_keys_v
Definition historical_queries.h:105
ccf::kv::Store & source_store
Definition historical_queries.h:97
std::map< ccf::SeqNo, WeakStoreDetailsPtr > AllRequestedStores
Definition historical_queries.h:186
ringbuffer::WriterPtr to_host
Definition historical_queries.h:99
void add_request_refs(CompoundHandle handle)
Definition historical_queries.h:546
std::vector< StatePtr > get_state_range(const CompoundHandle &handle, ccf::SeqNo start_seqno, ccf::SeqNo end_seqno, ExpiryDuration seconds_until_expiry)
Definition historical_queries.h:1078
std::unordered_map< SeqNo, std::set< CompoundHandle > > store_to_requests
Definition historical_queries.h:519
std::vector< StatePtr > get_state_range(const CompoundHandle &handle, ccf::SeqNo start_seqno, ccf::SeqNo end_seqno)
Definition historical_queries.h:1091
AllRequestedStores all_stores
Definition historical_queries.h:508
void set_soft_cache_limit(CacheSize cache_limit)
Definition historical_queries.h:1138
void remove_request_ref(SeqNo seq, CompoundHandle handle)
Definition historical_queries.h:554
CacheSize estimated_store_cache_size
Definition historical_queries.h:525
StatePtr get_state_at(const CompoundHandle &handle, ccf::SeqNo seqno, ExpiryDuration seconds_until_expiry)
Definition historical_queries.h:1037
bool drop_cached_states(const CompoundHandle &handle)
Definition historical_queries.h:1149
void handle_no_entry(ccf::SeqNo seqno)
Definition historical_queries.h:1306
void remove_request_refs(CompoundHandle handle)
Definition historical_queries.h:572
void track_deletes_on_missing_keys(bool track)
Definition historical_queries.h:1144
std::vector< StatePtr > get_states_for(const CompoundHandle &handle, const SeqNoCollection &seqnos, ExpiryDuration seconds_until_expiry)
Definition historical_queries.h:1115
std::vector< ccf::kv::ReadOnlyStorePtr > get_stores_for(const CompoundHandle &handle, const SeqNoCollection &seqnos, ExpiryDuration seconds_until_expiry)
Definition historical_queries.h:1100
std::map< ccf::SeqNo, StoreDetailsPtr > RequestedStores
Definition historical_queries.h:183
void handle_no_entry_range(ccf::SeqNo from_seqno, ccf::SeqNo to_seqno)
Definition historical_queries.h:1311
bool handle_encrypted_past_ledger_secret(const ccf::kv::StorePtr &store, LedgerSecretPtr encrypting_secret)
Definition historical_queries.h:816
ccf::kv::ReadOnlyStorePtr get_store_at(const CompoundHandle &handle, ccf::SeqNo seqno)
Definition historical_queries.h:1031
StatePtr get_state_at(const CompoundHandle &handle, ccf::SeqNo seqno)
Definition historical_queries.h:1051
std::shared_ptr< ccf::NodeEncryptor > historical_encryptor
Definition historical_queries.h:102
CacheSize soft_store_cache_limit
Definition historical_queries.h:522
void add_request_ref(SeqNo seq, CompoundHandle handle)
Definition historical_queries.h:527
void fetch_entries_range(ccf::SeqNo from, ccf::SeqNo to)
Definition historical_queries.h:647
std::vector< uint8_t > LedgerEntry
Definition historical_queries.h:113
bool handle_ledger_entry(ccf::SeqNo seqno, const uint8_t *data, size_t size)
Definition historical_queries.h:1162
void fetch_entry_at(ccf::SeqNo seqno)
Definition historical_queries.h:642
ccf::kv::ReadOnlyStorePtr get_store_at(const CompoundHandle &handle, ccf::SeqNo seqno, ExpiryDuration seconds_until_expiry)
Definition historical_queries.h:1017
StoreDetailsPtr next_secret_fetch_handle
Definition historical_queries.h:203
void update_earliest_known_ledger_secret()
Definition historical_queries.h:115
ccf::pal::Mutex requests_lock
Definition historical_queries.h:500
std::shared_ptr< StoreDetails > StoreDetailsPtr
Definition historical_queries.h:182
bool handle_ledger_entry(ccf::SeqNo seqno, const std::vector< uint8_t > &data)
Definition historical_queries.h:1157
void tick(const std::chrono::milliseconds &elapsed_ms)
Definition historical_queries.h:1400
VersionedSecret earliest_secret_
Definition historical_queries.h:202
std::optional< ccf::SeqNo > fetch_supporting_secret_if_needed(ccf::SeqNo seqno)
Definition historical_queries.h:659
std::unordered_map< ccf::SeqNo, size_t > raw_store_sizes
Definition historical_queries.h:520
std::shared_ptr< ccf::LedgerSecrets > source_ledger_secrets
Definition historical_queries.h:98
ccf::kv::StorePtr deserialise_ledger_entry(ccf::SeqNo seqno, const uint8_t *data, size_t size, ccf::kv::ApplyResult &result, ccf::ClaimsDigest &claims_digest, bool &has_commit_evidence)
Definition historical_queries.h:1333
std::shared_ptr< ccf::LedgerSecrets > historical_ledger_secrets
Definition historical_queries.h:101
std::vector< ccf::kv::ReadOnlyStorePtr > get_stores_for(const CompoundHandle &handle, const SeqNoCollection &seqnos)
Definition historical_queries.h:1109
StateCacheImpl(ccf::kv::Store &store, const std::shared_ptr< ccf::LedgerSecrets > &secrets, const ringbuffer::WriterPtr &host_writer)
Definition historical_queries.h:1005
std::vector< StatePtr > get_states_internal(const CompoundHandle &handle, const SeqNoCollection &seqnos, ExpiryDuration seconds_until_expiry, bool include_receipts)
Definition historical_queries.h:884
void delete_all_interested_requests(ccf::SeqNo seqno)
Definition historical_queries.h:976
void lru_shrink_to_fit(size_t threshold)
Definition historical_queries.h:595
bool handle_ledger_entries(ccf::SeqNo from_seqno, ccf::SeqNo to_seqno, const uint8_t *data, size_t size)
Definition historical_queries.h:1271
CacheSize soft_store_cache_limit_raw
Definition historical_queries.h:523
void set_default_expiry_duration(ExpiryDuration duration)
Definition historical_queries.h:1133
bool handle_ledger_entries(ccf::SeqNo from_seqno, ccf::SeqNo to_seqno, const LedgerEntry &data)
Definition historical_queries.h:1264
std::vector< ccf::kv::ReadOnlyStorePtr > get_store_range(const CompoundHandle &handle, ccf::SeqNo start_seqno, ccf::SeqNo end_seqno)
Definition historical_queries.h:1069
std::weak_ptr< StoreDetails > WeakStoreDetailsPtr
Definition historical_queries.h:185
SeqNoCollection collection_from_single_range(ccf::SeqNo start_seqno, ccf::SeqNo end_seqno)
Definition historical_queries.h:869
std::vector< ccf::kv::ReadOnlyStorePtr > get_store_range(const CompoundHandle &handle, ccf::SeqNo start_seqno, ccf::SeqNo end_seqno, ExpiryDuration seconds_until_expiry)
Definition historical_queries.h:1056
void lru_promote(CompoundHandle handle)
Definition historical_queries.h:580
StoreStage
Definition historical_queries.h:108
void lru_evict(CompoundHandle handle)
Definition historical_queries.h:621
Definition historical_queries.h:1483
StatePtr get_state_at(RequestHandle handle, ccf::SeqNo seqno, ExpiryDuration seconds_until_expiry) override
Definition historical_queries.h:1510
std::vector< StatePtr > get_state_range(RequestHandle handle, ccf::SeqNo start_seqno, ccf::SeqNo end_seqno, ExpiryDuration seconds_until_expiry) override
Definition historical_queries.h:1546
std::vector< StatePtr > get_states_for(RequestHandle handle, const SeqNoCollection &seqnos, ExpiryDuration seconds_until_expiry) override
Definition historical_queries.h:1584
CompoundHandle make_compound_handle(RequestHandle rh)
Definition historical_queries.h:1485
std::vector< ccf::kv::ReadOnlyStorePtr > get_store_range(RequestHandle handle, ccf::SeqNo start_seqno, ccf::SeqNo end_seqno) override
Definition historical_queries.h:1537
ccf::kv::ReadOnlyStorePtr get_store_at(RequestHandle handle, ccf::SeqNo seqno) override
Definition historical_queries.h:1504
std::vector< ccf::kv::ReadOnlyStorePtr > get_stores_for(RequestHandle handle, const SeqNoCollection &seqnos) override
Definition historical_queries.h:1577
std::vector< ccf::kv::ReadOnlyStorePtr > get_stores_for(RequestHandle handle, const SeqNoCollection &seqnos, ExpiryDuration seconds_until_expiry) override
Definition historical_queries.h:1568
StatePtr get_state_at(RequestHandle handle, ccf::SeqNo seqno) override
Definition historical_queries.h:1519
void set_soft_cache_limit(CacheSize cache_limit) override
Definition historical_queries.h:1605
std::vector< StatePtr > get_state_range(RequestHandle handle, ccf::SeqNo start_seqno, ccf::SeqNo end_seqno) override
Definition historical_queries.h:1559
ccf::kv::ReadOnlyStorePtr get_store_at(RequestHandle handle, ccf::SeqNo seqno, ExpiryDuration seconds_until_expiry) override
Definition historical_queries.h:1495
void set_default_expiry_duration(ExpiryDuration duration) override
Definition historical_queries.h:1600
bool drop_cached_states(RequestHandle handle) override
Definition historical_queries.h:1615
std::vector< StatePtr > get_states_for(RequestHandle handle, const SeqNoCollection &seqnos) override
Definition historical_queries.h:1593
void track_deletes_on_missing_keys(bool track) override
Definition historical_queries.h:1610
std::vector< ccf::kv::ReadOnlyStorePtr > get_store_range(RequestHandle handle, ccf::SeqNo start_seqno, ccf::SeqNo end_seqno, ExpiryDuration seconds_until_expiry) override
Definition historical_queries.h:1524
StateCache(Ts &&... ts)
Definition historical_queries.h:1492
Definition store.h:88
ReadOnlyTx create_read_only_tx() override
Definition store.h:1275
EncryptorPtr get_encryptor() override
Definition store.h:226
std::shared_ptr< Consensus > get_consensus() override
Definition store.h:188
Definition encryptor.h:15
Definition value.h:32
#define HISTORICAL_LOG(...)
Definition historical_queries.h:25
#define LOG_TRACE_FMT
Definition logger.h:356
#define LOG_DEBUG_FMT
Definition logger.h:357
#define LOG_FAIL_FMT
Definition logger.h:363
std::vector< uint8_t > HashBytes
Definition hash_bytes.h:10
Definition historical_queries_adapter.h:18
std::chrono::seconds ExpiryDuration
Definition historical_queries_interface.h:50
std::shared_ptr< State > StatePtr
Definition historical_queries_interface.h:41
RequestNamespace
Definition historical_queries.h:31
std::pair< RequestNamespace, RequestHandle > CompoundHandle
Definition historical_queries.h:36
size_t RequestHandle
Definition historical_queries_interface.h:48
size_t CacheSize
Definition historical_queries_interface.h:52
ApplyResult
Definition kv_types.h:341
@ PASS_SIGNATURE
Definition kv_types.h:343
@ FAIL
Definition kv_types.h:350
std::shared_ptr< ReadOnlyStore > ReadOnlyStorePtr
Definition read_only_store.h:23
std::shared_ptr< ccf::kv::Store > StorePtr
Definition store.h:1340
std::mutex Mutex
Definition locking.h:12
Definition app_interface.h:14
LedgerSecretsMap::value_type VersionedLedgerSecret
Definition ledger_secrets.h:21
std::shared_ptr< TxReceiptImpl > TxReceiptImplPtr
Definition receipt.h:136
std::shared_ptr< LedgerSecret > LedgerSecretPtr
Definition ledger_secret.h:75
seqno
Definition signatures.h:54
uint64_t SeqNo
Definition tx_id.h:36
std::vector< uint8_t > decrypt_previous_ledger_secret_raw(const LedgerSecretPtr &ledger_secret, const std::vector< uint8_t > &encrypted_previous_secret_raw)
Definition ledger_secret.h:83
Definition consensus_types.h:23
uint64_t Index
Definition ledger_enclave_types.h:11
@ HistoricalQuery
Definition ledger_enclave_types.h:16
std::shared_ptr< AbstractWriter > WriterPtr
Definition ring_buffer_types.h:150
STL namespace.
#define RINGBUFFER_WRITE_MESSAGE(MSG,...)
Definition ring_buffer_types.h:255
Definition tx_id.h:44
SeqNo seqno
Definition tx_id.h:46
View view
Definition tx_id.h:45
Definition historical_queries.h:206
bool include_receipts
Definition historical_queries.h:212
StoreDetailsPtr get_store_details(ccf::SeqNo seqno) const
Definition historical_queries.h:224
Request(AllRequestedStores &all_stores_)
Definition historical_queries.h:222
void populate_receipts(ccf::SeqNo new_seqno)
Definition historical_queries.h:350
std::pair< std::vector< SeqNo >, std::vector< SeqNo > > adjust_ranges(const SeqNoCollection &new_seqnos, bool should_include_receipts, SeqNo earliest_ledger_secret_seqno)
Definition historical_queries.h:245
std::optional< ccf::SeqNo > awaiting_ledger_secrets
Definition historical_queries.h:220
AllRequestedStores & all_stores
Definition historical_queries.h:207
std::chrono::milliseconds time_to_expiry
Definition historical_queries.h:210
RequestedStores supporting_signatures
Definition historical_queries.h:217
ccf::SeqNo first_requested_seqno() const
Definition historical_queries.h:235
RequestedStores my_stores
Definition historical_queries.h:209
Definition historical_queries.h:141
std::chrono::milliseconds time_until_fetch
Definition historical_queries.h:142
std::optional< std::string > get_commit_evidence()
Definition historical_queries.h:166
bool has_commit_evidence
Definition historical_queries.h:150
ccf::kv::StorePtr store
Definition historical_queries.h:146
ccf::ClaimsDigest claims_digest
Definition historical_queries.h:145
ccf::crypto::Sha256Hash entry_digest
Definition historical_queries.h:144
ccf::TxID transaction_id
Definition historical_queries.h:149
ccf::crypto::HashBytes get_commit_nonce()
Definition historical_queries.h:152
TxReceiptImplPtr receipt
Definition historical_queries.h:148
StoreStage current_stage
Definition historical_queries.h:143
bool is_signature
Definition historical_queries.h:147
Definition historical_queries.h:189
ccf::LedgerSecretPtr secret
Definition historical_queries.h:191
ccf::SeqNo valid_from
Definition historical_queries.h:190
VersionedSecret(const ccf::VersionedLedgerSecret &vls)
Definition historical_queries.h:196
constexpr auto parse(ParseContext &ctx)
Definition historical_queries.h:45
auto format(const ccf::historical::CompoundHandle &p, FormatContext &ctx) const
Definition historical_queries.h:51