109 std::chrono::milliseconds last_ack_timeout;
111 NodeState() =
default;
116 Index match_idx_ = 0) :
117 node_info(node_info_),
119 match_idx(match_idx_),
125 std::unique_ptr<Store> store;
128 std::optional<ccf::NodeId> voted_for = std::nullopt;
129 std::optional<ccf::NodeId> leader_id = std::nullopt;
134 std::unordered_set<ccf::NodeId> votes;
137 std::map<Index, Votes> votes_for_me;
139 std::chrono::milliseconds timeout_elapsed;
150 bool is_new_follower =
false;
157 bool should_sign =
false;
159 std::shared_ptr<aft::State> state;
162 std::chrono::milliseconds request_timeout;
163 std::chrono::milliseconds election_timeout;
164 size_t max_uncommitted_tx_count;
165 bool ticking =
false;
168 std::list<Configuration> configurations;
176 std::unordered_map<ccf::NodeId, NodeState> all_other_nodes;
177 std::unordered_map<ccf::NodeId, ccf::SeqNo> retired_nodes;
181 std::shared_ptr<ccf::NodeClient> node_client;
184 std::unique_ptr<ccf::RetiredNodeCleanup> retired_node_cleanup;
186 size_t entry_size_not_limited = 0;
187 size_t entry_count = 0;
188 Index entries_batch_size = 20;
189 static constexpr int batch_window_size = 100;
190 int batch_window_sum = 0;
194 bool public_only =
false;
197 std::uniform_int_distribution<int> distrib;
198 std::default_random_engine rand;
203 static constexpr size_t max_terms_per_append_entries = 1;
213 std::unique_ptr<Store> store_,
214 std::unique_ptr<LedgerProxy> ledger_,
215 std::shared_ptr<ccf::NodeToNode> channels_,
216 std::shared_ptr<aft::State> state_,
217 std::shared_ptr<ccf::NodeClient> rpc_request_context_,
218 bool public_only_ =
false,
223 store(
std::move(store_)),
229 request_timeout(settings_.message_timeout),
230 election_timeout(settings_.election_timeout),
231 max_uncommitted_tx_count(settings_.max_uncommitted_tx_count),
233 reconfiguration_type(reconfiguration_type_),
234 node_client(rpc_request_context_),
235 retired_node_cleanup(
236 std::make_unique<
ccf::RetiredNodeCleanup>(node_client)),
238 public_only(public_only_),
240 distrib(0, (int)election_timeout.count() / 2),
241 rand((int)(uintptr_t)this),
256 return state->node_id;
271 std::unique_lock<ccf::pal::Mutex> guard(state->lock);
272 return can_replicate_unsafe();
282 if (max_uncommitted_tx_count == 0)
286 std::unique_lock<ccf::pal::Mutex> guard(state->lock);
288 (state->last_idx - state->commit_idx >= max_uncommitted_tx_count);
293 std::unique_lock<ccf::pal::Mutex> guard(state->lock);
294 if (can_sign_unsafe())
298 return Consensus::SignatureDisposition::SHOULD_SIGN;
302 return Consensus::SignatureDisposition::CAN_SIGN;
307 return Consensus::SignatureDisposition::CANT_REPLICATE;
339 ccf::SeqNo seqno,
const std::vector<ccf::kv::NodeId>& node_ids)
override
341 for (
auto& node_id : node_ids)
347 "Node is not retired, cannot become retired committed");
350 "Node is not retired, cannot become retired committed");
351 state->retired_committed_idx = seqno;
360 all_other_nodes.erase(node_id);
361 RAFT_INFO_FMT(
"Removed {} from nodes known to consensus", node_id);
368 return state->committable_indices.empty() ?
370 state->committable_indices.back();
378 const auto it = std::upper_bound(
379 state->committable_indices.rbegin(),
380 state->committable_indices.rend(),
382 [](
const auto& l,
const auto& r) { return l >= r; });
383 if (it == state->committable_indices.rend())
393 while (!state->committable_indices.empty() &&
394 (state->committable_indices.front() <= idx))
396 state->committable_indices.pop_front();
404 std::lock_guard<ccf::pal::Mutex> guard(state->lock);
412 if (leader_id.has_value())
414 throw std::logic_error(
415 "Can't force leadership if there is already a leader");
418 std::lock_guard<ccf::pal::Mutex> guard(state->lock);
419 state->current_view += starting_view_change;
426 const std::vector<Index>& terms,
427 Index commit_idx_)
override
431 if (leader_id.has_value())
433 throw std::logic_error(
434 "Can't force leadership if there is already a leader");
437 std::lock_guard<ccf::pal::Mutex> guard(state->lock);
438 state->current_view = term;
439 state->last_idx = index;
440 state->commit_idx = commit_idx_;
441 state->view_history.initialise(terms);
442 state->view_history.update(index, term);
443 state->current_view += starting_view_change;
450 const std::vector<Index>& term_history,
451 Index recovery_start_index = 0)
override
455 std::lock_guard<ccf::pal::Mutex> guard(state->lock);
457 state->last_idx = index;
458 state->commit_idx = index;
460 state->view_history.initialise(term_history);
462 ledger->init(index, recovery_start_index);
469 return state->last_idx;
474 std::lock_guard<ccf::pal::Mutex> guard(state->lock);
475 return get_commit_idx_unsafe();
480 std::lock_guard<ccf::pal::Mutex> guard(state->lock);
481 return state->current_view;
486 std::lock_guard<ccf::pal::Mutex> guard(state->lock);
487 ccf::SeqNo commit_idx = get_commit_idx_unsafe();
488 return {get_term_internal(commit_idx), commit_idx};
493 std::lock_guard<ccf::pal::Mutex> guard(state->lock);
494 return get_term_internal(idx);
500 return state->view_history.get_history_until(idx);
506 return state->view_history.get_history_since(idx);
515 std::set<ccf::NodeId> nodes;
517 for (
auto const& conf : configurations)
519 for (
auto const& [node_id, _] : conf.nodes)
521 if (node_id != state->node_id)
523 nodes.insert(node_id);
535 const std::unordered_set<ccf::NodeId>& new_learner_nodes = {},
536 const std::unordered_set<ccf::NodeId>& new_retired_nodes = {})
override
539 "Configurations: add new configuration at {}: {{{}}}", idx, conf);
541 assert(new_learner_nodes.empty());
543#ifdef CCF_RAFT_TRACING
544 nlohmann::json j = {};
545 j[
"function"] =
"add_configuration";
547 COMMITTABLE_INDICES(j[
"state"], state);
548 j[
"configurations"] = configurations;
549 j[
"args"] = nlohmann::json::object();
559 !configurations.empty() &&
560 configurations.back().nodes.find(state->node_id) !=
561 configurations.back().nodes.end() &&
562 conf.find(state->node_id) == conf.end())
567 if (configurations.empty() || conf != configurations.back().nodes)
570 configurations.push_back(new_config);
572 create_and_remove_node_state();
579 using namespace std::chrono_literals;
580 timeout_elapsed = 0ms;
586 for (
auto& node : all_other_nodes)
588 using namespace std::chrono_literals;
589 node.second.last_ack_timeout = 0ms;
595 if (configurations.empty())
600 return configurations.back().nodes;
605 std::lock_guard<ccf::pal::Mutex> guard(state->lock);
612 std::lock_guard<ccf::pal::Mutex> guard(state->lock);
622 for (
auto const& conf : configurations)
624 details.
configs.push_back(conf);
626 for (
auto& [k, v] : all_other_nodes)
629 v.match_idx,
static_cast<size_t>(v.last_ack_timeout.count())};
637 std::lock_guard<ccf::pal::Mutex> guard(state->lock);
642 "Failed to replicate {} items: not leader", entries.size());
647 if (term != state->current_view)
650 "Failed to replicate {} items at term {}, current term is {}",
653 state->current_view);
660 "Failed to replicate {} items: node retirement is complete",
668 for (
auto& [index, data, is_globally_committable, hooks] : entries)
670 bool globally_committable = is_globally_committable;
672 if (index != state->last_idx + 1)
676 "Replicated on leader {}: {}{} ({} hooks)",
679 (globally_committable ?
" committable" :
""),
682#ifdef CCF_RAFT_TRACING
683 nlohmann::json j = {};
684 j[
"function"] =
"replicate";
686 COMMITTABLE_INDICES(j[
"state"], state);
689 j[
"globally_committable"] = globally_committable;
693 for (
auto& hook : *hooks)
698 if (globally_committable)
701 "membership: {} leadership: {}",
702 state->membership_state,
703 state->leadership_state);
710 state->committable_indices.push_back(index);
711 start_ticking_if_necessary();
718 state->last_idx = index;
720 *data, globally_committable, state->current_view, index);
721 entry_size_not_limited += data->size();
724 state->view_history.update(index, state->current_view);
729 entry_size_not_limited = 0;
730 for (
const auto& it : all_other_nodes)
733 send_append_entries(it.first, it.second.sent_idx + 1);
748 const ccf::NodeId& from,
const uint8_t* data,
size_t size)
override
750 RaftMsgType type = serialized::peek<RaftMsgType>(data, size);
759 channels->template recv_authenticated<AppendEntries>(
761 recv_append_entries(from, r, data, size);
768 channels->template recv_authenticated<AppendEntriesResponse>(
770 recv_append_entries_response(from, r);
778 recv_request_vote(from, r);
785 channels->template recv_authenticated<RequestVoteResponse>(
787 recv_request_vote_response(from, r);
794 channels->template recv_authenticated<ProposeRequestVote>(
796 recv_propose_request_vote(from, r);
816 catch (
const std::exception& e)
// Timer tick: advances this node's timers by `elapsed` while holding
// state->lock.
// NOTE(review): the role checks that select between the sections below are
// not visible in this extract.
824 void periodic(std::chrono::milliseconds elapsed)
override
826 std::unique_lock<ccf::pal::Mutex> guard(state->lock);
827 timeout_elapsed += elapsed;
// Once request_timeout has accumulated, the timer is restarted from zero.
831 if (timeout_elapsed >= request_timeout)
833 using namespace std::chrono_literals;
834 timeout_elapsed = 0ms;
// Send every known peer the entries that follow its sent_idx.
838 for (
const auto& node : all_other_nodes)
840 send_append_entries(node.first, node.second.sent_idx + 1);
// Age each peer's last-ack timer by the same interval.
844 for (
auto& node : all_other_nodes)
846 node.second.last_ack_timeout += elapsed;
// For each configuration, count the nodes whose last ack is at least
// election_timeout old. A configuration with fewer such nodes than
// get_quorum(conf.nodes.size() - 1) sets has_quorum_of_backups.
849 bool has_quorum_of_backups =
false;
850 for (
auto const& conf : configurations)
852 size_t backup_ack_timeout_count = 0;
853 for (
auto const& node : conf.nodes)
855 auto search = all_other_nodes.find(node.first);
// Nodes with no entry in all_other_nodes are handled separately; that
// branch body is not visible here.
856 if (search == all_other_nodes.end())
861 if (search->second.last_ack_timeout >= election_timeout)
864 "No ack received from {} in last {}",
867 backup_ack_timeout_count++;
871 if (backup_ack_timeout_count < get_quorum(conf.nodes.size() - 1))
875 has_quorum_of_backups =
true;
// With no such configuration, the message below reports stepping down as
// leader (the call that performs the step-down is not visible here).
880 if (!has_quorum_of_backups)
887 "Stepping down as leader {}: No ack received from a majority of "
888 "backups in last {}",
// NOTE(review): tail of a condition comparing against election_timeout; the
// rest of the condition and its body are outside this extract.
898 timeout_elapsed >= election_timeout)
915 Index probe_index = std::min(tx_id.
seqno, state->last_idx);
916 Term term_of_probe = state->view_history.view_at(probe_index);
917 while (term_of_probe > tx_id.
view)
922 probe_index = state->view_history.start_of_view(term_of_probe);
927 term_of_probe = state->view_history.view_at(probe_index);
931 "Looking for match with {}.{}, from {}.{}, best answer is {}",
934 state->view_history.view_at(state->last_idx),
// Recomputes entries_batch_size from the average entry size and a running
// window (batch_window_sum over batch_window_size slots).
940 inline void update_batch_size()
// entry_count == 0 is special-cased so the division below never divides by
// zero; the fallback value is not visible in this extract.
942 auto avg_entry_size = (entry_count == 0) ?
944 entry_size_not_limited / entry_count;
// avg_entry_size == 0 is likewise special-cased (alternatives not visible).
946 auto batch_size = (avg_entry_size == 0) ?
// Swap one window-average share for the new sample, then clamp the resulting
// average so the batch size is never below 1.
950 auto batch_avg = batch_window_sum / batch_window_size;
952 batch_window_sum += (batch_size - batch_avg);
953 entries_batch_size = std::max((batch_window_sum / batch_window_size), 1);
958 if (idx > state->last_idx)
961 return state->view_history.view_at(idx);
// NOTE(review): body not visible in this extract. The caller visible in this
// file acquires state->lock before calling, which the `_unsafe` suffix
// presumably signals.
964 bool can_replicate_unsafe()
// NOTE(review): body not visible in this extract. The caller visible in this
// file acquires state->lock before calling, which the `_unsafe` suffix
// presumably signals.
970 bool can_sign_unsafe()
// Returns state->commit_idx without taking state->lock; callers visible in
// this file acquire the lock before calling.
976 Index get_commit_idx_unsafe()
978 return state->commit_idx;
984 "Sending append entries to node {} in batches of {}, covering the "
991 auto calculate_end_index = [
this](
Index start) {
996 max_terms_per_append_entries == 1,
997 "AppendEntries construction logic enforces single term");
998 auto max_idx = state->last_idx;
999 const auto term_of_ae = state->view_history.view_at(start);
1000 const auto index_at_end_of_term =
1001 state->view_history.end_of_view(term_of_ae);
1002 if (index_at_end_of_term != ccf::kv::NoVersion)
1004 max_idx = index_at_end_of_term;
1006 return std::min(start + entries_batch_size - 1, max_idx);
1015 end_idx = calculate_end_index(start_idx);
1016 RAFT_TRACE_FMT(
"Sending sub range {} -> {}", start_idx, end_idx);
1017 send_append_entries_range(to, start_idx, end_idx);
1018 start_idx = std::min(end_idx + 1, state->last_idx);
1019 }
while (end_idx != state->last_idx);
// Builds an append-entries message for the range ending at end_idx and
// records end_idx as the recipient's sent_idx.
// NOTE(review): the parameter list and the actual send call are not visible
// in this extract.
1022 void send_append_entries_range(
// prev_idx is the entry immediately before the range being sent.
1025 const auto prev_idx = start_idx - 1;
1034 const auto prev_term = get_term_internal(prev_idx);
1035 const auto term_of_idx = get_term_internal(end_idx);
1038 "Send append entries from {} to {}: ({}.{}, {}.{}] ({})",
// The message is written with designated initializers; clang's
// -Wc99-designator warning is suppressed around it.
1047#pragma clang diagnostic push
1048#pragma clang diagnostic ignored "-Wc99-designator"
1051 {.idx = end_idx, .prev_idx = prev_idx},
1052 .term = state->current_view,
1053 .prev_term = prev_term,
1054 .leader_commit_idx = state->commit_idx,
1055 .term_of_idx = term_of_idx,
1057#pragma clang diagnostic pop
// unordered_map::at throws std::out_of_range if `to` has no entry.
1059 auto& node = all_other_nodes.at(to);
// Tracing builds record the call and the peer's match/sent indices as JSON.
1061#ifdef CCF_RAFT_TRACING
1062 nlohmann::json j = {};
1063 j[
"function"] =
"send_append_entries";
1065 j[
"state"] = *state;
1066 COMMITTABLE_INDICES(j[
"state"], state);
1067 j[
"to_node_id"] = to;
1068 j[
"match_idx"] = node.match_idx;
1069 j[
"sent_idx"] = node.sent_idx;
// Remember how far this peer has been sent.
1082 node.sent_idx = end_idx;
// Handles an incoming append-entries message `r` from node `from` under
// state->lock. It checks the sender's term and previous-entry term, walks the
// indices (r.prev_idx, r.idx], and passes the deserialised entries to
// execute_append_entries_sync. Rejections go out via
// send_append_entries_response_nack.
// NOTE(review): several branch bodies and early returns are not visible in
// this extract.
1085 void recv_append_entries(
1088 const uint8_t* data,
1091 std::unique_lock<ccf::pal::Mutex> guard(state->lock);
1094 "Received append entries: {}.{} to {}.{} (from {} in term {})",
// Tracing builds record the call as JSON.
1102#ifdef CCF_RAFT_TRACING
1103 nlohmann::json j = {};
1104 j[
"function"] =
"recv_append_entries";
1106 j[
"state"] = *state;
1107 COMMITTABLE_INDICES(j[
"state"], state);
1108 j[
"from_node_id"] = from;
// Three-way comparison of our term (state->current_view) with r.term. When
// ours is later, the message is logged and nacked.
1119 state->current_view == r.term &&
1124 else if (state->current_view < r.term)
1128 else if (state->current_view > r.term)
1132 "Recv append entries to {} from {} but our term is later ({} > {})",
1135 state->current_view,
1137 send_append_entries_response_nack(from);
// The term we hold at r.prev_idx must equal the sender's r.prev_term. The log
// messages below distinguish a log that does not yet reach prev_idx from a
// wrong term at prev_idx; the latter nacks with the rejected TxID.
1142 const auto prev_term = get_term_internal(r.prev_idx);
1143 if (prev_term != r.prev_term)
1146 "Previous term for {} should be {}", r.prev_idx, prev_term);
1153 "Recv append entries to {} from {} but our log does not yet "
1158 send_append_entries_response_nack(from);
1163 "Recv append entries to {} from {} but our log at {} has the wrong "
1164 "previous term (ours: {}, theirs: {})",
1170 const ccf::TxID rejected_tx{r.prev_term, r.prev_idx};
1171 send_append_entries_response_nack(from, rejected_tx);
// Restart the election timer, then handle the case where the recorded leader
// is unset or differs from the sender.
1178 restart_election_timeout();
1179 if (!leader_id.has_value() || leader_id.value() != from)
1183 "Node {} thinks leader is {}", state->node_id, leader_id.value());
// prev_idx below commit_idx or beyond last_idx is logged specially; the
// branch bodies are not visible here.
1187 if (r.prev_idx < state->commit_idx)
1190 "Recv append entries to {} from {} but prev_idx ({}) < commit_idx "
1202 else if (r.prev_idx > state->last_idx)
1205 "Recv append entries to {} from {} but prev_idx ({}) > last_idx ({})",
1214 "Recv append entries to {} from {} for index {} and previous index {}",
// Collect (deserialised entry, index) pairs for every index in the message.
1220 std::vector<std::tuple<
1221 std::unique_ptr<ccf::kv::AbstractExecutionWrapper>,
1225 for (
Index i = r.prev_idx + 1; i <= r.idx; i++)
// Index already in our log: compare the incoming term (r.term_of_idx, valid
// because a message covers a single term) with our local term at i.
1227 if (i <= state->last_idx)
1234 max_terms_per_append_entries == 1,
1235 "AppendEntries rollback logic assumes single term");
1236 const auto incoming_term = r.term_of_idx;
1237 const auto local_term = state->view_history.view_at(i);
1238 if (incoming_term != local_term)
// On conflict, the is_new_follower path logs a rollback to i - 1 and clears
// the flag; the other path logs that the conflicting message is ignored.
1240 if (is_new_follower)
1242 auto rollback_level = i - 1;
1244 "New follower received AppendEntries with conflict. Incoming "
1245 "entry {}.{} conflicts with local {}.{}. Rolling back to {}.",
1252 "Dropping conflicting branch. Rolling back {} entries, "
1253 "beginning with {}.{}.",
1254 state->last_idx - rollback_level,
1258 is_new_follower =
false;
1270 "Ignoring conflicting AppendEntries. Retaining {} entries, "
1271 "beginning with {}.{}.",
1272 state->last_idx - (i - 1),
// Step past an entry in the incoming buffer without deserialising it.
1282 ledger->skip_entry(data, size);
// Extract the next entry; a std::logic_error from get_entry is logged as
// malformed data and nacked.
1287 std::vector<uint8_t> entry;
1290 entry = LedgerProxy::get_entry(data, size);
1292 catch (
const std::logic_error& e)
1296 "Recv append entries to {} from {} but the data is malformed: {}",
1300 send_append_entries_response_nack(from);
// Deserialise via the store; a failure is logged and nacked.
1305 auto ds = store->deserialize(entry, public_only, expected);
1309 "Recv append entries to {} from {} but the entry could not be "
1313 send_append_entries_response_nack(from);
1317 append_entries.push_back(std::make_tuple(std::move(
ds), i));
// Hand the collected entries over for application.
1320 execute_append_entries_sync(
1321 std::move(append_entries), from, std::move(r));
// Applies each received (entry, index) pair in order, advancing
// state->last_idx, then calls execute_append_entries_finish.
// NOTE(review): the rest of the parameter list, the unpacking of `ae` into
// `ds`/`i`, and the case labels of the switch are not visible in this
// extract.
1324 void execute_append_entries_sync(
1325 std::vector<std::tuple<
1326 std::unique_ptr<ccf::kv::AbstractExecutionWrapper>,
1331 for (
auto& ae : append_entries)
1334 RAFT_DEBUG_FMT(
"Replicating on follower {}: {}", state->node_id, i);
// Tracing builds record the call as JSON.
1336#ifdef CCF_RAFT_TRACING
1337 nlohmann::json j = {};
1338 j[
"function"] =
"execute_append_entries_sync";
1339 j[
"state"] = *state;
1340 COMMITTABLE_INDICES(j[
"state"], state);
1341 j[
"from_node_id"] = from;
// Apply the entry. Presumably this call's result is the apply_success value
// switched on below -- the assignment line is not visible, so confirm.
1345 bool track_deletes_on_missing_keys =
false;
1347 ds->apply(track_deletes_on_missing_keys);
1351 send_append_entries_response_nack(from);
1354 state->last_idx = i;
1356 for (
auto& hook :
ds->get_hooks())
// Entries flagged globally committable start this node ticking if needed.
1361 bool globally_committable =
1363 if (globally_committable)
1365 start_ticking_if_necessary();
1368 const auto& entry =
ds->get_entry();
1371 entry, globally_committable,
ds->get_term(),
ds->get_index());
// Dispatch on the apply result. One visible path truncates the ledger back
// to last_idx and nacks; another records i as committable, updates the view
// history and attempts to commit up to the leader's commit index.
1373 switch (apply_success)
1379 ledger->truncate(state->last_idx);
1380 send_append_entries_response_nack(from);
1393 state->committable_indices.push_back(i);
1402 state->view_history.update(1, r.term);
1410 max_terms_per_append_entries == 1,
1411 "AppendEntries processing for term updates assumes single "
1413 state->view_history.update(r.prev_idx + 1,
ds->get_term());
1416 commit_if_possible(r.leader_commit_idx);
// Any unrecognised result is a programming error.
1433 throw std::logic_error(
"Unknown ApplyResult value");
1438 execute_append_entries_finish(r, from);
// Final step after applying received entries: attempts to commit up to the
// leader's commit index, updates the view history, and acks the sender.
// NOTE(review): the conditions selecting between the two view_history
// updates, and the definition of `lci`, are not visible in this extract.
1441 void execute_append_entries_finish(
1446 commit_if_possible(r.leader_commit_idx);
1457 state->view_history.update(1, r.term);
1466 state->view_history.update(lci + 1, r.term_of_idx);
1470 send_append_entries_response_ack(from, r);
// Sends a response for a received append-entries message `ae`, reporting
// ae.idx as the response index (remaining arguments not visible here).
1473 void send_append_entries_response_ack(
1479 const auto response_idx = ae.idx;
1480 send_append_entries_response(
// Nack overload taking a rejected TxID. It reports the index returned by
// find_highest_possible_match(rejected) together with the term we hold at
// that index.
1484 void send_append_entries_response_nack(
1487 const auto response_idx = find_highest_possible_match(rejected);
1488 const auto response_term = get_term_internal(response_idx);
1490 send_append_entries_response(
// Nack overload with no rejected TxID; state->current_view is among the
// arguments forwarded (the other arguments are not visible in this extract).
1494 void send_append_entries_response_nack(
ccf::NodeId to)
1496 send_append_entries_response(
1499 state->current_view,
// Builds an AppendEntriesResponse carrying response_term and response_idx.
// NOTE(review): the parameter list, remaining response fields and the send
// call are not visible in this extract.
1503 void send_append_entries_response(
1510 "Send append entries response from {} to {} for index {}: {}",
1516 AppendEntriesResponse response{
1517 .term = response_term,
1518 .last_log_idx = response_idx,
// Tracing builds record the outgoing packet as JSON.
1522#ifdef CCF_RAFT_TRACING
1523 nlohmann::json j = {};
1524 j[
"function"] =
"send_append_entries_response";
1525 j[
"packet"] = response;
1526 j[
"state"] = *state;
1527 COMMITTABLE_INDICES(j[
"state"], state);
1528 j[
"to_node_id"] = to;
// Handles a response `r` to append entries from node `from` under
// state->lock, adjusting that peer's sent_idx / match_idx.
// NOTE(review): branch bodies, early returns and the success/failure test
// are not visible in this extract.
1536 void recv_append_entries_response(
1539 std::lock_guard<ccf::pal::Mutex> guard(state->lock);
// Responses from nodes with no entry in all_other_nodes are logged as
// "unknown node".
1541 auto node = all_other_nodes.find(from);
1542 if (node == all_other_nodes.end())
1546 "Recv append entries response to {} from {}: unknown node",
// Tracing builds record the call and the peer's indices as JSON.
1552#ifdef CCF_RAFT_TRACING
1553 nlohmann::json j = {};
1554 j[
"function"] =
"recv_append_entries_response";
1556 j[
"state"] = *state;
1557 COMMITTABLE_INDICES(j[
"state"], state);
1558 j[
"from_node_id"] = from;
1559 j[
"match_idx"] = node->second.match_idx;
1560 j[
"sent_idx"] = node->second.sent_idx;
1568 "Recv append entries response to {} from {}: no longer leader",
// The peer has responded, so its last-ack timer restarts from zero.
1574 using namespace std::chrono_literals;
1575 node->second.last_ack_timeout = 0ms;
// Classify the response: newer term than ours, a different (stale) term, or
// an index behind what this peer has already matched.
1577 if (state->current_view < r.term)
1581 "Recv append entries response to {} from {}: more recent term ({} "
1586 state->current_view);
1590 else if (state->current_view != r.term)
1600 "Recv append entries response to {} from {}: stale term ({} != {})",
1604 state->current_view);
1608 else if (r.last_log_idx < node->second.match_idx)
1618 "Recv append entries response to {} from {}: stale idx",
// Failure path: move sent_idx back to the highest possible match, never
// above its current value and never below match_idx.
1630 "Recv append entries response to {} from {}: failed",
1633 const auto this_match =
1634 find_highest_possible_match({r.term, r.last_log_idx});
1635 node->second.sent_idx = std::max(
1636 std::min(this_match, node->second.sent_idx), node->second.match_idx);
// Success path: match_idx only ever moves forward.
1643 node->second.match_idx =
1644 std::max(node->second.match_idx, r.last_log_idx);
1648 "Recv append entries response to {} from {} for index {}: success",
1659 "Send request vote from {} to {} at {}",
1662 last_committable_idx);
1663 CCF_ASSERT(last_committable_idx >= state->commit_idx,
"lci < ci");
1666 .term = state->current_view,
1667 .last_committable_idx = last_committable_idx,
1668 .term_of_last_committable_idx = get_term_internal(last_committable_idx),
1671#ifdef CCF_RAFT_TRACING
1672 nlohmann::json j = {};
1673 j[
"function"] =
"send_request_vote";
1675 j[
"state"] = *state;
1676 COMMITTABLE_INDICES(j[
"state"], state);
1677 j[
"to_node_id"] = to;
// Handles a vote request `r` from node `from` under state->lock and replies
// via send_request_vote_response.
// NOTE(review): branch bodies, early returns and the definitions of
// `last_committable_idx` and `answer` are not fully visible in this extract.
1684 void recv_request_vote(
const ccf::NodeId& from, RequestVote r)
1686 std::lock_guard<ccf::pal::Mutex> guard(state->lock);
// Tracing builds record the call as JSON.
1695#ifdef CCF_RAFT_TRACING
1696 nlohmann::json j = {};
1697 j[
"function"] =
"recv_request_vote";
1699 j[
"state"] = *state;
1700 COMMITTABLE_INDICES(j[
"state"], state);
1701 j[
"from_node_id"] = from;
// Requests from an earlier term are refused.
1705 if (state->current_view > r.term)
1709 "Recv request vote to {} from {}: our term is later ({} > {})",
1712 state->current_view,
1714 send_request_vote_response(from,
false);
1717 else if (state->current_view < r.term)
1720 "Recv request vote to {} from {}: their term is later ({} < {})",
1723 state->current_view,
// Refuse when a leader is already recorded.
1728 if (leader_id.has_value())
1732 "Recv request vote to {} from {}: leader {} already known in term {}",
1736 state->current_view);
1737 send_request_vote_response(from,
false);
// Refuse when we have already voted for a different node.
1741 if ((voted_for.has_value()) && (voted_for.value() != from))
1745 "Recv request vote to {} from {}: already voted for {}",
1749 send_request_vote_response(from,
false);
// Compare the candidate's last committable entry with ours: a later term
// wins, and an equal term with an index at least as high also qualifies.
1757 const auto term_of_last_committable_idx =
1758 get_term_internal(last_committable_idx);
1761 (r.term_of_last_committable_idx > term_of_last_committable_idx) ||
1762 ((r.term_of_last_committable_idx == term_of_last_committable_idx) &&
1763 (r.last_committable_idx >= last_committable_idx));
1769 restart_election_timeout();
1776 "Voting against candidate at {}.{} because local state is at {}.{}",
1777 r.term_of_last_committable_idx,
1778 r.last_committable_idx,
1779 term_of_last_committable_idx,
1780 last_committable_idx);
1783 send_request_vote_response(from, answer);
// Builds a RequestVoteResponse for node `to` carrying our current term and
// `answer` as vote_granted. The send call itself is not visible in this
// extract.
1786 void send_request_vote_response(
const ccf::NodeId& to,
bool answer)
1789 "Send request vote response from {} to {}: {}",
1794 RequestVoteResponse response{
1795 .term = state->current_view, .vote_granted = answer};
// Handles a vote response `r` from node `from` under state->lock; a granted
// vote in the current term is recorded with add_vote_for_me.
// NOTE(review): branch bodies and early returns are not visible in this
// extract.
1801 void recv_request_vote_response(
1804 std::lock_guard<ccf::pal::Mutex> guard(state->lock);
// Tracing builds record the call as JSON.
1806#ifdef CCF_RAFT_TRACING
1807 nlohmann::json j = {};
1808 j[
"function"] =
"recv_request_vote_response";
1810 j[
"state"] = *state;
1811 COMMITTABLE_INDICES(j[
"state"], state);
1812 j[
"from_node_id"] = from;
1819 "Recv request vote response to {} from: {}: we aren't a candidate",
// Responses from nodes with no entry in all_other_nodes are logged as
// "unknown node".
1826 auto node = all_other_nodes.find(from);
1827 if (node == all_other_nodes.end())
1830 "Recv request vote response to {} from {}: unknown node",
// Classify the response: newer term, stale term, vote refused, vote granted.
1836 if (state->current_view < r.term)
1839 "Recv request vote response to {} from {}: their term is more recent "
1843 state->current_view,
1848 else if (state->current_view != r.term)
1852 "Recv request vote response to {} from {}: stale ({} != {})",
1855 state->current_view,
1859 else if (!r.vote_granted)
1863 "Recv request vote response to {} from {}: they voted no",
1870 "Recv request vote response to {} from {}: they voted yes",
1874 add_vote_for_me(from);
// Handles a propose-request-vote message from node `from` under state->lock.
// The two log messages show it either becomes a candidate early or ignores
// the proposal; the deciding condition is not visible in this extract.
1877 void recv_propose_request_vote(
1880 std::lock_guard<ccf::pal::Mutex> guard(state->lock);
// Tracing builds record the call as JSON.
1882#ifdef CCF_RAFT_TRACING
1883 nlohmann::json j = {};
1884 j[
"function"] =
"recv_propose_request_vote";
1886 j[
"state"] = *state;
1887 COMMITTABLE_INDICES(j[
"state"], state);
1888 j[
"from_node_id"] = from;
1894 "Becoming candidate early due to propose request vote from {}", from);
1899 RAFT_INFO_FMT(
"Ignoring propose request vote from {}", from);
// Resets timeout_elapsed to a random number of milliseconds drawn from
// `distrib`, which the constructor initialises with bounds 0 and
// election_timeout.count() / 2. The timer therefore restarts from a random
// offset rather than from zero.
1903 void restart_election_timeout()
1907 timeout_elapsed = std::chrono::milliseconds(distrib(rand));
// Discards all recorded votes, then creates one tally per configuration,
// keyed by conf.idx, with the quorum for that configuration's node count and
// an empty vote set.
1910 void reset_votes_for_me()
1912 votes_for_me.clear();
1913 for (
auto const& conf : configurations)
1915 votes_for_me[conf.idx].quorum = get_quorum(conf.nodes.size());
1916 votes_for_me[conf.idx].votes.clear();
// Starts an election: votes for itself, resets the vote tallies, increments
// the current view, restarts the election timer and sends vote requests.
// NOTE(review): the loop around send_request_vote and any state-transition
// assignments are not visible in this extract.
1921 void become_candidate()
// With no configuration the message below reports that the node does not
// become a candidate (the branch body is not visible here).
1923 if (configurations.empty())
1929 "Not becoming candidate {} due to lack of a configuration.",
1937 voted_for = state->node_id;
1938 reset_votes_for_me();
1939 state->current_view++;
1941 restart_election_timeout();
1945 "Becoming candidate {}: {}", state->node_id, state->current_view);
// Tracing builds record the transition as JSON.
1947#ifdef CCF_RAFT_TRACING
1948 nlohmann::json j = {};
1949 j[
"function"] =
"become_candidate";
1950 j[
"state"] = *state;
1951 COMMITTABLE_INDICES(j[
"state"], state);
1952 j[
"configurations"] = configurations;
// Count our own vote before asking others.
1956 add_vote_for_me(state->node_id);
1963 send_request_vote(node_id);
// Transitions this node to leader: initialises the store's term, records
// itself as leader_id, zeroes the request timer, resets per-peer replication
// indices and sends append entries to every peer.
// NOTE(review): the use of force_become_leader, the definition of
// election_index and the body of the commit_idx > 0 branch are not visible
// in this extract.
1967 void become_leader(
bool force_become_leader =
false)
1977 "Election index is {} in term {}", election_index, state->current_view);
1981 if (state->commit_idx > 0)
1988 store->initialise_term(state->current_view);
1992 leader_id = state->node_id;
1995 using namespace std::chrono_literals;
1996 timeout_elapsed = 0ms;
2001 "Becoming leader {}: {}", state->node_id, state->current_view);
// Tracing builds record the transition as JSON.
2003#ifdef CCF_RAFT_TRACING
2004 nlohmann::json j = {};
2005 j[
"function"] =
"become_leader";
2006 j[
"state"] = *state;
2007 COMMITTABLE_INDICES(j[
"state"], state);
2008 j[
"configurations"] = configurations;
// Every peer starts with match_idx 0 and sent_idx at our last_idx.
2019 auto next = state->last_idx + 1;
2021 for (
auto& node : all_other_nodes)
2023 node.second.match_idx = 0;
2024 node.second.sent_idx = next - 1;
2027 send_append_entries(node.first, next);
// retired_node_cleanup is a unique_ptr; cleanup only runs when it is set.
2030 if (retired_node_cleanup)
2032 retired_node_cleanup->cleanup();
2042 restart_election_timeout();
2052 "Becoming follower {}: {}.{}",
2054 state->current_view,
2057#ifdef CCF_RAFT_TRACING
2058 nlohmann::json j = {};
2059 j[
"function"] =
"become_follower";
2060 j[
"state"] = *state;
2061 COMMITTABLE_INDICES(j[
"state"], state);
2062 j[
"configurations"] = configurations;
2074 state->current_view = term;
2076 reset_votes_for_me();
2078 is_new_follower =
true;
2085 "Becoming retired, phase {} (leadership {}): {}: {} at {}",
2087 state->leadership_state,
2089 state->current_view,
2095 !state->retirement_idx.has_value(),
2096 "retirement_idx already set to {}",
2097 state->retirement_idx.value());
2098 state->retirement_idx = idx;
2103 assert(state->retirement_idx.has_value());
2105 idx >= state->retirement_idx.value(),
2106 "Index {} unexpectedly lower than retirement_idx {}",
2108 state->retirement_idx.value());
2109 state->retirement_committable_idx = idx;
2116 ProposeRequestVote prv{.term = state->current_view};
2118 std::optional<ccf::NodeId> successor = std::nullopt;
2119 Index max_match_idx = 0;
2132 for (
auto& [node, node_state] : all_other_nodes)
2134 if (node_state.match_idx >= max_match_idx)
2137 auto conf = configurations.rbegin();
2138 while (conf != configurations.rend())
2140 if (conf->nodes.find(node) != conf->nodes.end())
2142 latest_reconf_id = conf->idx;
2147 if (!(node_state.match_idx == max_match_idx &&
2148 latest_reconf_id < reconf_id_of_max_match))
2150 reconf_id_of_max_match = latest_reconf_id;
2152 max_match_idx = node_state.match_idx;
2156 if (successor.has_value())
2158 RAFT_INFO_FMT(
"Node retired, nudging {}", successor.value());
2169 state->retirement_phase = phase;
2174 if (configurations.empty())
2177 "Not voting for myself {} due to lack of a configuration.",
2183 for (
auto const& conf : configurations)
2185 auto const&
nodes = conf.nodes;
2192 votes_for_me[conf.idx].votes.insert(from);
2194 "Node {} voted for {} in configuration {} with quorum {}",
2198 votes_for_me[conf.idx].quorum);
2202 bool is_elected =
true;
2203 for (
auto const& v : votes_for_me)
2205 auto const& quorum = v.second.quorum;
2206 auto const& votes = v.second.votes;
2208 if (votes.size() < quorum)
// Leader-only: derives the highest index acknowledged by a majority in every
// active configuration and commits when the resulting commit index belongs
// to the current term. Throws std::logic_error when called while not leader
// (per the message below; the guarding condition is not visible here) or
// when the agreement index exceeds last_idx.
2224 void update_commit()
2228 throw std::logic_error(
2229 "update_commit() must only be called while this node is leader");
2232 std::optional<Index> new_agreement_index = std::nullopt;
2235 for (
auto const& c : configurations)
// Gather one match index per member: our own last_idx, and match_idx for
// every other node.
2239 std::vector<Index> match;
2240 match.reserve(c.nodes.size());
// NOTE(review): `auto node` iterates by value and so copies each element;
// `const auto&` would avoid that.
2242 for (
auto node : c.
nodes)
2244 if (node.first == state->node_id)
2246 match.push_back(state->last_idx);
2250 match.push_back(all_other_nodes.at(node.first).match_idx);
// After an ascending sort, position (size - 1) / 2 holds a value that a
// majority of members have reached or exceeded.
2254 sort(match.begin(), match.end());
2255 auto confirmed = match.at((match.size() - 1) / 2);
// Keep the minimum across configurations.
2258 !new_agreement_index.has_value() ||
2259 confirmed < new_agreement_index.value())
2261 new_agreement_index = confirmed;
2265 if (new_agreement_index.has_value())
2267 if (new_agreement_index.value() > state->last_idx)
2269 throw std::logic_error(
2270 "Followers appear to have later match indices than leader");
// NOTE(review): the expression producing new_commit_idx is not visible in
// this extract.
2273 const auto new_commit_idx =
2276 if (new_commit_idx.has_value())
2279 "In update_commit, new_commit_idx: {}, "
2281 new_commit_idx.value(),
// Commit only when the candidate index belongs to the current term;
// otherwise log and wait.
2284 const auto term_of_new = get_term_internal(new_commit_idx.value());
2285 if (term_of_new == state->current_view)
2287 commit(new_commit_idx.value());
2292 "Ack quorum at {} resulted in proposed commit index {}, which "
2293 "is in term {}. Waiting for agreement on committable entry in "
2294 "current term {} to update commit",
2295 new_agreement_index.value(),
2296 new_commit_idx.value(),
2298 state->current_view);
// Commits up to a committable index when `idx` is beyond the current commit
// index and its term is not later than the current view.
// NOTE(review): the expression producing highest_committable is not visible
// in this extract.
2306 void commit_if_possible(
Index idx)
2309 "Commit if possible {} (ci: {}) (ti {})",
2312 get_term_internal(idx));
2314 (idx > state->commit_idx) &&
2315 (get_term_internal(idx) <= state->current_view))
2317 const auto highest_committable =
2319 if (highest_committable.has_value())
2321 commit(highest_committable.value());
// Maps a node count `n` to a quorum size. Callers in this file pass either
// conf.nodes.size() or conf.nodes.size() - 1.
// NOTE(review): the formula itself is not visible in this extract.
2326 size_t get_quorum(
size_t n)
const
// Advances state->commit_idx to `idx`, compacts the store, and discards
// configurations superseded by the commit. Throws std::logic_error if `idx`
// is beyond last_idx.
// NOTE(review): several branch bodies (including what happens when idx is
// not beyond commit_idx) are not visible in this extract.
2331 void commit(
Index idx)
2333 if (idx > state->last_idx)
2335 throw std::logic_error(fmt::format(
2336 "Tried to commit {} but last_idx is {}", idx, state->last_idx));
2343 if (idx <= state->commit_idx)
// Tracing builds record the call and its argument as JSON.
2346#ifdef CCF_RAFT_TRACING
2347 nlohmann::json j = {};
2348 j[
"function"] =
"commit";
2349 j[
"args"] = nlohmann::json::object();
2350 j[
"args"][
"idx"] = idx;
2351 j[
"state"] = *state;
2352 COMMITTABLE_INDICES(j[
"state"], state);
2353 j[
"configurations"] = configurations;
2359 state->commit_idx = idx;
// Condition: the commit has reached the retirement committable index, if one
// is set (its body is not visible here).
2363 state->retirement_committable_idx.has_value() &&
2364 idx >= state->retirement_committable_idx.value())
2370 store->compact(idx);
// Examine the front configuration against the one after it, popping the
// front one once it is superseded (the enclosing loop is not visible here).
2377 bool changed =
false;
2381 auto conf = configurations.begin();
2382 if (conf == configurations.end())
2387 auto next = std::next(conf);
2388 if (next == configurations.end())
2393 if (idx < next->idx)
2399 "Configurations: discard committed configuration at {}", conf->idx);
2400 configurations.pop_front();
2406 create_and_remove_node_state();
// NOTE(review): become_leader null-checks retired_node_cleanup before
// calling cleanup(); no such check is visible here -- confirm one exists.
2409 retired_node_cleanup->cleanup();
// Reports whether this node's id is among the nodes of the most recent
// configuration; `present` stays false when there are no configurations.
// (The return statement is not visible in this extract.)
2414 bool is_self_in_latest_config()
2416 bool present =
false;
2417 if (!configurations.empty())
// NOTE(review): `auto` copies the node container here; binding with
// `const auto&` would avoid the copy.
2419 auto current_nodes = configurations.back().nodes;
2420 present = current_nodes.find(state->node_id) != current_nodes.end();
// Acts only when `ticking` is false and this node is in the latest
// configuration; the branch body is not visible in this extract.
2425 void start_ticking_if_necessary()
2427 if (!ticking && is_self_in_latest_config())
2436 if (idx < state->commit_idx)
2439 "Asked to rollback to idx:{} but committed to commit_idx:{} - "
2440 "ignoring rollback request",
2446 store->rollback({get_term_internal(idx), idx}, state->current_view);
2448 RAFT_DEBUG_FMT(
"Setting term in store to: {}", state->current_view);
2450 state->last_idx = idx;
2453 state->view_history.rollback(idx);
2455 while (!state->committable_indices.empty() &&
2456 (state->committable_indices.back() > idx))
2458 state->committable_indices.pop_back();
2465 assert(state->retirement_committable_idx.has_value());
2466 if (state->retirement_committable_idx.value() > idx)
2468 state->retirement_committable_idx = std::nullopt;
2477 assert(state->retirement_idx.has_value());
2478 if (state->retirement_idx.value() > idx)
2480 state->retirement_idx = std::nullopt;
2481 state->retirement_phase = std::nullopt;
2488 bool changed =
false;
2490 while (!configurations.empty() && (configurations.back().idx > idx))
2493 "Configurations: rollback configuration at {}",
2494 configurations.back().idx);
2495 configurations.pop_back();
2501 create_and_remove_node_state();
// Reconciles all_other_nodes with the nodes named in the active
// configurations: nodes in no configuration are queued for removal, and
// nodes not yet tracked get a NodeState (and a channel where none exists).
// NOTE(review): the declaration of active_nodes, the removal loop over
// to_remove and some guarding conditions are not visible in this extract.
2511 void create_and_remove_node_state()
// Collect every node that appears in any configuration.
2516 for (
auto const& conf : configurations)
2518 for (
auto const& node : conf.nodes)
2520 active_nodes.emplace(node.first, node.second);
// Tracked nodes that no configuration mentions are queued for removal.
2526 std::vector<ccf::NodeId> to_remove;
2528 for (
const auto& node : all_other_nodes)
2530 if (active_nodes.find(node.first) == active_nodes.end())
2532 to_remove.push_back(node.first);
// NOTE(review): `auto node_info` iterates by value and so copies each
// element; `const auto&` would avoid that.
2537 for (
auto node_info : active_nodes)
// This node itself gets special handling (branch body not visible here).
2539 if (node_info.first == state->node_id)
2544 if (all_other_nodes.find(node_info.first) == all_other_nodes.end())
2546 if (!
channels->have_channel(node_info.first))
2549 "Configurations: create node channel with {}", node_info.first);
2553 node_info.second.hostname,
2554 node_info.second.port);
// A newly tracked node is created from (node_info, last_idx + 1, 0) and is
// sent append entries starting at last_idx + 1.
2559 auto index = state->last_idx + 1;
2560 all_other_nodes.try_emplace(
2561 node_info.first, node_info.second, index, 0);
2565 send_append_entries(node_info.first, index);
2569 "Added raft node {} ({}:{})",
2571 node_info.second.hostname,
2572 node_info.second.port);