111 std::chrono::milliseconds last_ack_timeout{0};
113 NodeState() =
default;
118 Index match_idx_ = 0) :
119 node_info(std::move(node_info_)),
121 match_idx(match_idx_),
127 std::unique_ptr<Store> store;
130 std::optional<ccf::NodeId> voted_for = std::nullopt;
131 std::optional<ccf::NodeId> leader_id = std::nullopt;
136 std::unordered_set<ccf::NodeId> votes;
139 std::map<Index, Votes> votes_for_me;
141 std::chrono::milliseconds timeout_elapsed;
152 bool is_new_follower =
false;
159 bool should_sign =
false;
161 std::shared_ptr<aft::State> state;
164 std::chrono::milliseconds request_timeout;
165 std::chrono::milliseconds election_timeout;
166 size_t max_uncommitted_tx_count;
167 bool ticking =
false;
170 std::list<Configuration> configurations;
178 std::unordered_map<ccf::NodeId, NodeState> all_other_nodes;
179 std::unordered_map<ccf::NodeId, ccf::SeqNo> retired_nodes;
182 std::shared_ptr<ccf::NodeClient> node_client;
185 std::unique_ptr<ccf::RetiredNodeCleanup> retired_node_cleanup;
187 size_t entry_size_not_limited = 0;
188 size_t entry_count = 0;
189 Index entries_batch_size = 20;
190 static constexpr int batch_window_size = 100;
191 int batch_window_sum = 0;
195 bool public_only =
false;
198 std::uniform_int_distribution<int> distrib;
199 std::default_random_engine rand;
204 static constexpr size_t max_terms_per_append_entries = 1;
213 std::unique_ptr<Store> store_,
214 std::unique_ptr<LedgerProxy> ledger_,
215 std::shared_ptr<ccf::NodeToNode> channels_,
216 std::shared_ptr<aft::State> state_,
217 std::shared_ptr<ccf::NodeClient> rpc_request_context_,
218 bool public_only_ =
false) :
219 store(
std::move(store_)),
223 state(
std::move(state_)),
225 request_timeout(settings_.message_timeout),
226 election_timeout(settings_.election_timeout),
227 max_uncommitted_tx_count(settings_.max_uncommitted_tx_count),
229 node_client(
std::move(rpc_request_context_)),
230 retired_node_cleanup(
231 std::make_unique<
ccf::RetiredNodeCleanup>(node_client)),
233 public_only(public_only_),
235 distrib(0, (int)election_timeout.count() / 2),
236 rand((int)(uintptr_t)this),
251 return state->node_id;
266 std::unique_lock<ccf::pal::Mutex> guard(state->lock);
267 return can_replicate_unsafe();
277 if (max_uncommitted_tx_count == 0)
281 std::unique_lock<ccf::pal::Mutex> guard(state->lock);
283 (state->last_idx - state->commit_idx >= max_uncommitted_tx_count);
288 std::unique_lock<ccf::pal::Mutex> guard(state->lock);
289 if (can_sign_unsafe())
293 return Consensus::SignatureDisposition::SHOULD_SIGN;
295 return Consensus::SignatureDisposition::CAN_SIGN;
297 return Consensus::SignatureDisposition::CANT_REPLICATE;
328 ccf::SeqNo seqno,
const std::vector<ccf::kv::NodeId>& node_ids)
override
330 for (
const auto& node_id : node_ids)
336 "Node is not retired, cannot become retired committed");
339 "Node is not retired, cannot become retired committed");
340 state->retired_committed_idx = seqno;
349 all_other_nodes.erase(node_id);
350 RAFT_INFO_FMT(
"Removed {} from nodes known to consensus", node_id);
357 return state->committable_indices.empty() ?
359 state->committable_indices.back();
367 const auto it = std::upper_bound(
368 state->committable_indices.rbegin(),
369 state->committable_indices.rend(),
371 [](
const auto& l,
const auto& r) { return l >= r; });
372 if (it == state->committable_indices.rend())
382 while (!state->committable_indices.empty() &&
383 (state->committable_indices.front() <= idx))
385 state->committable_indices.pop_front();
393 std::lock_guard<ccf::pal::Mutex> guard(state->lock);
401 if (leader_id.has_value())
403 throw std::logic_error(
404 "Can't force leadership if there is already a leader");
407 std::lock_guard<ccf::pal::Mutex> guard(state->lock);
408 state->current_view += starting_view_change;
415 const std::vector<Index>& terms,
416 Index commit_idx_)
override
420 if (leader_id.has_value())
422 throw std::logic_error(
423 "Can't force leadership if there is already a leader");
426 std::lock_guard<ccf::pal::Mutex> guard(state->lock);
427 state->current_view = term;
428 state->last_idx = index;
429 state->commit_idx = commit_idx_;
430 state->view_history.initialise(terms);
431 state->view_history.update(index, term);
432 state->current_view += starting_view_change;
439 const std::vector<Index>& term_history,
440 Index recovery_start_index = 0)
override
444 std::lock_guard<ccf::pal::Mutex> guard(state->lock);
446 state->last_idx = index;
447 state->commit_idx = index;
449 state->view_history.initialise(term_history);
451 ledger->init(index, recovery_start_index);
458 return state->last_idx;
463 std::lock_guard<ccf::pal::Mutex> guard(state->lock);
464 return get_commit_idx_unsafe();
469 std::lock_guard<ccf::pal::Mutex> guard(state->lock);
470 return state->current_view;
475 std::lock_guard<ccf::pal::Mutex> guard(state->lock);
476 ccf::SeqNo commit_idx = get_commit_idx_unsafe();
477 return {get_term_internal(commit_idx), commit_idx};
482 std::lock_guard<ccf::pal::Mutex> guard(state->lock);
483 return get_term_internal(idx);
489 return state->view_history.get_history_until(idx);
495 return state->view_history.get_history_since(idx);
504 std::set<ccf::NodeId> nodes;
506 for (
auto const& conf : configurations)
508 for (
auto const& [node_id, _] : conf.nodes)
510 if (node_id != state->node_id)
512 nodes.insert(node_id);
524 "Configurations: add new configuration at {}: {{{}}}", idx, conf);
526#ifdef CCF_RAFT_TRACING
527 nlohmann::json j = {};
528 j[
"function"] =
"add_configuration";
530 COMMITTABLE_INDICES(j[
"state"], state);
531 j[
"configurations"] = configurations;
532 j[
"args"] = nlohmann::json::object();
542 !configurations.empty() &&
543 configurations.back().nodes.find(state->node_id) !=
544 configurations.back().nodes.end() &&
545 conf.find(state->node_id) == conf.end())
550 if (configurations.empty() || conf != configurations.back().nodes)
553 configurations.push_back(new_config);
555 create_and_remove_node_state();
562 using namespace std::chrono_literals;
563 timeout_elapsed = 0ms;
569 for (
auto& node : all_other_nodes)
571 using namespace std::chrono_literals;
572 node.second.last_ack_timeout = 0ms;
578 if (configurations.empty())
583 return configurations.back().nodes;
588 std::lock_guard<ccf::pal::Mutex> guard(state->lock);
595 std::lock_guard<ccf::pal::Mutex> guard(state->lock);
605 for (
auto const& conf : configurations)
607 details.
configs.push_back(conf);
609 for (
auto& [k, v] : all_other_nodes)
612 v.match_idx,
static_cast<size_t>(v.last_ack_timeout.count())};
620 std::lock_guard<ccf::pal::Mutex> guard(state->lock);
625 "Failed to replicate {} items: not leader", entries.size());
630 if (term != state->current_view)
633 "Failed to replicate {} items at term {}, current term is {}",
636 state->current_view);
643 "Failed to replicate {} items: node retirement is complete",
651 for (
const auto& [index, data, is_globally_committable, hooks] : entries)
653 bool globally_committable = is_globally_committable;
655 if (index != state->last_idx + 1)
661 "Replicated on leader {}: {}{} ({} hooks)",
664 (globally_committable ?
" committable" :
""),
667#ifdef CCF_RAFT_TRACING
668 nlohmann::json j = {};
669 j[
"function"] =
"replicate";
671 COMMITTABLE_INDICES(j[
"state"], state);
674 j[
"globally_committable"] = globally_committable;
678 for (
auto& hook : *hooks)
683 if (globally_committable)
686 "membership: {} leadership: {}",
687 state->membership_state,
688 state->leadership_state);
695 state->committable_indices.push_back(index);
696 start_ticking_if_necessary();
703 state->last_idx = index;
705 *data, globally_committable, state->current_view, index);
706 entry_size_not_limited += data->size();
709 state->view_history.update(index, state->current_view);
714 entry_size_not_limited = 0;
715 for (
const auto& it : all_other_nodes)
718 send_append_entries(it.first, it.second.sent_idx + 1);
733 const ccf::NodeId& from,
const uint8_t* data,
size_t size)
override
735 auto type = serialized::peek<RaftMsgType>(data, size);
744 channels->template recv_authenticated<AppendEntries>(
746 recv_append_entries(from, r, data, size);
753 channels->template recv_authenticated<AppendEntriesResponse>(
755 recv_append_entries_response(from, r);
762 channels->template recv_authenticated<RequestPreVote>(
764 recv_request_pre_vote(from, r);
772 recv_request_vote(from, r);
779 channels->template recv_authenticated<RequestPreVoteResponse>(
781 recv_request_pre_vote_response(from, r);
788 channels->template recv_authenticated<RequestVoteResponse>(
790 recv_request_vote_response(from, r);
797 channels->template recv_authenticated<ProposeRequestVote>(
799 recv_propose_request_vote(from, r);
819 catch (
const std::exception& e)
827 void periodic(std::chrono::milliseconds elapsed)
override
829 std::unique_lock<ccf::pal::Mutex> guard(state->lock);
830 timeout_elapsed += elapsed;
834 if (timeout_elapsed >= request_timeout)
836 using namespace std::chrono_literals;
837 timeout_elapsed = 0ms;
841 for (
const auto& node : all_other_nodes)
843 send_append_entries(node.first, node.second.sent_idx + 1);
847 for (
auto& node : all_other_nodes)
849 node.second.last_ack_timeout += elapsed;
852 bool every_active_config_has_a_quorum = std::all_of(
853 configurations.begin(),
854 configurations.end(),
856 size_t live_nodes_in_config = 0;
857 for (auto const& node : conf.nodes)
859 auto search = all_other_nodes.find(node.first);
864 search == all_other_nodes.end() ||
866 search->second.last_ack_timeout < election_timeout)
868 ++live_nodes_in_config;
873 "No ack received from {} in last {}",
878 return live_nodes_in_config >= get_quorum(conf.
nodes.size());
881 if (!every_active_config_has_a_quorum)
888 "Stepping down as leader {}: No ack received from a majority of "
889 "backups in last {}",
899 timeout_elapsed >= election_timeout)
902 if (state->pre_vote_enabled)
904 become_pre_vote_candidate();
923 Index probe_index = std::min(tx_id.
seqno, state->last_idx);
924 Term term_of_probe = state->view_history.view_at(probe_index);
925 while (term_of_probe > tx_id.
view)
930 probe_index = state->view_history.start_of_view(term_of_probe);
935 term_of_probe = state->view_history.view_at(probe_index);
939 "Looking for match with {}.{}, from {}.{}, best answer is {}",
942 state->view_history.view_at(state->last_idx),
948 void update_batch_size()
950 auto avg_entry_size = (entry_count == 0) ?
951 append_entries_size_limit :
952 entry_size_not_limited / entry_count;
954 auto batch_size = (avg_entry_size == 0) ?
955 append_entries_size_limit / 2 :
956 append_entries_size_limit / avg_entry_size;
958 auto batch_avg = batch_window_sum / batch_window_size;
960 batch_window_sum += (batch_size - batch_avg);
961 entries_batch_size = std::max((batch_window_sum / batch_window_size), 1);
966 if (idx > state->last_idx)
971 return state->view_history.view_at(idx);
974 bool can_replicate_unsafe()
977 !is_retired_committed();
980 bool can_sign_unsafe()
983 !is_retired_committed();
986 Index get_commit_idx_unsafe()
988 return state->commit_idx;
994 "Sending append entries to node {} in batches of {}, covering the "
1001 auto calculate_end_index = [
this](
Index start) {
1006 max_terms_per_append_entries == 1,
1007 "AppendEntries construction logic enforces single term");
1008 auto max_idx = state->last_idx;
1009 const auto term_of_ae = state->view_history.view_at(start);
1010 const auto index_at_end_of_term =
1011 state->view_history.end_of_view(term_of_ae);
1012 if (index_at_end_of_term != ccf::kv::NoVersion)
1014 max_idx = index_at_end_of_term;
1016 return std::min(start + entries_batch_size - 1, max_idx);
1025 end_idx = calculate_end_index(start_idx);
1026 RAFT_TRACE_FMT(
"Sending sub range {} -> {}", start_idx, end_idx);
1027 send_append_entries_range(to, start_idx, end_idx);
1028 start_idx = std::min(end_idx + 1, state->last_idx);
1029 }
while (end_idx != state->last_idx);
1032 void send_append_entries_range(
1035 const auto prev_idx = start_idx - 1;
1037 if (is_retired_committed() && start_idx >= end_idx)
1044 const auto prev_term = get_term_internal(prev_idx);
1045 const auto term_of_idx = get_term_internal(end_idx);
1047#pragma clang diagnostic push
1048#pragma clang diagnostic ignored "-Wc99-designator"
1051 {.idx = end_idx, .prev_idx = prev_idx},
1052 .term = state->current_view,
1053 .prev_term = prev_term,
1054 .leader_commit_idx = state->commit_idx,
1055 .term_of_idx = term_of_idx,
1057#pragma clang diagnostic pop
1060 "Send {} from {} to {}: ({}.{}, {}.{}] ({})",
1070 auto& node = all_other_nodes.at(to);
1072#ifdef CCF_RAFT_TRACING
1073 nlohmann::json j = {};
1074 j[
"function"] =
"send_append_entries";
1076 j[
"state"] = *state;
1077 COMMITTABLE_INDICES(j[
"state"], state);
1078 j[
"to_node_id"] = to;
1079 j[
"match_idx"] = node.match_idx;
1080 j[
"sent_idx"] = node.sent_idx;
1086 if (!channels->send_authenticated(
1093 node.sent_idx = end_idx;
1096 void recv_append_entries(
1099 const uint8_t* data,
1102 std::unique_lock<ccf::pal::Mutex> guard(state->lock);
1105 "Recv {} to {} from {}: {}.{} to {}.{} in term {}",
1115#ifdef CCF_RAFT_TRACING
1116 nlohmann::json j = {};
1117 j[
"function"] =
"recv_append_entries";
1119 j[
"state"] = *state;
1120 COMMITTABLE_INDICES(j[
"state"], state);
1121 j[
"from_node_id"] = from;
1132 state->current_view == r.term &&
1136 become_aware_of_new_term(r.term);
1138 else if (state->current_view < r.term)
1140 become_aware_of_new_term(r.term);
1142 else if (state->current_view > r.term)
1146 "Recv {} to {} from {} but our term is later ({} > {})",
1150 state->current_view,
1152 send_append_entries_response_nack(from);
1157 const auto prev_term = get_term_internal(r.prev_idx);
1158 if (prev_term != r.prev_term)
1161 "Previous term for {} should be {}", r.prev_idx, prev_term);
1168 "Recv {} to {} from {} but our log does not yet "
1174 send_append_entries_response_nack(from);
1179 "Recv {} to {} from {} but our log at {} has the wrong "
1180 "previous term (ours: {}, theirs: {})",
1187 const ccf::TxID rejected_tx{r.prev_term, r.prev_idx};
1188 send_append_entries_response_nack(from, rejected_tx);
1195 restart_election_timeout();
1196 if (!leader_id.has_value() || leader_id.value() != from)
1200 "Node {} thinks leader is {}", state->node_id, leader_id.value());
1204 if (r.prev_idx < state->commit_idx)
1207 "Recv {} to {} from {} but prev_idx ({}) < commit_idx "
1220 if (r.prev_idx > state->last_idx)
1223 "Recv {} to {} from {} but prev_idx ({}) > last_idx ({})",
1233 "Recv {} to {} from {} for index {} and previous index {}",
1240 std::vector<std::tuple<
1241 std::unique_ptr<ccf::kv::AbstractExecutionWrapper>,
1245 for (
Index i = r.prev_idx + 1; i <= r.idx; i++)
1247 if (i <= state->last_idx)
1254 max_terms_per_append_entries == 1,
1255 "AppendEntries rollback logic assumes single term");
1256 const auto incoming_term = r.term_of_idx;
1257 const auto local_term = state->view_history.view_at(i);
1258 if (incoming_term != local_term)
1260 if (is_new_follower)
1262 auto rollback_level = i - 1;
1264 "New follower received AppendEntries with conflict. Incoming "
1265 "entry {}.{} conflicts with local {}.{}. Rolling back to {}.",
1272 "Dropping conflicting branch. Rolling back {} entries, "
1273 "beginning with {}.{}.",
1274 state->last_idx - rollback_level,
1277 rollback(rollback_level);
1278 is_new_follower =
false;
1290 "Ignoring conflicting AppendEntries. Retaining {} entries, "
1291 "beginning with {}.{}.",
1292 state->last_idx - (i - 1),
1302 ledger->skip_entry(data, size);
1307 std::vector<uint8_t> entry;
1310 entry = LedgerProxy::get_entry(data, size);
1312 catch (
const std::logic_error& e)
1316 "Recv {} to {} from {} but the data is malformed: {}",
1321 send_append_entries_response_nack(from);
1326 auto ds = store->deserialize(entry, public_only, expected);
1330 "Recv {} to {} from {} but the entry could not be "
1335 send_append_entries_response_nack(from);
1339 append_entries.emplace_back(std::move(
ds), i);
1342 execute_append_entries_sync(std::move(append_entries), from, r);
1345 void execute_append_entries_sync(
1346 std::vector<std::tuple<
1347 std::unique_ptr<ccf::kv::AbstractExecutionWrapper>,
1350 const AppendEntries& r)
1352 for (
auto& ae : append_entries)
1355 RAFT_DEBUG_FMT(
"Replicating on follower {}: {}", state->node_id, i);
1357#ifdef CCF_RAFT_TRACING
1358 nlohmann::json j = {};
1359 j[
"function"] =
"execute_append_entries_sync";
1360 j[
"state"] = *state;
1361 COMMITTABLE_INDICES(j[
"state"], state);
1362 j[
"from_node_id"] = from;
1366 bool track_deletes_on_missing_keys =
false;
1368 ds->apply(track_deletes_on_missing_keys);
1371 ledger->truncate(i - 1);
1372 send_append_entries_response_nack(from);
1375 state->last_idx = i;
1377 for (
auto& hook :
ds->get_hooks())
1382 bool globally_committable =
1384 if (globally_committable)
1386 start_ticking_if_necessary();
1389 const auto& entry =
ds->get_entry();
1392 entry, globally_committable,
ds->get_term(),
ds->get_index());
1394 switch (apply_success)
1400 ledger->truncate(state->last_idx);
1401 send_append_entries_response_nack(from);
1414 state->committable_indices.push_back(i);
1416 if (
ds->get_term() != 0u)
1423 state->view_history.update(1, r.term);
1431 max_terms_per_append_entries == 1,
1432 "AppendEntries processing for term updates assumes single "
1434 state->view_history.update(r.prev_idx + 1,
ds->get_term());
1437 commit_if_possible(r.leader_commit_idx);
1450 throw std::logic_error(
"Unknown ApplyResult value");
1455 execute_append_entries_finish(r, from);
1458 void execute_append_entries_finish(
1463 commit_if_possible(r.leader_commit_idx);
1466 auto lci = last_committable_index();
1474 state->view_history.update(1, r.term);
1483 state->view_history.update(lci + 1, r.term_of_idx);
1487 send_append_entries_response_ack(from, r);
1490 void send_append_entries_response_ack(
1496 const auto response_idx = ae.idx;
1497 send_append_entries_response(
1501 void send_append_entries_response_nack(
1504 const auto response_idx = find_highest_possible_match(rejected);
1505 const auto response_term = get_term_internal(response_idx);
1507 send_append_entries_response(
1511 void send_append_entries_response_nack(
ccf::NodeId to)
1513 send_append_entries_response(
1516 state->current_view,
1520 void send_append_entries_response(
1526 AppendEntriesResponse response{
1527 .term = response_term,
1528 .last_log_idx = response_idx,
1533 "Send {} from {} to {} for index {}: {}",
1540#ifdef CCF_RAFT_TRACING
1541 nlohmann::json j = {};
1542 j[
"function"] =
"send_append_entries_response";
1543 j[
"packet"] = response;
1544 j[
"state"] = *state;
1545 COMMITTABLE_INDICES(j[
"state"], state);
1546 j[
"to_node_id"] = to;
1550 channels->send_authenticated(
1554 void recv_append_entries_response(
1557 std::lock_guard<ccf::pal::Mutex> guard(state->lock);
1559 auto node = all_other_nodes.find(from);
1560 if (node == all_other_nodes.end())
1564 "Recv append entries response to {} from {}: unknown node",
1570#ifdef CCF_RAFT_TRACING
1571 nlohmann::json j = {};
1572 j[
"function"] =
"recv_append_entries_response";
1574 j[
"state"] = *state;
1575 COMMITTABLE_INDICES(j[
"state"], state);
1576 j[
"from_node_id"] = from;
1577 j[
"match_idx"] = node->second.match_idx;
1578 j[
"sent_idx"] = node->second.sent_idx;
1586 "Recv {} to {} from {}: no longer leader",
1593 using namespace std::chrono_literals;
1594 node->second.last_ack_timeout = 0ms;
1596 if (state->current_view < r.term)
1600 "Recv {} to {} from {}: more recent term ({} "
1606 state->current_view);
1607 become_aware_of_new_term(r.term);
1610 if (state->current_view != r.term)
1620 "Recv {} to {} from {}: stale term ({} != {})",
1625 state->current_view);
1629 else if (r.last_log_idx < node->second.match_idx)
1639 "Recv {} to {} from {}: stale idx", r.msg, state->node_id, from);
1649 "Recv {} to {} from {}: failed", r.msg, state->node_id, from);
1650 const auto this_match =
1651 find_highest_possible_match({r.term, r.last_log_idx});
1652 node->second.sent_idx = std::max(
1653 std::min(this_match, node->second.sent_idx), node->second.match_idx);
1658 node->second.match_idx = std::max(node->second.match_idx, r.last_log_idx);
1661 "Recv {} to {} from {} for index {}: success",
1671 auto last_committable_idx = last_committable_index();
1672 CCF_ASSERT(last_committable_idx >= state->commit_idx,
"lci < ci");
1675 .term = state->current_view,
1676 .last_committable_idx = last_committable_idx,
1677 .term_of_last_committable_idx =
1678 get_term_internal(last_committable_idx)};
1680#ifdef CCF_RAFT_TRACING
1681 nlohmann::json j = {};
1682 j[
"function"] =
"send_request_vote";
1684 j[
"state"] = *state;
1685 COMMITTABLE_INDICES(j[
"state"], state);
1686 j[
"to_node_id"] = to;
1695 auto last_committable_idx = last_committable_index();
1696 CCF_ASSERT(last_committable_idx >= state->commit_idx,
"lci < ci");
1699 .term = state->current_view,
1700 .last_committable_idx = last_committable_idx,
1701 .term_of_last_committable_idx =
1702 get_term_internal(last_committable_idx)};
1704#ifdef CCF_RAFT_TRACING
1705 nlohmann::json j = {};
1706 j[
"function"] =
"send_request_vote";
1708 j[
"state"] = *state;
1709 COMMITTABLE_INDICES(j[
"state"], state);
1710 j[
"to_node_id"] = to;
1717 void recv_request_vote_unsafe(
1727 if (state->current_view > r.term)
1731 "Recv {} to {} from {}: our term is later ({} > {})",
1735 state->current_view,
1737 send_request_vote_response(from,
false, election_type);
1740 if (state->current_view < r.term)
1743 "Recv {} to {} from {}: their term is later ({} < {})",
1747 state->current_view,
1754 become_aware_of_new_term(r.term);
1757 bool grant_vote =
true;
1763 "Recv {} to {} from {}: leader {} already known in term {}",
1768 state->current_view);
1772 auto voted_for_other =
1773 (voted_for.has_value()) && (voted_for.value() != from);
1778 "Recv {} to {} from {}: already voted for {}",
1789 const auto last_committable_idx = last_committable_index();
1790 const auto term_of_last_committable_idx =
1791 get_term_internal(last_committable_idx);
1792 const auto log_up_to_date =
1793 (r.term_of_last_committable_idx > term_of_last_committable_idx) ||
1794 ((r.term_of_last_committable_idx == term_of_last_committable_idx) &&
1795 (r.last_committable_idx >= last_committable_idx));
1796 if (!log_up_to_date)
1799 "Recv {} to {} from {}: candidate log {}.{} is not up-to-date "
1804 r.term_of_last_committable_idx,
1805 r.last_committable_idx,
1806 term_of_last_committable_idx,
1807 last_committable_idx);
1814 restart_election_timeout();
1820 "Recv {} to {} from {}: {} vote to candidate at {}.{} with "
1821 "local state at {}.{}",
1825 grant_vote ?
"granted" :
"denied",
1826 r.term_of_last_committable_idx,
1827 r.last_committable_idx,
1828 term_of_last_committable_idx,
1829 last_committable_idx);
1831 send_request_vote_response(from, grant_vote, election_type);
1834 void recv_request_vote(
const ccf::NodeId& from, RequestVote r)
1836 std::lock_guard<ccf::pal::Mutex> guard(state->lock);
1838#ifdef CCF_RAFT_TRACING
1839 nlohmann::json j = {};
1840 j[
"function"] =
"recv_request_vote";
1842 j[
"state"] = *state;
1843 COMMITTABLE_INDICES(j[
"state"], state);
1844 j[
"from_node_id"] = from;
1851 void recv_request_pre_vote(
const ccf::NodeId& from, RequestPreVote r)
1853 std::lock_guard<ccf::pal::Mutex> guard(state->lock);
1855#ifdef CCF_RAFT_TRACING
1856 nlohmann::json j = {};
1857 j[
"function"] =
"recv_request_vote";
1859 j[
"state"] = *state;
1860 COMMITTABLE_INDICES(j[
"state"], state);
1861 j[
"from_node_id"] = from;
1869 .last_committable_idx = r.last_committable_idx,
1870 .term_of_last_committable_idx = r.term_of_last_committable_idx,
1876 void send_request_vote_response(
1881 RequestVoteResponse response{
1882 .term = state->current_view, .vote_granted = answer};
1885 "Send {} from {} to {}: {}",
1891 channels->send_authenticated(
1896 RequestPreVoteResponse response{
1897 .term = state->current_view, .vote_granted = answer};
1900 "Send {} from {} to {}: {}",
1906 channels->send_authenticated(
1911 void recv_request_vote_response(
1913 RequestVoteResponse r,
1916 std::lock_guard<ccf::pal::Mutex> guard(state->lock);
1918#ifdef CCF_RAFT_TRACING
1919 nlohmann::json j = {};
1920 j[
"function"] =
"recv_request_vote_response";
1922 j[
"state"] = *state;
1923 COMMITTABLE_INDICES(j[
"state"], state);
1924 j[
"from_node_id"] = from;
1929 auto node = all_other_nodes.find(from);
1930 if (node == all_other_nodes.end())
1933 "Recv {} to {} from {}: unknown node", r.msg, state->node_id, from);
1937 if (state->current_view < r.term)
1940 "Recv {} to {} from {}: their term is more recent "
1945 state->current_view,
1947 become_aware_of_new_term(r.term);
1950 if (state->current_view != r.term)
1954 "Recv request vote response to {} from {}: stale ({} != {})",
1957 state->current_view,
1967 "Recv {} to {} from: {}: we aren't a candidate",
1980 "Recv {} to {} from {}: no longer a candidate in {}",
1997 "Recv {} to {} from {}: unexpected message in {} when "
2003 state->current_view);
2007 if (!r.vote_granted)
2011 "Recv request vote response to {} from {}: they voted no",
2018 "Recv request vote response to {} from {}: they voted yes",
2022 add_vote_for_me(from);
2025 void recv_request_vote_response(
2031 void recv_request_pre_vote_response(
2032 const ccf::NodeId& from, RequestPreVoteResponse r)
2034 RequestVoteResponse rvr{.term = r.term, .vote_granted = r.vote_granted};
2039 void recv_propose_request_vote(
2042 std::lock_guard<ccf::pal::Mutex> guard(state->lock);
2044#ifdef CCF_RAFT_TRACING
2045 nlohmann::json j = {};
2046 j[
"function"] =
"recv_propose_request_vote";
2048 j[
"state"] = *state;
2049 COMMITTABLE_INDICES(j[
"state"], state);
2050 j[
"from_node_id"] = from;
2053 if (!is_retired_committed() && ticking && r.term == state->current_view)
2056 "Becoming candidate early due to propose request vote from {}", from);
2061 RAFT_INFO_FMT(
"Ignoring propose request vote from {}", from);
2065 void restart_election_timeout()
2069 timeout_elapsed = std::chrono::milliseconds(distrib(rand));
2072 void reset_votes_for_me()
2074 votes_for_me.clear();
2075 for (
auto const& conf : configurations)
2077 votes_for_me[conf.idx].quorum = get_quorum(conf.nodes.size());
2078 votes_for_me[conf.idx].votes.clear();
2082 void become_pre_vote_candidate()
2084 if (configurations.empty())
2087 "Not becoming pre-vote candidate {} due to lack of a configuration.",
2095 reset_votes_for_me();
2096 restart_election_timeout();
2099 "Becoming pre-vote candidate {}: {}",
2101 state->current_view);
2103#ifdef CCF_RAFT_TRACING
2104 nlohmann::json j = {};
2105 j[
"function"] =
"become_pre_vote_candidate";
2106 j[
"state"] = *state;
2107 COMMITTABLE_INDICES(j[
"state"], state);
2108 j[
"configurations"] = configurations;
2112 add_vote_for_me(state->node_id);
2116 for (
auto const& node_id : other_nodes_in_active_configs())
2119 send_request_pre_vote(node_id);
2124 void become_candidate()
2126 if (configurations.empty())
2132 "Not becoming candidate {} due to lack of a configuration.",
2140 voted_for = state->node_id;
2141 reset_votes_for_me();
2142 state->current_view++;
2144 restart_election_timeout();
2145 reset_last_ack_timeouts();
2148 "Becoming candidate {}: {}", state->node_id, state->current_view);
2150#ifdef CCF_RAFT_TRACING
2151 nlohmann::json j = {};
2152 j[
"function"] =
"become_candidate";
2153 j[
"state"] = *state;
2154 COMMITTABLE_INDICES(j[
"state"], state);
2155 j[
"configurations"] = configurations;
2159 add_vote_for_me(state->node_id);
2163 for (
auto const& node_id : other_nodes_in_active_configs())
2166 send_request_vote(node_id);
2170 void become_leader(
bool =
false)
2172 if (is_retired_committed())
2177 const auto election_index = last_committable_index();
2180 "Election index is {} in term {}", election_index, state->current_view);
2184 if (state->commit_idx > 0)
2186 rollback(election_index);
2191 store->initialise_term(state->current_view);
2195 leader_id = state->node_id;
2198 using namespace std::chrono_literals;
2199 timeout_elapsed = 0ms;
2201 reset_last_ack_timeouts();
2204 "Becoming leader {}: {}", state->node_id, state->current_view);
2206#ifdef CCF_RAFT_TRACING
2207 nlohmann::json j = {};
2208 j[
"function"] =
"become_leader";
2209 j[
"state"] = *state;
2210 COMMITTABLE_INDICES(j[
"state"], state);
2211 j[
"configurations"] = configurations;
2216 if (other_nodes_in_active_configs().size() == 0)
2222 auto next = state->last_idx + 1;
2224 for (
auto& node : all_other_nodes)
2226 node.second.match_idx = 0;
2227 node.second.sent_idx = next - 1;
2230 send_append_entries(node.first, next);
2233 if (retired_node_cleanup)
2235 retired_node_cleanup->cleanup();
2245 restart_election_timeout();
2246 reset_last_ack_timeouts();
2251 rollback(last_committable_index());
2255 "Becoming follower {}: {}.{}",
2257 state->current_view,
2260#ifdef CCF_RAFT_TRACING
2261 nlohmann::json j = {};
2262 j[
"function"] =
"become_follower";
2263 j[
"state"] = *state;
2264 COMMITTABLE_INDICES(j[
"state"], state);
2265 j[
"configurations"] = configurations;
2277 state->current_view = term;
2279 reset_votes_for_me();
2281 is_new_follower =
true;
2285 void send_propose_request_vote()
2289 std::optional<ccf::NodeId> successor = std::nullopt;
2290 Index max_match_idx = 0;
2303 for (
auto& [node, node_state] : all_other_nodes)
2305 if (node_state.match_idx >= max_match_idx)
2308 auto conf = configurations.rbegin();
2309 while (conf != configurations.rend())
2311 if (conf->nodes.find(node) != conf->nodes.end())
2313 latest_reconf_id = conf->idx;
2318 if (!(node_state.match_idx == max_match_idx &&
2319 latest_reconf_id < reconf_id_of_max_match))
2321 reconf_id_of_max_match = latest_reconf_id;
2323 max_match_idx = node_state.match_idx;
2327 if (successor.has_value())
2329 RAFT_INFO_FMT(
"Proposing that {} becomes candidate", successor.value());
2330 channels->send_authenticated(
2337 "Becoming retired, phase {} (leadership {}): {}: {} at {}",
2339 state->leadership_state,
2341 state->current_view,
2347 !state->retirement_idx.has_value(),
2348 "retirement_idx already set to {}",
2350 state->retirement_idx.value());
2351 state->retirement_idx = idx;
2356 assert(state->retirement_idx.has_value());
2359 idx >= state->retirement_idx.value(),
2360 "Index {} unexpectedly lower than retirement_idx {}",
2363 state->retirement_idx.value());
2364 state->retirement_committable_idx = idx;
2371 send_propose_request_vote();
2379 state->retirement_phase = phase;
2384 if (configurations.empty())
2387 "Not voting for myself {} due to lack of a configuration.",
2393 for (
auto const& conf : configurations)
2395 auto const&
nodes = conf.nodes;
2402 votes_for_me[conf.idx].votes.insert(from);
2404 "Node {} voted for {} in configuration {} with quorum {}",
2408 votes_for_me[conf.idx].quorum);
2412 bool is_elected =
true;
2413 for (
auto const& v : votes_for_me)
2415 auto const& quorum = v.second.quorum;
2416 auto const& votes = v.second.votes;
2418 if (votes.size() < quorum)
2427 switch (state->leadership_state)
2436 throw std::logic_error(
2437 "add_vote_for_me() called while not a pre-vote candidate or "
2446 void update_commit()
2450 throw std::logic_error(
2451 "update_commit() must only be called while this node is leader");
2454 std::optional<Index> new_agreement_index = std::nullopt;
2457 for (
auto const& c : configurations)
2461 std::vector<Index> match;
2462 match.reserve(c.nodes.size());
2464 for (
const auto& node : c.
nodes)
2466 if (node.first == state->node_id)
2468 match.push_back(state->last_idx);
2472 match.push_back(all_other_nodes.at(node.first).match_idx);
2476 sort(match.begin(), match.end());
2477 auto confirmed = match.at((match.size() - 1) / 2);
2480 !new_agreement_index.has_value() ||
2481 confirmed < new_agreement_index.value())
2483 new_agreement_index = confirmed;
2487 if (new_agreement_index.has_value())
2489 if (new_agreement_index.value() > state->last_idx)
2491 throw std::logic_error(
2492 "Followers appear to have later match indices than leader");
2495 const auto new_commit_idx =
2496 find_highest_possible_committable_index(new_agreement_index.value());
2498 if (new_commit_idx.has_value())
2501 "In update_commit, new_commit_idx: {}, "
2503 new_commit_idx.value(),
2506 const auto term_of_new = get_term_internal(new_commit_idx.value());
2507 if (term_of_new == state->current_view)
2509 commit(new_commit_idx.value());
2514 "Ack quorum at {} resulted in proposed commit index {}, which "
2515 "is in term {}. Waiting for agreement on committable entry in "
2516 "current term {} to update commit",
2517 new_agreement_index.value(),
2518 new_commit_idx.value(),
2520 state->current_view);
2528 void commit_if_possible(
Index idx)
2531 "Commit if possible {} (ci: {}) (ti {})",
2534 get_term_internal(idx));
2536 (idx > state->commit_idx) &&
2537 (get_term_internal(idx) <= state->current_view))
2539 const auto highest_committable =
2540 find_highest_possible_committable_index(idx);
2541 if (highest_committable.has_value())
2543 commit(highest_committable.value());
2548 size_t get_quorum(
size_t n)
const
2553 void commit(
Index idx)
2555 if (idx > state->last_idx)
2557 throw std::logic_error(fmt::format(
2558 "Tried to commit {} but last_idx is {}", idx, state->last_idx));
2565 if (idx <= state->commit_idx)
2570#ifdef CCF_RAFT_TRACING
2571 nlohmann::json j = {};
2572 j[
"function"] =
"commit";
2573 j[
"args"] = nlohmann::json::object();
2574 j[
"args"][
"idx"] = idx;
2575 j[
"state"] = *state;
2576 COMMITTABLE_INDICES(j[
"state"], state);
2577 j[
"configurations"] = configurations;
2581 compact_committable_indices(idx);
2583 state->commit_idx = idx;
2587 state->retirement_committable_idx.has_value())
2589 const auto retirement_committable =
2591 ->retirement_committable_idx.value();
2592 if (idx >= retirement_committable)
2599 store->compact(idx);
2600 ledger->commit(idx);
2606 bool changed =
false;
2610 auto conf = configurations.begin();
2611 if (conf == configurations.end())
2616 auto next = std::next(conf);
2617 if (next == configurations.end())
2622 if (idx < next->idx)
2628 "Configurations: discard committed configuration at {}", conf->idx);
2629 configurations.pop_front();
2635 create_and_remove_node_state();
2636 if (retired_node_cleanup && is_primary())
2638 retired_node_cleanup->cleanup();
2643 bool is_self_in_latest_config()
2645 bool present =
false;
2646 if (!configurations.empty())
2648 auto current_nodes = configurations.back().nodes;
2649 present = current_nodes.find(state->node_id) != current_nodes.end();
2654 void start_ticking_if_necessary()
2656 if (!ticking && is_self_in_latest_config())
2665 if (idx < state->commit_idx)
2668 "Asked to rollback to idx:{} but committed to commit_idx:{} - "
2669 "ignoring rollback request",
2675 store->rollback({get_term_internal(idx), idx}, state->current_view);
2677 RAFT_DEBUG_FMT(
"Setting term in store to: {}", state->current_view);
2678 ledger->truncate(idx);
2679 state->last_idx = idx;
2682 state->view_history.rollback(idx);
2684 while (!state->committable_indices.empty() &&
2685 (state->committable_indices.back() > idx))
2687 state->committable_indices.pop_back();
2694 assert(state->retirement_committable_idx.has_value());
2695 if (state->retirement_committable_idx.has_value())
2697 const auto retirement_committable =
2699 ->retirement_committable_idx.value();
2700 if (retirement_committable > idx)
2702 state->retirement_committable_idx = std::nullopt;
2712 assert(state->retirement_idx.has_value());
2713 if (state->retirement_idx.has_value())
2715 const auto retirement =
2717 ->retirement_idx.value();
2718 if (retirement > idx)
2720 state->retirement_idx = std::nullopt;
2721 state->retirement_phase = std::nullopt;
2729 bool changed =
false;
2731 while (!configurations.empty() && (configurations.back().idx > idx))
2734 "Configurations: rollback configuration at {}",
2735 configurations.back().idx);
2736 configurations.pop_back();
2742 create_and_remove_node_state();
2756 "Not proposing request vote from {} since not leader",
2761 LOG_INFO_FMT(
"Nominating successor for {}", state->node_id);
2763#ifdef CCF_RAFT_TRACING
2764 nlohmann::json j = {};
2765 j[
"function"] =
"step_down_and_nominate_successor";
2766 j[
"state"] = *state;
2767 COMMITTABLE_INDICES(j[
"state"], state);
2768 j[
"configurations"] = configurations;
2772 send_propose_request_vote();
2776 void create_and_remove_node_state()
2781 for (
auto const& conf : configurations)
2783 for (
auto const& node : conf.nodes)
2785 active_nodes.emplace(node.first, node.second);
2791 std::vector<ccf::NodeId> to_remove;
2793 for (
const auto& node : all_other_nodes)
2795 if (active_nodes.find(node.first) == active_nodes.end())
2797 to_remove.push_back(node.first);
2802 for (
auto node_info : active_nodes)
2804 if (node_info.first == state->node_id)
2809 if (all_other_nodes.find(node_info.first) == all_other_nodes.end())
2811 if (!channels->have_channel(node_info.first))
2814 "Configurations: create node channel with {}", node_info.first);
2816 channels->associate_node_address(
2818 node_info.second.hostname,
2819 node_info.second.port);
2824 auto index = state->last_idx + 1;
2825 all_other_nodes.try_emplace(
2826 node_info.first, node_info.second, index, 0);
2830 send_append_entries(node_info.first, index);
2834 "Added raft node {} ({}:{})",
2836 node_info.second.hostname,
2837 node_info.second.port);