CCF
Loading...
Searching...
No Matches
indexer.h
Go to the documentation of this file.
1// Copyright (c) Microsoft Corporation. All rights reserved.
2// Licensed under the Apache 2.0 License.
3#pragma once
4
5#include "ccf/ds/logger.h"
8#include "kv/kv_types.h"
9#include "kv/store.h"
10
11#include <memory>
12#include <string>
13
14namespace ccf::indexing
15{
16 // This is responsible for managing a collection of strategies, and ensuring
17 // each has been given every transaction up to the commit point, in-order.
19 {
20 public:
21 static constexpr size_t MAX_REQUESTABLE = 500;
22
23 protected:
24 std::shared_ptr<TransactionFetcher> transaction_fetcher;
25
26 using PendingTx = std::pair<ccf::TxID, std::vector<uint8_t>>;
27 std::vector<PendingTx> uncommitted_entries;
28
30
31 static bool tx_id_less(const ccf::TxID& a, const ccf::TxID& b)
32 {
33 // NB: This will return true for 2.10 < 4.5, which isn't necessarily
34 // what's wanted in all comparisons (that's why this isn't implemented for
35 // ccf::TxID directly). But it's fine for everywhere we use it here, since
36 // we assume we only see a node's transitions in-order.
37 return a.view < b.view || a.seqno < b.seqno;
38 }
39
40 static bool uncommitted_entries_cmp(const ccf::TxID& a, const PendingTx& b)
41 {
42 return tx_id_less(a, b.first);
43 }
44
45 void update_commit(const ccf::TxID& tx_id)
46 {
47 if (tx_id_less(tx_id, committed))
48 {
49 throw std::logic_error(fmt::format(
50 "Committing out-of-order. Committed to {}, trying to commit {}",
52 tx_id.to_str()));
53 }
54
55 committed = tx_id;
56 }
57
58 public:
59 Indexer(const std::shared_ptr<TransactionFetcher>& tf) :
61 {}
62
63 // Returns true if it looks like there's still a gap to fill. Useful for
64 // testing
66 std::chrono::milliseconds elapsed, const ccf::TxID& newly_committed)
67 {
68 update_commit(newly_committed);
69
70 std::optional<ccf::SeqNo> min_requested = std::nullopt;
71
72 std::lock_guard<ccf::pal::Mutex> guard(lock);
73
74 for (auto& strategy : strategies)
75 {
76 strategy->tick();
77
78 const auto next_requested = strategy->next_requested();
79 if (!next_requested.has_value())
80 {
81 // If this strategy has an upper-bound on Txs it cares about, and
82 // we've already provided that, don't consider advancing it any
83 // further
84 continue;
85 }
86
87 if (!min_requested.has_value() || *next_requested < *min_requested)
88 {
89 min_requested = next_requested;
90 }
91 }
92
93 if (min_requested.has_value())
94 {
95 if (*min_requested <= committed.seqno)
96 {
97 // Request a prefix of the missing entries. Cap the requested range,
98 // so we don't overload the node with a huge historical request
99 const auto first_requested = *min_requested;
100 auto additional = std::min(
102 committed.seqno - first_requested);
103
104 SeqNoCollection seqnos;
105 for (auto i = first_requested; i <= first_requested + additional; ++i)
106 {
107 seqnos.insert(i);
108 }
109
110 auto stores = transaction_fetcher->fetch_transactions(seqnos);
111 for (auto& store : stores)
112 {
113 const ccf::TxID tx_id = store->get_txid();
114
115 for (auto& strategy : strategies)
116 {
117 const auto next_requested = strategy->next_requested();
118 if (
119 next_requested.has_value() && (tx_id.seqno == *next_requested))
120 {
121 strategy->handle_committed_transaction(tx_id, store);
122 }
123 }
124 }
125
126 return true;
127 }
128 }
129
130 return false;
131 }
132 };
133}
Definition contiguous_set.h:18
bool insert(const T &t)
Definition contiguous_set.h:325
Definition indexer.h:19
void update_commit(const ccf::TxID &tx_id)
Definition indexer.h:45
Indexer(const std::shared_ptr< TransactionFetcher > &tf)
Definition indexer.h:59
std::vector< PendingTx > uncommitted_entries
Definition indexer.h:27
bool update_strategies(std::chrono::milliseconds elapsed, const ccf::TxID &newly_committed)
Definition indexer.h:65
ccf::TxID committed
Definition indexer.h:29
static bool tx_id_less(const ccf::TxID &a, const ccf::TxID &b)
Definition indexer.h:31
static constexpr size_t MAX_REQUESTABLE
Definition indexer.h:21
std::shared_ptr< TransactionFetcher > transaction_fetcher
Definition indexer.h:24
std::pair< ccf::TxID, std::vector< uint8_t > > PendingTx
Definition indexer.h:26
static bool uncommitted_entries_cmp(const ccf::TxID &a, const PendingTx &b)
Definition indexer.h:40
Definition indexer_interface.h:21
std::set< StrategyPtr > strategies
Definition indexer_interface.h:24
ccf::pal::Mutex lock
Definition indexer_interface.h:23
Definition indexer_interface.h:14
Definition tx_id.h:44
SeqNo seqno
Definition tx_id.h:46
View view
Definition tx_id.h:45
std::string to_str() const
Definition tx_id.h:48