CCF
Loading...
Searching...
No Matches
raft.h
Go to the documentation of this file.
1// Copyright (c) Microsoft Corporation. All rights reserved.
2// Licensed under the Apache 2.0 License.
3#pragma once
4
6#include "ccf/pal/locking.h"
8#include "ccf/tx_id.h"
9#include "ccf/tx_status.h"
11#include "ds/ccf_assert.h"
12#include "ds/internal_logger.h"
13#include "ds/serialized.h"
14#include "impl/state.h"
15#include "kv/kv_types.h"
16#include "node/node_client.h"
17#include "node/node_to_node.h"
18#include "node/node_types.h"
20#include "raft_types.h"
22
23#include <algorithm>
24#include <list>
25#include <random>
26#include <unordered_map>
27#include <vector>
28
29#ifdef VERBOSE_RAFT_LOGGING
30# define RAFT_TRACE_FMT(s, ...) \
31 CCF_LOG_FMT(TRACE, "raft") \
32 ("{} | {} | {} | " s, \
33 state->node_id, \
34 state->leadership_state, \
35 state->membership_state, \
36 ##__VA_ARGS__)
37# define RAFT_DEBUG_FMT(s, ...) \
38 CCF_LOG_FMT(DEBUG, "raft") \
39 ("{} | {} | {} | " s, \
40 state->node_id, \
41 state->leadership_state, \
42 state->membership_state, \
43 ##__VA_ARGS__)
44# define RAFT_INFO_FMT(s, ...) \
45 CCF_LOG_FMT(INFO, "raft") \
46 ("{} | {} | {} | " s, \
47 state->node_id, \
48 state->leadership_state, \
49 state->membership_state, \
50 ##__VA_ARGS__)
51# define RAFT_FAIL_FMT(s, ...) \
52 CCF_LOG_FMT(FAIL, "raft") \
53 ("{} | {} | {} | " s, \
54 state->node_id, \
55 state->leadership_state, \
56 state->membership_state, \
57 ##__VA_ARGS__)
58#else
59# define RAFT_TRACE_FMT LOG_TRACE_FMT
60# define RAFT_DEBUG_FMT LOG_DEBUG_FMT
61# define RAFT_INFO_FMT LOG_INFO_FMT
62# define RAFT_FAIL_FMT LOG_FAIL_FMT
63#endif
64
65#define RAFT_TRACE_JSON_OUT(json_object) \
66 CCF_LOG_OUT(DEBUG, "raft_trace") << json_object
67
68#ifdef CCF_RAFT_TRACING
69
70static inline void add_committable_indices_start_and_end(
71 nlohmann::json& j, const std::shared_ptr<aft::State>& state)
72{
73 std::vector<aft::Index> committable_indices;
74 if (!state->committable_indices.empty())
75 {
76 committable_indices.push_back(state->committable_indices.front());
77 if (state->committable_indices.size() > 1)
78 {
79 committable_indices.push_back(state->committable_indices.back());
80 }
81 }
82 j["committable_indices"] = committable_indices;
83}
84
85# define COMMITTABLE_INDICES(event_state, state) \
86 add_committable_indices_start_and_end(event_state, state);
87
88#endif
89
90#define LOG_ROLLBACK_INFO_FMT CCF_LOG_FMT(INFO, "rollback")
91
92namespace aft
93{
95
96 template <class LedgerProxy>
97 class Aft : public ccf::kv::Consensus
98 {
99 private:
100 struct NodeState
101 {
102 Configuration::NodeInfo node_info;
103
104 // the highest index sent to the node
105 Index sent_idx = 0;
106
107 // the highest matching index with the node that was confirmed
108 Index match_idx = 0;
109
110 // timeout tracking the last time an ack was received from the node
111 std::chrono::milliseconds last_ack_timeout{0};
112
113 NodeState() = default;
114
115 NodeState(
116 Configuration::NodeInfo node_info_,
117 Index sent_idx_,
118 Index match_idx_ = 0) :
119 node_info(std::move(node_info_)),
120 sent_idx(sent_idx_),
121 match_idx(match_idx_),
122 last_ack_timeout(0)
123 {}
124 };
125
126 // Persistent
127 std::unique_ptr<Store> store;
128
129 // Volatile
130 std::optional<ccf::NodeId> voted_for = std::nullopt;
131 std::optional<ccf::NodeId> leader_id = std::nullopt;
132
133 // Keep track of votes in each active configuration
134 struct Votes
135 {
136 std::unordered_set<ccf::NodeId> votes;
137 size_t quorum = 0;
138 };
139 std::map<Index, Votes> votes_for_me;
140
141 std::chrono::milliseconds timeout_elapsed;
142
143 // When this node receives append entries from a new primary, it may need to
144 // roll back a committable but uncommitted suffix it holds. The
145 // new primary dictates the index where this suffix begins, which
146 // following the Raft election rules must be at least as high as the highest
147 // commit index reported by the previous primary. The window in which this
148 // rollback could be accepted is minimised to avoid unnecessary
149 // retransmissions - this node only executes this rollback instruction on
150 // the first append entries after it became a follower. As with any append
151 // entries, the initial index will not advance until this node acks.
152 bool is_new_follower = false;
153
154 // When this node becomes primary, they should produce a new signature in
155 // the current view. This signature is the first thing they may commit, as
156 // they cannot confirm commit of anything from a previous view (Raft paper
157 // section 5.4.2). This bool is true from the point this node becomes
158 // primary, until it sees a committable entry
159 bool should_sign = false;
160
161 std::shared_ptr<aft::State> state;
162
163 // Timeouts
164 std::chrono::milliseconds request_timeout;
165 std::chrono::milliseconds election_timeout;
166 size_t max_uncommitted_tx_count;
167 bool ticking = false;
168
169 // Configurations
170 std::list<Configuration> configurations;
171 // Union of other nodes (i.e. all nodes but us) in each active
172 // configuration, plus those that are retired, but for which
173 // the persistence of retirement knowledge is not yet established,
174 // i.e. Completed but not RetiredCommitted
175 // This should be used for diagnostic or broadcasting
176 // messages but _not_ for counting quorums, which should be done for each
177 // active configuration.
178 std::unordered_map<ccf::NodeId, NodeState> all_other_nodes;
179 std::unordered_map<ccf::NodeId, ccf::SeqNo> retired_nodes;
180
181 // Node client to trigger submission of RPC requests
182 std::shared_ptr<ccf::NodeClient> node_client;
183
184 // Used to remove retired nodes from store
185 std::unique_ptr<ccf::RetiredNodeCleanup> retired_node_cleanup;
186
187 size_t entry_size_not_limited = 0;
188 size_t entry_count = 0;
189 Index entries_batch_size = 20;
190 static constexpr int batch_window_size = 100;
191 int batch_window_sum = 0;
192
193 // When this is set, only public domain is deserialised when receiving
194 // append entries
195 bool public_only = false;
196
197 // Randomness
198 std::uniform_int_distribution<int> distrib;
199 std::default_random_engine rand;
200
201 // AppendEntries messages are currently constrained to only contain entries
202 // from a single term, so that the receiver can know the term of each entry
203 // pre-deserialisation, without an additional header.
204 static constexpr size_t max_terms_per_append_entries = 1;
205
206 public:
207 static constexpr size_t append_entries_size_limit = 20000;
208 std::unique_ptr<LedgerProxy> ledger;
209 std::shared_ptr<ccf::NodeToNode> channels;
210
212 const ccf::consensus::Configuration& settings_,
213 std::unique_ptr<Store> store_,
214 std::unique_ptr<LedgerProxy> ledger_,
215 std::shared_ptr<ccf::NodeToNode> channels_,
216 std::shared_ptr<aft::State> state_,
217 std::shared_ptr<ccf::NodeClient> rpc_request_context_,
218 bool public_only_ = false) :
219 store(std::move(store_)),
220
221 timeout_elapsed(0),
222
223 state(std::move(state_)),
224
225 request_timeout(settings_.message_timeout),
226 election_timeout(settings_.election_timeout),
227 max_uncommitted_tx_count(settings_.max_uncommitted_tx_count),
228
229 node_client(std::move(rpc_request_context_)),
230 retired_node_cleanup(
231 std::make_unique<ccf::RetiredNodeCleanup>(node_client)),
232
233 public_only(public_only_),
234
235 distrib(0, (int)election_timeout.count() / 2),
236 rand((int)(uintptr_t)this),
237
238 ledger(std::move(ledger_)),
239 channels(std::move(channels_))
240 {}
241
242 ~Aft() override = default;
243
244 std::optional<ccf::NodeId> primary() override
245 {
246 return leader_id;
247 }
248
249 ccf::NodeId id() override
250 {
251 return state->node_id;
252 }
253
254 bool is_primary() override
255 {
256 return state->leadership_state == ccf::kv::LeadershipState::Leader;
257 }
258
259 bool is_candidate() override
260 {
261 return state->leadership_state == ccf::kv::LeadershipState::Candidate;
262 }
263
264 bool can_replicate() override
265 {
266 std::unique_lock<ccf::pal::Mutex> guard(state->lock);
267 return can_replicate_unsafe();
268 }
269
275 bool is_at_max_capacity() override
276 {
277 if (max_uncommitted_tx_count == 0)
278 {
279 return false;
280 }
281 std::unique_lock<ccf::pal::Mutex> guard(state->lock);
282 return state->leadership_state == ccf::kv::LeadershipState::Leader &&
283 (state->last_idx - state->commit_idx >= max_uncommitted_tx_count);
284 }
285
286 Consensus::SignatureDisposition get_signature_disposition() override
287 {
288 std::unique_lock<ccf::pal::Mutex> guard(state->lock);
289 if (can_sign_unsafe())
290 {
291 if (should_sign)
292 {
293 return Consensus::SignatureDisposition::SHOULD_SIGN;
294 }
295 return Consensus::SignatureDisposition::CAN_SIGN;
296 }
297 return Consensus::SignatureDisposition::CANT_REPLICATE;
298 }
299
300 bool is_backup() override
301 {
302 return state->leadership_state == ccf::kv::LeadershipState::Follower;
303 }
304
305 bool is_active() const
306 {
307 return state->membership_state == ccf::kv::MembershipState::Active;
308 }
309
310 bool is_retired() const
311 {
312 return state->membership_state == ccf::kv::MembershipState::Retired;
313 }
314
316 {
317 return state->membership_state == ccf::kv::MembershipState::Retired &&
318 state->retirement_phase == ccf::kv::RetirementPhase::RetiredCommitted;
319 }
320
322 {
323 return state->membership_state == ccf::kv::MembershipState::Retired &&
324 state->retirement_phase == ccf::kv::RetirementPhase::Completed;
325 }
326
328 ccf::SeqNo seqno, const std::vector<ccf::kv::NodeId>& node_ids) override
329 {
330 for (const auto& node_id : node_ids)
331 {
332 if (id() == node_id)
333 {
335 state->membership_state == ccf::kv::MembershipState::Retired,
336 "Node is not retired, cannot become retired committed");
338 state->retirement_phase == ccf::kv::RetirementPhase::Completed,
339 "Node is not retired, cannot become retired committed");
340 state->retired_committed_idx = seqno;
341 become_retired(seqno, ccf::kv::RetirementPhase::RetiredCommitted);
342 }
343 else
344 {
345 // Once a node's retired_committed status is itself committed, all
346 // future primaries in the network must be aware its retirement is
347 // committed, and so no longer need any communication with it to
348 // advance commit. No further communication with this node is needed.
349 all_other_nodes.erase(node_id);
350 RAFT_INFO_FMT("Removed {} from nodes known to consensus", node_id);
351 }
352 }
353 }
354
356 {
357 return state->committable_indices.empty() ?
358 state->commit_idx :
359 state->committable_indices.back();
360 }
361
362 // Returns the highest committable index which is not greater than the
363 // given idx.
365 Index idx) const
366 {
367 const auto it = std::upper_bound(
368 state->committable_indices.rbegin(),
369 state->committable_indices.rend(),
370 idx,
371 [](const auto& l, const auto& r) { return l >= r; });
372 if (it == state->committable_indices.rend())
373 {
374 return std::nullopt;
375 }
376
377 return *it;
378 }
379
381 {
382 while (!state->committable_indices.empty() &&
383 (state->committable_indices.front() <= idx))
384 {
385 state->committable_indices.pop_front();
386 }
387 }
388
389 void enable_all_domains() override
390 {
391 // When receiving append entries as a follower, all security domains will
392 // be deserialised
393 std::lock_guard<ccf::pal::Mutex> guard(state->lock);
394 public_only = false;
395 }
396
397 void force_become_primary() override
398 {
399 // This is unsafe and should only be called when the node is certain
400 // there is no leader and no other node will attempt to force leadership.
401 if (leader_id.has_value())
402 {
403 throw std::logic_error(
404 "Can't force leadership if there is already a leader");
405 }
406
407 std::lock_guard<ccf::pal::Mutex> guard(state->lock);
408 state->current_view += starting_view_change;
409 become_leader(true);
410 }
411
413 Index index,
414 Term term,
415 const std::vector<Index>& terms,
416 Index commit_idx_) override
417 {
418 // This is unsafe and should only be called when the node is certain
419 // there is no leader and no other node will attempt to force leadership.
420 if (leader_id.has_value())
421 {
422 throw std::logic_error(
423 "Can't force leadership if there is already a leader");
424 }
425
426 std::lock_guard<ccf::pal::Mutex> guard(state->lock);
427 state->current_view = term;
428 state->last_idx = index;
429 state->commit_idx = commit_idx_;
430 state->view_history.initialise(terms);
431 state->view_history.update(index, term);
432 state->current_view += starting_view_change;
433 become_leader(true);
434 }
435
437 Index index,
438 Term term,
439 const std::vector<Index>& term_history,
440 Index recovery_start_index = 0) override
441 {
442 // This should only be called when the node resumes from a snapshot and
443 // before it has received any append entries.
444 std::lock_guard<ccf::pal::Mutex> guard(state->lock);
445
446 state->last_idx = index;
447 state->commit_idx = index;
448
449 state->view_history.initialise(term_history);
450
451 ledger->init(index, recovery_start_index);
452
454 }
455
457 {
458 return state->last_idx;
459 }
460
462 {
463 std::lock_guard<ccf::pal::Mutex> guard(state->lock);
464 return get_commit_idx_unsafe();
465 }
466
467 Term get_view() override
468 {
469 std::lock_guard<ccf::pal::Mutex> guard(state->lock);
470 return state->current_view;
471 }
472
473 std::pair<Term, Index> get_committed_txid() override
474 {
475 std::lock_guard<ccf::pal::Mutex> guard(state->lock);
476 ccf::SeqNo commit_idx = get_commit_idx_unsafe();
477 return {get_term_internal(commit_idx), commit_idx};
478 }
479
480 Term get_view(Index idx) override
481 {
482 std::lock_guard<ccf::pal::Mutex> guard(state->lock);
483 return get_term_internal(idx);
484 }
485
486 std::vector<Index> get_view_history(Index idx) override
487 {
488 // This should only be called when the spin lock is held.
489 return state->view_history.get_history_until(idx);
490 }
491
492 std::vector<Index> get_view_history_since(Index idx) override
493 {
494 // This should only be called when the spin lock is held.
495 return state->view_history.get_history_since(idx);
496 }
497
498 // Same as ccfraft.tla GetServerSet/IsInServerSet
499 // Not to be confused with all_other_nodes, which includes retired completed
500 // nodes. Used to restrict sending vote requests, and when becoming a
501 // leader, to decide whether to advance commit.
502 std::set<ccf::NodeId> other_nodes_in_active_configs() const
503 {
504 std::set<ccf::NodeId> nodes;
505
506 for (auto const& conf : configurations)
507 {
508 for (auto const& [node_id, _] : conf.nodes)
509 {
510 if (node_id != state->node_id)
511 {
512 nodes.insert(node_id);
513 }
514 }
515 }
516
517 return nodes;
518 }
519
521 Index idx, const ccf::kv::Configuration::Nodes& conf) override
522 {
524 "Configurations: add new configuration at {}: {{{}}}", idx, conf);
525
526#ifdef CCF_RAFT_TRACING
527 nlohmann::json j = {};
528 j["function"] = "add_configuration";
529 j["state"] = *state;
530 COMMITTABLE_INDICES(j["state"], state);
531 j["configurations"] = configurations;
532 j["args"] = nlohmann::json::object();
533 j["args"]["configuration"] = Configuration{idx, conf, idx};
535#endif
536
537 // Detect when we are retired by observing a configuration
538 // from which we are absent following a configuration in which
539 // we were included. Note that this relies on retirement being
540 // a final state, and node identities never being re-used.
541 if (
542 !configurations.empty() &&
543 configurations.back().nodes.find(state->node_id) !=
544 configurations.back().nodes.end() &&
545 conf.find(state->node_id) == conf.end())
546 {
547 become_retired(idx, ccf::kv::RetirementPhase::Ordered);
548 }
549
550 if (configurations.empty() || conf != configurations.back().nodes)
551 {
552 Configuration new_config = {idx, conf, idx};
553 configurations.push_back(new_config);
554
555 create_and_remove_node_state();
556 }
557 }
558
560 {
561 ticking = true;
562 using namespace std::chrono_literals;
563 timeout_elapsed = 0ms;
564 RAFT_INFO_FMT("Election timer has become active");
565 }
566
568 {
569 for (auto& node : all_other_nodes)
570 {
571 using namespace std::chrono_literals;
572 node.second.last_ack_timeout = 0ms;
573 }
574 }
575
577 {
578 if (configurations.empty())
579 {
580 return {};
581 }
582
583 return configurations.back().nodes;
584 }
585
587 {
588 std::lock_guard<ccf::pal::Mutex> guard(state->lock);
590 }
591
593 {
595 std::lock_guard<ccf::pal::Mutex> guard(state->lock);
596 details.primary_id = leader_id;
597 details.current_view = state->current_view;
598 details.ticking = ticking;
599 details.leadership_state = state->leadership_state;
600 details.membership_state = state->membership_state;
601 if (is_retired())
602 {
603 details.retirement_phase = state->retirement_phase;
604 }
605 for (auto const& conf : configurations)
606 {
607 details.configs.push_back(conf);
608 }
609 for (auto& [k, v] : all_other_nodes)
610 {
611 details.acks[k] = {
612 v.match_idx, static_cast<size_t>(v.last_ack_timeout.count())};
613 }
615 return details;
616 }
617
618 bool replicate(const ccf::kv::BatchVector& entries, Term term) override
619 {
620 std::lock_guard<ccf::pal::Mutex> guard(state->lock);
621
622 if (state->leadership_state != ccf::kv::LeadershipState::Leader)
623 {
625 "Failed to replicate {} items: not leader", entries.size());
626 rollback(state->last_idx);
627 return false;
628 }
629
630 if (term != state->current_view)
631 {
633 "Failed to replicate {} items at term {}, current term is {}",
634 entries.size(),
635 term,
636 state->current_view);
637 return false;
638 }
639
641 {
643 "Failed to replicate {} items: node retirement is complete",
644 entries.size());
645 rollback(state->last_idx);
646 return false;
647 }
648
649 RAFT_DEBUG_FMT("Replicating {} entries", entries.size());
650
651 for (const auto& [index, data, is_globally_committable, hooks] : entries)
652 {
653 bool globally_committable = is_globally_committable;
654
655 if (index != state->last_idx + 1)
656 {
657 return false;
658 }
659
661 "Replicated on leader {}: {}{} ({} hooks)",
662 state->node_id,
663 index,
664 (globally_committable ? " committable" : ""),
665 hooks->size());
666
667#ifdef CCF_RAFT_TRACING
668 nlohmann::json j = {};
669 j["function"] = "replicate";
670 j["state"] = *state;
671 COMMITTABLE_INDICES(j["state"], state);
672 j["view"] = term;
673 j["seqno"] = index;
674 j["globally_committable"] = globally_committable;
676#endif
677
678 for (auto& hook : *hooks)
679 {
680 hook->call(this);
681 }
682
683 if (globally_committable)
684 {
686 "membership: {} leadership: {}",
687 state->membership_state,
688 state->leadership_state);
689 if (
690 state->membership_state == ccf::kv::MembershipState::Retired &&
691 state->retirement_phase == ccf::kv::RetirementPhase::Ordered)
692 {
693 become_retired(index, ccf::kv::RetirementPhase::Signed);
694 }
695 state->committable_indices.push_back(index);
696 start_ticking_if_necessary();
697
698 // Reset should_sign here - whenever we see a committable entry we
699 // don't need to produce _another_ signature
700 should_sign = false;
701 }
702
703 state->last_idx = index;
704 ledger->put_entry(
705 *data, globally_committable, state->current_view, index);
706 entry_size_not_limited += data->size();
707 entry_count++;
708
709 state->view_history.update(index, state->current_view);
710 if (entry_size_not_limited >= append_entries_size_limit)
711 {
712 update_batch_size();
713 entry_count = 0;
714 entry_size_not_limited = 0;
715 for (const auto& it : all_other_nodes)
716 {
717 RAFT_DEBUG_FMT("Sending updates to follower {}", it.first);
718 send_append_entries(it.first, it.second.sent_idx + 1);
719 }
720 }
721 }
722
723 // Try to advance commit at once if there are no other nodes.
724 if (other_nodes_in_active_configs().size() == 0)
725 {
726 update_commit();
727 }
728
729 return true;
730 }
731
733 const ccf::NodeId& from, const uint8_t* data, size_t size) override
734 {
735 auto type = serialized::peek<RaftMsgType>(data, size);
736
737 try
738 {
739 switch (type)
740 {
742 {
743 AppendEntries r =
744 channels->template recv_authenticated<AppendEntries>(
745 from, data, size);
746 recv_append_entries(from, r, data, size);
747 break;
748 }
749
751 {
753 channels->template recv_authenticated<AppendEntriesResponse>(
754 from, data, size);
755 recv_append_entries_response(from, r);
756 break;
757 }
758
760 {
762 channels->template recv_authenticated<RequestPreVote>(
763 from, data, size);
764 recv_request_pre_vote(from, r);
765 break;
766 }
767
769 {
770 RequestVote r = channels->template recv_authenticated<RequestVote>(
771 from, data, size);
772 recv_request_vote(from, r);
773 break;
774 }
775
777 {
779 channels->template recv_authenticated<RequestPreVoteResponse>(
780 from, data, size);
781 recv_request_pre_vote_response(from, r);
782 break;
783 }
784
786 {
788 channels->template recv_authenticated<RequestVoteResponse>(
789 from, data, size);
790 recv_request_vote_response(from, r);
791 break;
792 }
793
795 {
797 channels->template recv_authenticated<ProposeRequestVote>(
798 from, data, size);
799 recv_propose_request_vote(from, r);
800 break;
801 }
802
803 default:
804 {
805 RAFT_FAIL_FMT("Unhandled AFT message type: {}", type);
806 }
807 }
808 }
810 {
811 RAFT_INFO_FMT("Dropped invalid message from {}", e.from);
812 return;
813 }
815 {
816 RAFT_FAIL_FMT("Failed to parse message: {}", ise.what());
817 return;
818 }
819 catch (const std::exception& e)
820 {
821 LOG_FAIL_FMT("Exception in {}", __PRETTY_FUNCTION__);
822 LOG_DEBUG_FMT("Error: {}", e.what());
823 return;
824 }
825 }
826
827 void periodic(std::chrono::milliseconds elapsed) override
828 {
829 std::unique_lock<ccf::pal::Mutex> guard(state->lock);
830 timeout_elapsed += elapsed;
831
832 if (state->leadership_state == ccf::kv::LeadershipState::Leader)
833 {
834 if (timeout_elapsed >= request_timeout)
835 {
836 using namespace std::chrono_literals;
837 timeout_elapsed = 0ms;
838
839 update_batch_size();
840 // Send newly available entries to all other nodes.
841 for (const auto& node : all_other_nodes)
842 {
843 send_append_entries(node.first, node.second.sent_idx + 1);
844 }
845 }
846
847 for (auto& node : all_other_nodes)
848 {
849 node.second.last_ack_timeout += elapsed;
850 }
851
852 bool every_active_config_has_a_quorum = std::all_of(
853 configurations.begin(),
854 configurations.end(),
855 [this](const Configuration& conf) {
856 size_t live_nodes_in_config = 0;
857 for (auto const& node : conf.nodes)
858 {
859 auto search = all_other_nodes.find(node.first);
860 if (
861 // if a (non-self) node is in a configuration, then it is in
862 // all_other_nodes. So if a node in a configuration is not found
863 // in all_other_nodes, it must be self, and hence is live
864 search == all_other_nodes.end() ||
865 // Otherwise we use the most recent ack as a failure probe
866 search->second.last_ack_timeout < election_timeout)
867 {
868 ++live_nodes_in_config;
869 }
870 else
871 {
872 RAFT_DEBUG_FMT(
873 "No ack received from {} in last {}",
874 node.first,
875 election_timeout);
876 }
877 }
878 return live_nodes_in_config >= get_quorum(conf.nodes.size());
879 });
880
881 if (!every_active_config_has_a_quorum)
882 {
883 // CheckQuorum: The primary automatically steps down if there are no
884 // active configuration in which it has heard back from a majority of
885 // backups within an election timeout.
886 // Also see CheckQuorum action in tla/ccfraft.tla.
888 "Stepping down as leader {}: No ack received from a majority of "
889 "backups in last {}",
890 state->node_id,
891 election_timeout);
893 }
894 }
895 else
896 {
897 if (
898 !is_retired_committed() && ticking &&
899 timeout_elapsed >= election_timeout)
900 {
901 // Start an election.
902 if (state->pre_vote_enabled)
903 {
904 become_pre_vote_candidate();
905 }
906 else
907 {
908 become_candidate();
909 }
910 }
911 }
912 }
913
914 private:
915 Index find_highest_possible_match(const ccf::TxID& tx_id)
916 {
917 // Find the highest TxID this node thinks exists, which is still
918 // compatible with the given tx_id. That is, given T.n, find largest n'
919 // such that n' <= n && term_of(n') == T' && T' <= T. This may be T.n
920 // itself, if this node holds that index. Otherwise, examine the final
921 // entry in each term, counting backwards, until we find one which is
922 // still possible.
923 Index probe_index = std::min(tx_id.seqno, state->last_idx);
924 Term term_of_probe = state->view_history.view_at(probe_index);
925 while (term_of_probe > tx_id.view)
926 {
927 // Next possible match is the end of the previous term, which is
928 // 1-before the start of the currently considered term. Anything after
929 // that must have a term which is still too high.
930 probe_index = state->view_history.start_of_view(term_of_probe);
931 if (probe_index > 0)
932 {
933 --probe_index;
934 }
935 term_of_probe = state->view_history.view_at(probe_index);
936 }
937
939 "Looking for match with {}.{}, from {}.{}, best answer is {}",
940 tx_id.view,
941 tx_id.seqno,
942 state->view_history.view_at(state->last_idx),
943 state->last_idx,
944 probe_index);
945 return probe_index;
946 }
947
948 void update_batch_size()
949 {
950 auto avg_entry_size = (entry_count == 0) ?
951 append_entries_size_limit :
952 entry_size_not_limited / entry_count;
953
954 auto batch_size = (avg_entry_size == 0) ?
955 append_entries_size_limit / 2 :
956 append_entries_size_limit / avg_entry_size;
957
958 auto batch_avg = batch_window_sum / batch_window_size;
959 // balance out total batch size across batch window
960 batch_window_sum += (batch_size - batch_avg);
961 entries_batch_size = std::max((batch_window_sum / batch_window_size), 1);
962 }
963
964 Term get_term_internal(Index idx)
965 {
966 if (idx > state->last_idx)
967 {
968 return ccf::VIEW_UNKNOWN;
969 }
970
971 return state->view_history.view_at(idx);
972 }
973
974 bool can_replicate_unsafe()
975 {
976 return state->leadership_state == ccf::kv::LeadershipState::Leader &&
977 !is_retired_committed();
978 }
979
980 bool can_sign_unsafe()
981 {
982 return state->leadership_state == ccf::kv::LeadershipState::Leader &&
983 !is_retired_committed();
984 }
985
986 Index get_commit_idx_unsafe()
987 {
988 return state->commit_idx;
989 }
990
991 void send_append_entries(const ccf::NodeId& to, Index start_idx)
992 {
994 "Sending append entries to node {} in batches of {}, covering the "
995 "range {} -> {}",
996 to,
997 entries_batch_size,
998 start_idx,
999 state->last_idx);
1000
1001 auto calculate_end_index = [this](Index start) {
1002 // Cap the end index in 2 ways:
1003 // - Must contain no more than entries_batch_size entries
1004 // - Must contain entries from a single term
1005 static_assert(
1006 max_terms_per_append_entries == 1,
1007 "AppendEntries construction logic enforces single term");
1008 auto max_idx = state->last_idx;
1009 const auto term_of_ae = state->view_history.view_at(start);
1010 const auto index_at_end_of_term =
1011 state->view_history.end_of_view(term_of_ae);
1012 if (index_at_end_of_term != ccf::kv::NoVersion)
1013 {
1014 max_idx = index_at_end_of_term;
1015 }
1016 return std::min(start + entries_batch_size - 1, max_idx);
1017 };
1018
1019 Index end_idx = 0;
1020
1021 // We break _after_ sending, so that in the case where this is called
1022 // with start==last, we send a single empty heartbeat
1023 do
1024 {
1025 end_idx = calculate_end_index(start_idx);
1026 RAFT_TRACE_FMT("Sending sub range {} -> {}", start_idx, end_idx);
1027 send_append_entries_range(to, start_idx, end_idx);
1028 start_idx = std::min(end_idx + 1, state->last_idx);
1029 } while (end_idx != state->last_idx);
1030 }
1031
1032 void send_append_entries_range(
1033 const ccf::NodeId& to, Index start_idx, Index end_idx)
1034 {
1035 const auto prev_idx = start_idx - 1;
1036
1037 if (is_retired_committed() && start_idx >= end_idx)
1038 {
1039 // Continue to replicate, but do not send heartbeats if we know our
1040 // retirement is committed
1041 return;
1042 }
1043
1044 const auto prev_term = get_term_internal(prev_idx);
1045 const auto term_of_idx = get_term_internal(end_idx);
1046
1047#pragma clang diagnostic push
1048#pragma clang diagnostic ignored "-Wc99-designator"
1049 AppendEntries ae{
1050 {},
1051 {.idx = end_idx, .prev_idx = prev_idx},
1052 .term = state->current_view,
1053 .prev_term = prev_term,
1054 .leader_commit_idx = state->commit_idx,
1055 .term_of_idx = term_of_idx,
1056 };
1057#pragma clang diagnostic pop
1058
1060 "Send {} from {} to {}: ({}.{}, {}.{}] ({})",
1061 ae.msg,
1062 state->node_id,
1063 to,
1064 prev_term,
1065 prev_idx,
1066 term_of_idx,
1067 end_idx,
1068 state->commit_idx);
1069
1070 auto& node = all_other_nodes.at(to);
1071
1072#ifdef CCF_RAFT_TRACING
1073 nlohmann::json j = {};
1074 j["function"] = "send_append_entries";
1075 j["packet"] = ae;
1076 j["state"] = *state;
1077 COMMITTABLE_INDICES(j["state"], state);
1078 j["to_node_id"] = to;
1079 j["match_idx"] = node.match_idx;
1080 j["sent_idx"] = node.sent_idx;
1082#endif
1083
1084 // The host will append log entries to this message when it is
1085 // sent to the destination node.
1086 if (!channels->send_authenticated(
1088 {
1089 return;
1090 }
1091
1092 // Record the most recent index we have sent to this node.
1093 node.sent_idx = end_idx;
1094 }
1095
1096 void recv_append_entries(
1097 const ccf::NodeId& from,
1098 AppendEntries r,
1099 const uint8_t* data,
1100 size_t size)
1101 {
1102 std::unique_lock<ccf::pal::Mutex> guard(state->lock);
1103
1105 "Recv {} to {} from {}: {}.{} to {}.{} in term {}",
1106 r.msg,
1107 state->node_id,
1108 from,
1109 r.prev_term,
1110 r.prev_idx,
1111 r.term_of_idx,
1112 r.idx,
1113 r.term);
1114
1115#ifdef CCF_RAFT_TRACING
1116 nlohmann::json j = {};
1117 j["function"] = "recv_append_entries";
1118 j["packet"] = r;
1119 j["state"] = *state;
1120 COMMITTABLE_INDICES(j["state"], state);
1121 j["from_node_id"] = from;
1123#endif
1124
1125 // Don't check that the sender node ID is valid. Accept anything that
1126 // passes the integrity check. This way, entries containing dynamic
1127 // topology changes that include adding this new leader can be accepted.
1128
1129 // First, check append entries term against our own term, becoming
1130 // follower if necessary
1131 if (
1132 state->current_view == r.term &&
1133 (state->leadership_state == ccf::kv::LeadershipState::Candidate ||
1134 state->leadership_state == ccf::kv::LeadershipState::PreVoteCandidate))
1135 {
1136 become_aware_of_new_term(r.term);
1137 }
1138 else if (state->current_view < r.term)
1139 {
1140 become_aware_of_new_term(r.term);
1141 }
1142 else if (state->current_view > r.term)
1143 {
1144 // Reply false, since our term is later than the received term.
1146 "Recv {} to {} from {} but our term is later ({} > {})",
1147 r.msg,
1148 state->node_id,
1149 from,
1150 state->current_view,
1151 r.term);
1152 send_append_entries_response_nack(from);
1153 return;
1154 }
1155
1156 // Second, check term consistency with the entries we have so far
1157 const auto prev_term = get_term_internal(r.prev_idx);
1158 if (prev_term != r.prev_term)
1159 {
1161 "Previous term for {} should be {}", r.prev_idx, prev_term);
1162
1163 // Reply false if the log doesn't contain an entry at r.prev_idx
1164 // whose term is r.prev_term. Rejects "future" entries.
1165 if (prev_term == 0)
1166 {
1168 "Recv {} to {} from {} but our log does not yet "
1169 "contain index {}",
1170 r.msg,
1171 state->node_id,
1172 from,
1173 r.prev_idx);
1174 send_append_entries_response_nack(from);
1175 }
1176 else
1177 {
1179 "Recv {} to {} from {} but our log at {} has the wrong "
1180 "previous term (ours: {}, theirs: {})",
1181 r.msg,
1182 state->node_id,
1183 from,
1184 r.prev_idx,
1185 prev_term,
1186 r.prev_term);
1187 const ccf::TxID rejected_tx{r.prev_term, r.prev_idx};
1188 send_append_entries_response_nack(from, rejected_tx);
1189 }
1190 return;
1191 }
1192
1193 // If the terms match up, it is sufficient to convince us that the sender
1194 // is leader in our term
1195 restart_election_timeout();
1196 if (!leader_id.has_value() || leader_id.value() != from)
1197 {
1198 leader_id = from;
1200 "Node {} thinks leader is {}", state->node_id, leader_id.value());
1201 }
1202
1203 // Third, check index consistency, making sure entries are not in the past
1204 if (r.prev_idx < state->commit_idx)
1205 {
1207 "Recv {} to {} from {} but prev_idx ({}) < commit_idx "
1208 "({})",
1209 r.msg,
1210 state->node_id,
1211 from,
1212 r.prev_idx,
1213 state->commit_idx);
1214 return;
1215 }
1216 // Redundant with check on get_term_internal() at line 1149
1217 // Which captures this case in every situation, except r.prev_term == 0.
1218 // That only happens if r.prev_idx == 0 however, see line 1033,
1219 // in which case this path should not be taken either.
1220 if (r.prev_idx > state->last_idx)
1221 {
1223 "Recv {} to {} from {} but prev_idx ({}) > last_idx ({})",
1224 r.msg,
1225 state->node_id,
1226 from,
1227 r.prev_idx,
1228 state->last_idx);
1229 return;
1230 }
1231
1233 "Recv {} to {} from {} for index {} and previous index {}",
1234 r.msg,
1235 state->node_id,
1236 from,
1237 r.idx,
1238 r.prev_idx);
1239
1240 std::vector<std::tuple<
1241 std::unique_ptr<ccf::kv::AbstractExecutionWrapper>,
1243 append_entries;
1244 // Finally, deserialise each entry in the batch
1245 for (Index i = r.prev_idx + 1; i <= r.idx; i++)
1246 {
1247 if (i <= state->last_idx)
1248 {
1249 // NB: This is only safe as long as AppendEntries only contain a
1250 // single term. If they cover multiple terms, then we would need to at
1251 // least partially deserialise each entry to establish what term it is
1252 // in (or report the terms in the header)
1253 static_assert(
1254 max_terms_per_append_entries == 1,
1255 "AppendEntries rollback logic assumes single term");
1256 const auto incoming_term = r.term_of_idx;
1257 const auto local_term = state->view_history.view_at(i);
1258 if (incoming_term != local_term)
1259 {
1260 if (is_new_follower)
1261 {
1262 auto rollback_level = i - 1;
1264 "New follower received AppendEntries with conflict. Incoming "
1265 "entry {}.{} conflicts with local {}.{}. Rolling back to {}.",
1266 incoming_term,
1267 i,
1268 local_term,
1269 i,
1270 rollback_level);
1272 "Dropping conflicting branch. Rolling back {} entries, "
1273 "beginning with {}.{}.",
1274 state->last_idx - rollback_level,
1275 local_term,
1276 i);
1277 rollback(rollback_level);
1278 is_new_follower = false;
1279 // Then continue to process this AE as normal
1280 }
1281 else
1282 {
1283 // We have a node retaining a conflicting suffix, and refusing to
1284 // roll it back. It will remain divergent (not contributing to
1285 // commit) this term, and can only be brought in-sync in a future
1286 // term.
1287 // This log is emitted as a canary, for what we hope is an
1288 // unreachable branch. If it is ever seen we should revisit this.
1290 "Ignoring conflicting AppendEntries. Retaining {} entries, "
1291 "beginning with {}.{}.",
1292 state->last_idx - (i - 1),
1293 local_term,
1294 i);
1295 return;
1296 }
1297 }
1298 else
1299 {
1300 // If the current entry has already been deserialised, skip the
1301 // payload for that entry
1302 ledger->skip_entry(data, size);
1303 continue;
1304 }
1305 }
1306
1307 std::vector<uint8_t> entry;
1308 try
1309 {
1310 entry = LedgerProxy::get_entry(data, size);
1311 }
1312 catch (const std::logic_error& e)
1313 {
1314 // This should only fail if there is malformed data.
1316 "Recv {} to {} from {} but the data is malformed: {}",
1317 r.msg,
1318 state->node_id,
1319 from,
1320 e.what());
1321 send_append_entries_response_nack(from);
1322 return;
1323 }
1324
1325 ccf::TxID expected{r.term_of_idx, i};
1326 auto ds = store->deserialize(entry, public_only, expected);
1327 if (ds == nullptr)
1328 {
1330 "Recv {} to {} from {} but the entry could not be "
1331 "deserialised",
1332 r.msg,
1333 state->node_id,
1334 from);
1335 send_append_entries_response_nack(from);
1336 return;
1337 }
1338
1339 append_entries.emplace_back(std::move(ds), i);
1340 }
1341
1342 execute_append_entries_sync(std::move(append_entries), from, r);
1343 }
1344
// Applies, in order, each already-deserialised entry of an AppendEntries
// batch, writes it to the ledger and updates local state, then hands over to
// execute_append_entries_finish(r, from).
//  - append_entries: (execution wrapper, index) pairs to apply.
//  - from: node the AppendEntries came from; NACKs are sent back to it.
//  - r: the AppendEntries header these entries arrived with.
// If applying an entry yields ApplyResult::FAIL, the ledger is truncated to
// the previous index, a NACK is sent and the function returns without
// calling execute_append_entries_finish.
1345 void execute_append_entries_sync(
1346 std::vector<std::tuple<
1347 std::unique_ptr<ccf::kv::AbstractExecutionWrapper>,
1348 ccf::kv::Version>>&& append_entries,
1349 const ccf::NodeId& from,
1350 const AppendEntries& r)
1351 {
1352 for (auto& ae : append_entries)
1353 {
1354 auto& [ds, i] = ae;
1355 RAFT_DEBUG_FMT("Replicating on follower {}: {}", state->node_id, i);
1356
1357#ifdef CCF_RAFT_TRACING
1358 nlohmann::json j = {};
1359 j["function"] = "execute_append_entries_sync";
1360 j["state"] = *state;
1361 COMMITTABLE_INDICES(j["state"], state);
1362 j["from_node_id"] = from;
1364#endif
1365
1366 bool track_deletes_on_missing_keys = false;
1367 ccf::kv::ApplyResult apply_success =
1368 ds->apply(track_deletes_on_missing_keys);
// A failed apply aborts the whole batch: drop entry i from the ledger and
// tell the sender.
1369 if (apply_success == ccf::kv::ApplyResult::FAIL)
1370 {
1371 ledger->truncate(i - 1);
1372 send_append_entries_response_nack(from);
1373 return;
1374 }
// The entry applied, so our log now extends to i.
1375 state->last_idx = i;
1376
// Run every hook produced by this entry, passing this consensus instance.
1377 for (auto& hook : ds->get_hooks())
1378 {
1379 hook->call(this);
1380 }
1381
// Only entries that applied as a signature are flagged committable when
// written to the ledger.
1382 bool globally_committable =
1383 (apply_success == ccf::kv::ApplyResult::PASS_SIGNATURE);
1384 if (globally_committable)
1385 {
1386 start_ticking_if_necessary();
1387 }
1388
1389 const auto& entry = ds->get_entry();
1390
1391 ledger->put_entry(
1392 entry, globally_committable, ds->get_term(), ds->get_index());
1393
// NOTE(review): the labels introducing the branches of this switch are not
// visible in this listing. The first branch handles a failed apply, but
// ApplyResult::FAIL has already returned above, so that branch looks
// unreachable - confirm against the full source.
1394 switch (apply_success)
1395 {
1397 {
1398 RAFT_FAIL_FMT("Follower failed to apply log entry: {}", i);
1399 state->last_idx--;
1400 ledger->truncate(state->last_idx);
1401 send_append_entries_response_nack(from);
1402 break;
1403 }
1404
1406 {
1407 RAFT_DEBUG_FMT("Deserialising signature at {}", i);
// A retiring node whose retirement has been ordered moves to the Signed
// phase at this signature.
1408 if (
1409 state->membership_state == ccf::kv::MembershipState::Retired &&
1410 state->retirement_phase == ccf::kv::RetirementPhase::Ordered)
1411 {
1412 become_retired(i, ccf::kv::RetirementPhase::Signed);
1413 }
// Signatures are the committable points of the log.
1414 state->committable_indices.push_back(i);
1415
1416 if (ds->get_term() != 0u)
1417 {
1418 // A signature for sig_term tells us that all transactions from
1419 // the previous signature onwards (at least, if not further back)
1420 // happened in sig_term. We reflect this in the history.
1421 if (r.term_of_idx == aft::ViewHistory::InvalidView)
1422 {
1423 state->view_history.update(1, r.term);
1424 }
1425 else
1426 {
1427 // NB: This is only safe as long as AppendEntries only contain a
1428 // single term. If they cover multiple terms, then we need to
1429 // know our previous signature locally.
1430 static_assert(
1431 max_terms_per_append_entries == 1,
1432 "AppendEntries processing for term updates assumes single "
1433 "term");
1434 state->view_history.update(r.prev_idx + 1, ds->get_term());
1435 }
1436
1437 commit_if_possible(r.leader_commit_idx);
1438 }
1439 break;
1440 }
1441
1444 {
1445 break;
1446 }
1447
1448 default:
1449 {
1450 throw std::logic_error("Unknown ApplyResult value");
1451 }
1452 }
1453 }
1454
// All entries applied: commit what we can and ACK.
1455 execute_append_entries_finish(r, from);
1456 }
1457
1458 void execute_append_entries_finish(
1459 const AppendEntries& r, const ccf::NodeId& from)
1460 {
1461 // After entries have been deserialised, try to commit the leader's
1462 // commit index and update our term history accordingly
1463 commit_if_possible(r.leader_commit_idx);
1464
1465 // The term may have changed, and we have not have seen a signature yet.
1466 auto lci = last_committable_index();
1467 if (r.term_of_idx == aft::ViewHistory::InvalidView)
1468 {
1469 // If we don't yet have a term history, then this must be happening in
1470 // the current term. This can only happen before _any_ transactions have
1471 // occurred, when processing a heartbeat at index 0, which does not
1472 // happen in a real node (due to the genesis transaction executing
1473 // before ticks start), but may happen in tests.
1474 state->view_history.update(1, r.term);
1475 }
1476 else
1477 {
1478 // The end of this append entries (r.idx) was not a signature, but may
1479 // be in a new term. If it's a new term, this term started immediately
1480 // after the previous signature we saw (lci, last committable index).
1481 if (r.idx > lci)
1482 {
1483 state->view_history.update(lci + 1, r.term_of_idx);
1484 }
1485 }
1486
1487 send_append_entries_response_ack(from, r);
1488 }
1489
1490 void send_append_entries_response_ack(
1491 ccf::NodeId to, const AppendEntries& ae)
1492 {
1493 // If we get to here, we have applied up to r.idx in this AppendEntries.
1494 // We must only ACK this far, as we know nothing about the agreement of a
1495 // suffix we may still hold _after_ r.idx with the leader's log
1496 const auto response_idx = ae.idx;
1497 send_append_entries_response(
1498 to, AppendEntriesResponseType::OK, state->current_view, response_idx);
1499 }
1500
1501 void send_append_entries_response_nack(
1502 ccf::NodeId to, const ccf::TxID& rejected)
1503 {
1504 const auto response_idx = find_highest_possible_match(rejected);
1505 const auto response_term = get_term_internal(response_idx);
1506
1507 send_append_entries_response(
1508 to, AppendEntriesResponseType::FAIL, response_term, response_idx);
1509 }
1510
// Sends an AppendEntries response to `to` that reports this node's current
// view and last index, for rejections with no specific TxID to report.
// NOTE(review): the response-type argument of the call below is not visible
// in this listing; presumably AppendEntriesResponseType::FAIL - confirm.
1511 void send_append_entries_response_nack(ccf::NodeId to)
1512 {
1513 send_append_entries_response(
1514 to,
1516 state->current_view,
1517 state->last_idx);
1518 }
1519
// Builds an AppendEntriesResponse and sends it to `to` over the authenticated
// node-to-node channel as a consensus message.
//  - to: destination node.
//  - answer: response type stored in the `success` field (OK is logged as
//    "ACK", anything else as "NACK").
//  - response_term: value placed in the response's `term` field.
//  - response_idx: value placed in the response's `last_log_idx` field.
// NOTE(review): the parameter line declaring `answer` is not visible in this
// listing.
1520 void send_append_entries_response(
1521 ccf::NodeId to,
1523 aft::Term response_term,
1524 aft::Index response_idx)
1525 {
1526 AppendEntriesResponse response{
1527 .term = response_term,
1528 .last_log_idx = response_idx,
1529 .success = answer,
1530 };
1531
1533 "Send {} from {} to {} for index {}: {}",
1534 response.msg,
1535 state->node_id,
1536 to,
1537 response_idx,
1538 (answer == AppendEntriesResponseType::OK ? "ACK" : "NACK"));
1539
// When tracing is compiled in, capture the packet and current state as JSON.
1540#ifdef CCF_RAFT_TRACING
1541 nlohmann::json j = {};
1542 j["function"] = "send_append_entries_response";
1543 j["packet"] = response;
1544 j["state"] = *state;
1545 COMMITTABLE_INDICES(j["state"], state);
1546 j["to_node_id"] = to;
1548#endif
1549
1550 channels->send_authenticated(
1551 to, ccf::NodeMsgType::consensus_msg, response);
1552 }
1553
// Handles an AppendEntriesResponse `r` received from node `from`.
// Holds state->lock for the duration of the call. The response is ignored
// when the sender is not in all_other_nodes or when this node is not leader.
// Otherwise:
//  - a response with a higher term makes this node aware of the new term;
//  - OK responses for an older term, or for an index below the sender's
//    match_idx, are dropped as stale;
//  - a FAIL response moves the sender's sent_idx back, never below match_idx;
//  - an OK response raises the sender's match_idx and calls update_commit().
1554 void recv_append_entries_response(
1555 const ccf::NodeId& from, AppendEntriesResponse r)
1556 {
1557 std::lock_guard<ccf::pal::Mutex> guard(state->lock);
1558
1559 auto node = all_other_nodes.find(from);
1560 if (node == all_other_nodes.end())
1561 {
1562 // Ignore if we don't recognise the node.
1564 "Recv append entries response to {} from {}: unknown node",
1565 state->node_id,
1566 from);
1567 return;
1568 }
1569
1570#ifdef CCF_RAFT_TRACING
1571 nlohmann::json j = {};
1572 j["function"] = "recv_append_entries_response";
1573 j["packet"] = r;
1574 j["state"] = *state;
1575 COMMITTABLE_INDICES(j["state"], state);
1576 j["from_node_id"] = from;
1577 j["match_idx"] = node->second.match_idx;
1578 j["sent_idx"] = node->second.sent_idx;
1580#endif
1581
1582 // Ignore if we're not the leader.
1583 if (state->leadership_state != ccf::kv::LeadershipState::Leader)
1584 {
1586 "Recv {} to {} from {}: no longer leader",
1587 r.msg,
1588 state->node_id,
1589 from);
1590 return;
1591 }
1592
// Any response received while leader resets the sender's ack timer, before
// the term and index checks below.
1593 using namespace std::chrono_literals;
1594 node->second.last_ack_timeout = 0ms;
1595
1596 if (state->current_view < r.term)
1597 {
1598 // We are behind, update our state.
1600 "Recv {} to {} from {}: more recent term ({} "
1601 "> {})",
1602 r.msg,
1603 state->node_id,
1604 from,
1605 r.term,
1606 state->current_view);
1607 become_aware_of_new_term(r.term);
1608 return;
1609 }
// From here on r.term <= state->current_view.
1610 if (state->current_view != r.term)
1611 {
1612 // Stale response, discard if success.
1613 // Otherwise reset sent_idx and try again.
1614 // NB: In NACKs the term may be that of an estimated matching index
1615 // in the log, rather than the current term, so it is correct for it to
1616 // be older in this case.
1617 if (r.success == AppendEntriesResponseType::OK)
1618 {
1620 "Recv {} to {} from {}: stale term ({} != {})",
1621 r.msg,
1622 state->node_id,
1623 from,
1624 r.term,
1625 state->current_view);
1626 return;
1627 }
1628 }
1629 else if (r.last_log_idx < node->second.match_idx)
1630 {
1631 // Response about past indices, discard if success.
1632 // Otherwise reset sent_idx and try again.
1633 // NB: It is correct for this index to move backwards during NACKs
1634 // which iteratively discover the last matching index of divergent logs
1635 // after an election.
1636 if (r.success == AppendEntriesResponseType::OK)
1637 {
1639 "Recv {} to {} from {}: stale idx", r.msg, state->node_id, from);
1640 return;
1641 }
1642 }
1643
1644 // Update next or match for the responding node.
1645 if (r.success == AppendEntriesResponseType::FAIL)
1646 {
1647 // Failed due to log inconsistency. Reset sent_idx, and try again soon.
1649 "Recv {} to {} from {}: failed", r.msg, state->node_id, from);
// sent_idx only moves backwards here (min with its current value) and is
// never set below what the follower is already known to match (max with
// match_idx).
1650 const auto this_match =
1651 find_highest_possible_match({r.term, r.last_log_idx});
1652 node->second.sent_idx = std::max(
1653 std::min(this_match, node->second.sent_idx), node->second.match_idx);
1654 return;
1655 }
1656 // max(...) because why would we ever want to go backwards on a success
1657 // response?!
1658 node->second.match_idx = std::max(node->second.match_idx, r.last_log_idx);
1659
1661 "Recv {} to {} from {} for index {}: success",
1662 r.msg,
1663 state->node_id,
1664 from,
1665 r.last_log_idx);
// The sender's match_idx may have advanced, so re-evaluate the commit index.
1666 update_commit();
1667 }
1668
// Sends a RequestPreVote to node `to`, carrying this node's current view,
// its last committable index and the term of that index.
// Asserts that the last committable index is not behind the commit index.
1669 void send_request_pre_vote(const ccf::NodeId& to)
1670 {
1671 auto last_committable_idx = last_committable_index();
1672 CCF_ASSERT(last_committable_idx >= state->commit_idx, "lci < ci");
1673
1674 RequestPreVote rpv{
1675 .term = state->current_view,
1676 .last_committable_idx = last_committable_idx,
1677 .term_of_last_committable_idx =
1678 get_term_internal(last_committable_idx)};
1679
// NOTE(review): the trace record below uses the same "function" value as
// send_request_vote - confirm this is what trace consumers expect.
1680#ifdef CCF_RAFT_TRACING
1681 nlohmann::json j = {};
1682 j["function"] = "send_request_vote";
1683 j["packet"] = rpv;
1684 j["state"] = *state;
1685 COMMITTABLE_INDICES(j["state"], state);
1686 j["to_node_id"] = to;
1688#endif
1689
1690 channels->send_authenticated(to, ccf::NodeMsgType::consensus_msg, rpv);
1691 }
1692
// Sends a RequestVote to node `to`, carrying this node's current view, its
// last committable index and the term of that index.
// Asserts that the last committable index is not behind the commit index.
1693 void send_request_vote(const ccf::NodeId& to)
1694 {
1695 auto last_committable_idx = last_committable_index();
1696 CCF_ASSERT(last_committable_idx >= state->commit_idx, "lci < ci");
1697
1698 RequestVote rv{
1699 .term = state->current_view,
1700 .last_committable_idx = last_committable_idx,
1701 .term_of_last_committable_idx =
1702 get_term_internal(last_committable_idx)};
1703
// When tracing is compiled in, capture the packet and current state as JSON.
1704#ifdef CCF_RAFT_TRACING
1705 nlohmann::json j = {};
1706 j["function"] = "send_request_vote";
1707 j["packet"] = rv;
1708 j["state"] = *state;
1709 COMMITTABLE_INDICES(j["state"], state);
1710 j["to_node_id"] = to;
1712#endif
1713
1714 channels->send_authenticated(to, ccf::NodeMsgType::consensus_msg, rv);
1715 }
1716
// Shared handling for RequestVote and RequestPreVote, selected by
// `election_type`. Decides whether to grant the vote and always replies via
// send_request_vote_response().
// Does not take state->lock itself; the callers visible in this file
// (recv_request_vote, recv_request_pre_vote) acquire it before calling.
//  - from: requesting node (deliberately not checked against known nodes).
//  - r: the request; pre-vote requests are converted to RequestVote first.
//  - election_type: RegularVote or PreVote. The "leader already known" and
//    "already voted for another" checks, and the updates to voted_for,
//    leader_id and the election timeout, apply to RegularVote only.
1717 void recv_request_vote_unsafe(
1718 const ccf::NodeId& from, RequestVote r, ElectionType election_type)
1719 {
1720 // Do not check that from is a known node. It is possible to receive
1721 // RequestVotes from nodes that this node doesn't yet know, just as it
1722 // receives AppendEntries from those nodes. These should be obeyed just
1723 // like any other RequestVote - it is possible that this node is needed to
1724 // produce a primary in the new term, who will then help this node catch
1725 // up.
1726
1727 if (state->current_view > r.term)
1728 {
1729 // Reply false, since our term is later than the received term.
1731 "Recv {} to {} from {}: our term is later ({} > {})",
1732 r.msg,
1733 state->node_id,
1734 from,
1735 state->current_view,
1736 r.term);
1737 send_request_vote_response(from, false, election_type);
1738 return;
1739 }
1740 if (state->current_view < r.term)
1741 {
1743 "Recv {} to {} from {}: their term is later ({} < {})",
1744 r.msg,
1745 state->node_id,
1746 from,
1747 state->current_view,
1748 r.term);
1749
1750 // Even if ElectionType::PreVote, we should still update the term.
1751 // A pre-vote-candidate does not update its term until it becomes a
1752 // candidate. So a pre-vote request from a higher term indicates that we
1753 // should catch up to the term that had a candidate in it.
1754 become_aware_of_new_term(r.term);
1755 }
1756
// Start from "grant" and let each check below veto it. All checks run, so
// every reason for a denial is evaluated even after the first veto.
1757 bool grant_vote = true;
1758
1759 if ((election_type == ElectionType::RegularVote) && leader_id.has_value())
1760 {
1761 // Reply false, since we already know the leader in the current term.
1763 "Recv {} to {} from {}: leader {} already known in term {}",
1764 r.msg,
1765 state->node_id,
1766 from,
1767 leader_id.value(),
1768 state->current_view);
1769 grant_vote = false;
1770 }
1771
1772 auto voted_for_other =
1773 (voted_for.has_value()) && (voted_for.value() != from);
1774 if ((election_type == ElectionType::RegularVote) && voted_for_other)
1775 {
1776 // Reply false, since we already voted for someone else.
1778 "Recv {} to {} from {}: already voted for {}",
1779 r.msg,
1780 state->node_id,
1781 from,
1782 voted_for.value());
1783 grant_vote = false;
1784 }
1785
1786 // If the candidate's committable log is at least as up-to-date as ours,
1787 // vote yes
1788
// Up-to-date means: a later term at the last committable index, or the same
// term with an index at least as high as ours.
1789 const auto last_committable_idx = last_committable_index();
1790 const auto term_of_last_committable_idx =
1791 get_term_internal(last_committable_idx);
1792 const auto log_up_to_date =
1793 (r.term_of_last_committable_idx > term_of_last_committable_idx) ||
1794 ((r.term_of_last_committable_idx == term_of_last_committable_idx) &&
1795 (r.last_committable_idx >= last_committable_idx));
1796 if (!log_up_to_date)
1797 {
1799 "Recv {} to {} from {}: candidate log {}.{} is not up-to-date "
1800 "with ours {}.{}",
1801 r.msg,
1802 state->node_id,
1803 from,
1804 r.term_of_last_committable_idx,
1805 r.last_committable_idx,
1806 term_of_last_committable_idx,
1807 last_committable_idx);
1808 grant_vote = false;
1809 }
1810
// Only a granted regular vote changes local election state; a granted
// pre-vote leaves voted_for, leader_id and the timeout untouched.
1811 if (grant_vote && election_type == ElectionType::RegularVote)
1812 {
1813 // If we grant our vote to a candidate, then an election is in progress
1814 restart_election_timeout();
1815 leader_id.reset();
1816 voted_for = from;
1817 }
1818
1820 "Recv {} to {} from {}: {} vote to candidate at {}.{} with "
1821 "local state at {}.{}",
1822 r.msg,
1823 state->node_id,
1824 from,
1825 grant_vote ? "granted" : "denied",
1826 r.term_of_last_committable_idx,
1827 r.last_committable_idx,
1828 term_of_last_committable_idx,
1829 last_committable_idx);
1830
1831 send_request_vote_response(from, grant_vote, election_type);
1832 }
1833
// Entry point for a RequestVote `r` received from `from`: takes state->lock
// and delegates to recv_request_vote_unsafe() as a regular vote.
1834 void recv_request_vote(const ccf::NodeId& from, RequestVote r)
1835 {
1836 std::lock_guard<ccf::pal::Mutex> guard(state->lock);
1837
// When tracing is compiled in, capture the packet and current state as JSON.
1838#ifdef CCF_RAFT_TRACING
1839 nlohmann::json j = {};
1840 j["function"] = "recv_request_vote";
1841 j["packet"] = r;
1842 j["state"] = *state;
1843 COMMITTABLE_INDICES(j["state"], state);
1844 j["from_node_id"] = from;
1846#endif
1847
1848 recv_request_vote_unsafe(from, r, ElectionType::RegularVote);
1849 }
1850
// Entry point for a RequestPreVote `r` received from `from`: takes
// state->lock, copies the request's fields into a RequestVote and delegates
// to recv_request_vote_unsafe() as a pre-vote.
1851 void recv_request_pre_vote(const ccf::NodeId& from, RequestPreVote r)
1852 {
1853 std::lock_guard<ccf::pal::Mutex> guard(state->lock);
1854
// NOTE(review): the trace record below uses the same "function" value as
// recv_request_vote - confirm this is what trace consumers expect.
1855#ifdef CCF_RAFT_TRACING
1856 nlohmann::json j = {};
1857 j["function"] = "recv_request_vote";
1858 j["packet"] = r;
1859 j["state"] = *state;
1860 COMMITTABLE_INDICES(j["state"], state);
1861 j["from_node_id"] = from;
1863#endif
1864
1865 // A pre-vote is a speculative request vote, so we translate it back to a
1866 // RequestVote to avoid duplicating the logic.
1867 RequestVote rv{
1868 .term = r.term,
1869 .last_committable_idx = r.last_committable_idx,
1870 .term_of_last_committable_idx = r.term_of_last_committable_idx,
1871 };
1873 recv_request_vote_unsafe(from, rv, ElectionType::PreVote);
1874 }
1875
// Replies to a vote request from node `to`.
//  - answer: whether the vote is granted.
//  - election_type: RegularVote sends a RequestVoteResponse; any other value
//    sends a RequestPreVoteResponse.
// Both response kinds carry this node's current view and are sent over the
// authenticated channel as consensus messages.
1876 void send_request_vote_response(
1877 const ccf::NodeId& to, bool answer, ElectionType election_type)
1878 {
1879 if (election_type == ElectionType::RegularVote)
1880 {
1881 RequestVoteResponse response{
1882 .term = state->current_view, .vote_granted = answer};
1883
1885 "Send {} from {} to {}: {}",
1886 response.msg,
1887 state->node_id,
1888 to,
1889 answer);
1890
1891 channels->send_authenticated(
1892 to, ccf::NodeMsgType::consensus_msg, response);
1893 }
1894 else
1895 {
1896 RequestPreVoteResponse response{
1897 .term = state->current_view, .vote_granted = answer};
1898
1900 "Send {} from {} to {}: {}",
1901 response.msg,
1902 state->node_id,
1903 to,
1904 answer);
1905
1906 channels->send_authenticated(
1907 to, ccf::NodeMsgType::consensus_msg, response);
1908 }
1909 }
1910
// Shared handling for vote and pre-vote responses, selected by
// `election_type`. Holds state->lock for the duration of the call.
// A granted vote is tallied with add_vote_for_me(from) only if all of the
// following hold: the sender is in all_other_nodes, the response term equals
// our current view, and we are currently the kind of candidate matching
// `election_type` (Candidate for RegularVote, PreVoteCandidate for PreVote).
// A response with a higher term makes this node aware of the new term
// instead. Every other case returns without changing state.
1911 void recv_request_vote_response(
1912 const ccf::NodeId& from,
1913 RequestVoteResponse r,
1914 ElectionType election_type)
1915 {
1916 std::lock_guard<ccf::pal::Mutex> guard(state->lock);
1917
1918#ifdef CCF_RAFT_TRACING
1919 nlohmann::json j = {};
1920 j["function"] = "recv_request_vote_response";
1921 j["packet"] = r;
1922 j["state"] = *state;
1923 COMMITTABLE_INDICES(j["state"], state);
1924 j["from_node_id"] = from;
1926#endif
1927
1928 // Ignore if we don't recognise the node.
1929 auto node = all_other_nodes.find(from);
1930 if (node == all_other_nodes.end())
1931 {
1933 "Recv {} to {} from {}: unknown node", r.msg, state->node_id, from);
1934 return;
1935 }
1936
1937 if (state->current_view < r.term)
1938 {
1940 "Recv {} to {} from {}: their term is more recent "
1941 "({} < {})",
1942 r.msg,
1943 state->node_id,
1944 from,
1945 state->current_view,
1946 r.term);
1947 become_aware_of_new_term(r.term);
1948 return;
1949 }
1950 if (state->current_view != r.term)
1951 {
1952 // Ignore as it is stale.
1954 "Recv request vote response to {} from {}: stale ({} != {})",
1955 state->node_id,
1956 from,
1957 state->current_view,
1958 r.term);
1959 return;
1960 }
1961
// From here on the response is for our current view. First reject it if we
// are not any kind of candidate.
1962 if (
1963 state->leadership_state != ccf::kv::LeadershipState::PreVoteCandidate &&
1964 state->leadership_state != ccf::kv::LeadershipState::Candidate)
1965 {
1967 "Recv {} to {} from: {}: we aren't a candidate",
1968 r.msg,
1969 state->node_id,
1970 from);
1971 return;
1972 }
// Then require that the response kind matches our candidacy kind.
1973 if (
1974 election_type == ElectionType::RegularVote &&
1975 state->leadership_state != ccf::kv::LeadershipState::Candidate)
1976 {
1977 // Stale message from previous candidacy
1978 // Candidate(T) -> Follower(T) -> PreVoteCandidate(T)
1980 "Recv {} to {} from {}: no longer a candidate in {}",
1981 r.msg,
1982 state->node_id,
1983 from,
1984 r.term);
1985 return;
1986 }
1987 if (
1988 election_type == ElectionType::PreVote &&
1989 state->leadership_state != ccf::kv::LeadershipState::PreVoteCandidate)
1990 {
1991 // To receive a PreVoteResponse, we must have been a PreVoteCandidate in
1992 // that term.
1993 // Since we are a Candidate for term T, we can only have transitioned
1994 // from PreVoteCandidate for term (T-1). Since terms are monotonic this
1995 // is impossible.
1997 "Recv {} to {} from {}: unexpected message in {} when "
1998 "Candidate for {}",
1999 r.msg,
2000 state->node_id,
2001 from,
2002 r.term,
2003 state->current_view);
2004 return;
2005 }
2006
2007 if (!r.vote_granted)
2008 {
2009 // Do nothing.
2011 "Recv request vote response to {} from {}: they voted no",
2012 state->node_id,
2013 from);
2014 return;
2015 }
2016
2018 "Recv request vote response to {} from {}: they voted yes",
2019 state->node_id,
2020 from);
2021
// Tally the granted vote.
2022 add_vote_for_me(from);
2023 }
2024
2025 void recv_request_vote_response(
2026 const ccf::NodeId& from, RequestVoteResponse r)
2027 {
2028 recv_request_vote_response(from, r, ElectionType::RegularVote);
2029 }
2030
// Handles a RequestPreVoteResponse `r` from node `from`: copies its term and
// vote_granted fields into a RequestVoteResponse and forwards that to
// recv_request_vote_response() as a pre-vote response.
2031 void recv_request_pre_vote_response(
2032 const ccf::NodeId& from, RequestPreVoteResponse r)
2033 {
2034 RequestVoteResponse rvr{.term = r.term, .vote_granted = r.vote_granted};
2036 recv_request_vote_response(from, rvr, ElectionType::PreVote);
2037 }
2038
// Handles a ProposeRequestVote `r` from node `from`, a nudge to start an
// election early. Holds state->lock for the duration of the call.
// This node becomes a candidate immediately only if is_retired_committed()
// is false, it is ticking, and the request's term equals the current view.
// Otherwise the request is ignored (and logged).
2039 void recv_propose_request_vote(
2040 const ccf::NodeId& from, ProposeRequestVote r)
2041 {
2042 std::lock_guard<ccf::pal::Mutex> guard(state->lock);
2043
2044#ifdef CCF_RAFT_TRACING
2045 nlohmann::json j = {};
2046 j["function"] = "recv_propose_request_vote";
2047 j["packet"] = r;
2048 j["state"] = *state;
2049 COMMITTABLE_INDICES(j["state"], state);
2050 j["from_node_id"] = from;
2052#endif
2053 if (!is_retired_committed() && ticking && r.term == state->current_view)
2054 {
2056 "Becoming candidate early due to propose request vote from {}", from);
2057 become_candidate();
2058 }
2059 else
2060 {
2061 RAFT_INFO_FMT("Ignoring propose request vote from {}", from);
2062 }
2063 }
2064
2065 void restart_election_timeout()
2066 {
2067 // Randomise timeout_elapsed to get a random election timeout
2068 // between 0.5x and 1x the configured election timeout.
2069 timeout_elapsed = std::chrono::milliseconds(distrib(rand));
2070 }
2071
2072 void reset_votes_for_me()
2073 {
2074 votes_for_me.clear();
2075 for (auto const& conf : configurations)
2076 {
2077 votes_for_me[conf.idx].quorum = get_quorum(conf.nodes.size());
2078 votes_for_me[conf.idx].votes.clear();
2079 }
2080 }
2081
// Moves this node to the PreVoteCandidate leadership state and sends a
// RequestPreVote to every other node in the active configurations.
// Does nothing when there is no configuration.
// The current view is not changed here (compare become_candidate, which
// increments it).
2082 void become_pre_vote_candidate()
2083 {
2084 if (configurations.empty())
2085 {
2087 "Not becoming pre-vote candidate {} due to lack of a configuration.",
2088 state->node_id);
2089 return;
2090 }
2091
2092 state->leadership_state = ccf::kv::LeadershipState::PreVoteCandidate;
2093 leader_id.reset();
2094
// Start a fresh tally and a fresh randomised election timeout.
2095 reset_votes_for_me();
2096 restart_election_timeout();
2097
2099 "Becoming pre-vote candidate {}: {}",
2100 state->node_id,
2101 state->current_view);
2102
2103#ifdef CCF_RAFT_TRACING
2104 nlohmann::json j = {};
2105 j["function"] = "become_pre_vote_candidate";
2106 j["state"] = *state;
2107 COMMITTABLE_INDICES(j["state"], state);
2108 j["configurations"] = configurations;
2110#endif
2111
// Count our own (pre-)vote first.
2112 add_vote_for_me(state->node_id);
2113
2114 // Request votes only go to nodes in configurations, since only
2115 // their votes can be tallied towards an election quorum.
2116 for (auto const& node_id : other_nodes_in_active_configs())
2117 {
2118 // ccfraft!RequestVote
2119 send_request_pre_vote(node_id);
2120 }
2121 }
2122
2123 // ccfraft!Timeout
// Moves this node to the Candidate leadership state in a new view: votes for
// itself, increments the current view, and sends a RequestVote to every other
// node in the active configurations.
// Does nothing when there is no configuration.
2124 void become_candidate()
2125 {
2126 if (configurations.empty())
2127 {
2128 // ccfraft!Timeout:
2129 // /\ \E c \in DOMAIN configurations[i] :
2130 // /\ i \in configurations[i][c]
2132 "Not becoming candidate {} due to lack of a configuration.",
2133 state->node_id);
2134 return;
2135 }
2136
2137 state->leadership_state = ccf::kv::LeadershipState::Candidate;
2138 leader_id.reset();
2139
// Vote for ourselves, start a fresh tally and move to the next view.
2140 voted_for = state->node_id;
2141 reset_votes_for_me();
2142 state->current_view++;
2143
2144 restart_election_timeout();
2145 reset_last_ack_timeouts();
2146
2148 "Becoming candidate {}: {}", state->node_id, state->current_view);
2149
2150#ifdef CCF_RAFT_TRACING
2151 nlohmann::json j = {};
2152 j["function"] = "become_candidate";
2153 j["state"] = *state;
2154 COMMITTABLE_INDICES(j["state"], state);
2155 j["configurations"] = configurations;
2157#endif
2158
2159 add_vote_for_me(state->node_id);
2160
2161 // Request votes only go to nodes in configurations, since only
2162 // their votes can be tallied towards an election quorum.
2163 for (auto const& node_id : other_nodes_in_active_configs())
2164 {
2165 // ccfraft!RequestVote
2166 send_request_vote(node_id);
2167 }
2168 }
2169
// Makes this node the leader for the current view, unless
// is_retired_committed() is true, in which case nothing happens.
// The boolean parameter is unnamed and unused in this body.
// Steps: roll back to the last committable index (or, if nothing has been
// committed yet, only tell the store which term we are in); set the
// leadership state, leader_id and should_sign; reset timers; reset per-node
// replication indices; send an AppendEntries to every other known node.
2170 void become_leader(bool /*force_become_leader*/ = false)
2171 {
2172 if (is_retired_committed())
2173 {
2174 return;
2175 }
2176
2177 const auto election_index = last_committable_index();
2178
2180 "Election index is {} in term {}", election_index, state->current_view);
2181 // Discard any un-committable updates we may hold,
2182 // since we have no signature for them. Except at startup,
2183 // where we do not want to roll back the genesis transaction.
2184 if (state->commit_idx > 0)
2185 {
2186 rollback(election_index);
2187 }
2188 else
2189 {
2190 // but we still want the KV to know which term we're in
2191 store->initialise_term(state->current_view);
2192 }
2193
2194 state->leadership_state = ccf::kv::LeadershipState::Leader;
2195 leader_id = state->node_id;
2196 should_sign = true;
2197
2198 using namespace std::chrono_literals;
2199 timeout_elapsed = 0ms;
2200
2201 reset_last_ack_timeouts();
2202
2204 "Becoming leader {}: {}", state->node_id, state->current_view);
2205
2206#ifdef CCF_RAFT_TRACING
2207 nlohmann::json j = {};
2208 j["function"] = "become_leader";
2209 j["state"] = *state;
2210 COMMITTABLE_INDICES(j["state"], state);
2211 j["configurations"] = configurations;
2213#endif
2214
2215 // Try to advance commit at once if there are no other nodes.
2216 if (other_nodes_in_active_configs().size() == 0)
2217 {
2218 update_commit();
2219 }
2220
2221 // Reset next, match, and sent indices for all nodes.
2222 auto next = state->last_idx + 1;
2223
// Nothing is known to match yet (match_idx = 0); everything up to our last
// index is treated as already sent, so the first AppendEntries starts at
// `next`.
2224 for (auto& node : all_other_nodes)
2225 {
2226 node.second.match_idx = 0;
2227 node.second.sent_idx = next - 1;
2228
2229 // Send an empty append_entries to all nodes.
2230 send_append_entries(node.first, next);
2231 }
2232
// retired_node_cleanup is optional; run it only when set.
2233 if (retired_node_cleanup)
2234 {
2235 retired_node_cleanup->cleanup();
2236 }
2237 }
2238
2239 public:
2240 // Called when a replica becomes follower in the same term, e.g. when the
2241 // primary node has not received a majority of acks (CheckQuorum)
// Forgets the current leader, restarts the election timeout, resets ack
// timeouts, rolls back to the last committable index and sets the leadership
// state to Follower. The current view is not changed here.
// NOTE(review): the line carrying this function's signature is not visible
// in this listing; the trace record below names it "become_follower".
2243 {
2244 leader_id.reset();
2245 restart_election_timeout();
2246 reset_last_ack_timeouts();
2247
2248 // Drop anything unsigned here, but retain all signed entries. Only do a
2249 // more aggressive rollback, potentially including signatures, when
2250 // receiving a conflicting AppendEntries
2251 rollback(last_committable_index());
2252
2253 state->leadership_state = ccf::kv::LeadershipState::Follower;
2255 "Becoming follower {}: {}.{}",
2256 state->node_id,
2257 state->current_view,
2258 state->commit_idx);
2259
2260#ifdef CCF_RAFT_TRACING
2261 nlohmann::json j = {};
2262 j["function"] = "become_follower";
2263 j["state"] = *state;
2264 COMMITTABLE_INDICES(j["state"], state);
2265 j["configurations"] = configurations;
2267#endif
2268 }
2269
2270 // Called when a replica becomes aware of the existence of a new term
2271 // If retired already, state remains unchanged, but the replica otherwise
2272 // becomes a follower in the new term.
// Sets the current view to `term`, clears voted_for and the vote tally, then
// calls become_follower().
// NOTE(review): the line carrying this function's signature is not visible
// in this listing; callers invoke it as become_aware_of_new_term(term).
// NOTE(review): no retirement check is visible in this body - presumably the
// "if retired already" behaviour is handled elsewhere; confirm.
2274 {
2275 RAFT_DEBUG_FMT("Becoming aware of new term {}", term);
2276
2277 state->current_view = term;
2278 voted_for.reset();
2279 reset_votes_for_me();
2280 become_follower();
// Set after become_follower(): recv_append_entries reads this flag to decide
// whether a conflicting local suffix may be rolled back.
2281 is_new_follower = true;
2282 }
2283
2284 private:
2285 void send_propose_request_vote()
2286 {
2287 ProposeRequestVote prv{.term = state->current_view};
2288
2289 std::optional<ccf::NodeId> successor = std::nullopt;
2290 Index max_match_idx = 0;
2291 ccf::kv::ReconfigurationId reconf_id_of_max_match = 0;
2292
2293 // Pick the node that has the highest match_idx, and break
2294 // ties by looking at the highest reconfiguration id they are
2295 // part of. This can lead to nudging a node that is
2296 // about to retire too, but that node will then nudge
2297 // a successor, and that seems preferable to nudging a node that
2298 // risks not being eligible if reconfiguration id is prioritised.
2299 // Alternatively, we could pick the node with the highest match idx
2300 // in the latest config, provided that match idx at least as high as a
2301 // majority. That would make them both eligible and unlikely to retire
2302 // soon.
2303 for (auto& [node, node_state] : all_other_nodes)
2304 {
2305 if (node_state.match_idx >= max_match_idx)
2306 {
2307 ccf::kv::ReconfigurationId latest_reconf_id = 0;
2308 auto conf = configurations.rbegin();
2309 while (conf != configurations.rend())
2310 {
2311 if (conf->nodes.find(node) != conf->nodes.end())
2312 {
2313 latest_reconf_id = conf->idx;
2314 break;
2315 }
2316 conf++;
2317 }
2318 if (!(node_state.match_idx == max_match_idx &&
2319 latest_reconf_id < reconf_id_of_max_match))
2320 {
2321 reconf_id_of_max_match = latest_reconf_id;
2322 successor = node;
2323 max_match_idx = node_state.match_idx;
2324 }
2325 }
2326 }
2327 if (successor.has_value())
2328 {
2329 RAFT_INFO_FMT("Proposing that {} becomes candidate", successor.value());
2330 channels->send_authenticated(
2331 successor.value(), ccf::NodeMsgType::consensus_msg, prv);
2332 }
2333 }
// Records a retirement step for this node. `idx` is the log index tied to the
// step and `phase` says which step it is. Whatever the phase, the node ends
// with membership_state == Retired and retirement_phase == phase.
2334 void become_retired(Index idx, ccf::kv::RetirementPhase phase)
2335 {
// NOTE(review): listing line 2336 is absent from this extract. It presumably
// opened the logging call that consumes the format string and arguments
// below — confirm against the upstream file.
2337 "Becoming retired, phase {} (leadership {}): {}: {} at {}",
2338 phase,
2339 state->leadership_state,
2340 state->node_id,
2341 state->current_view,
2342 idx);
2343
// NOTE(review): listing lines 2344 and 2346 are absent. They presumably held
// the phase check guarding this branch and the opener of an assertion call
// taking (condition, format, args) — confirm. What is visible: retirement_idx
// is expected to be unset, and is then set to idx.
2345 {
2347 !state->retirement_idx.has_value(),
2348 "retirement_idx already set to {}",
2349 // NOLINTNEXTLINE(bugprone-unchecked-optional-access)
2350 state->retirement_idx.value());
2351 state->retirement_idx = idx;
2352 RAFT_INFO_FMT("Node retiring at {}", idx);
2353 }
2354 else if (phase == ccf::kv::RetirementPhase::Signed)
2355 {
// retirement_idx must already be set when the Signed step arrives; idx is
// remembered as retirement_committable_idx.
2356 assert(state->retirement_idx.has_value());
// NOTE(review): listing line 2357 is absent; it presumably opened the
// assertion call whose arguments follow — confirm.
2358 // NOLINTNEXTLINE(bugprone-unchecked-optional-access)
2359 idx >= state->retirement_idx.value(),
2360 "Index {} unexpectedly lower than retirement_idx {}",
2361 idx,
2362 // NOLINTNEXTLINE(bugprone-unchecked-optional-access)
2363 state->retirement_idx.value());
2364 state->retirement_committable_idx = idx;
2365 RAFT_INFO_FMT("Node retirement committable at {}", idx);
2366 }
// NOTE(review): listing line 2367 is absent; it presumably held a further
// `else if` on phase — confirm. In this branch a node that is currently
// leader first proposes a successor, then the node clears leader_id and sets
// its leadership state to None.
2368 {
2369 if (state->leadership_state == ccf::kv::LeadershipState::Leader)
2370 {
2371 send_propose_request_vote();
2372 }
2373
2374 leader_id.reset();
2375 state->leadership_state = ccf::kv::LeadershipState::None;
2376 }
2377
// Applied for every phase.
2378 state->membership_state = ccf::kv::MembershipState::Retired;
2379 state->retirement_phase = phase;
2380 }
2381
// Records a vote from `from` in every configuration that contains it, then
// checks whether every tracked configuration has reached its quorum. If so,
// the node advances according to its current leadership state.
// Throws std::logic_error if a full quorum is reached while the leadership
// state matches neither case of the switch below.
2382 void add_vote_for_me(const ccf::NodeId& from)
2383 {
2384 if (configurations.empty())
2385 {
// NOTE(review): listing line 2386 is absent; it presumably opened the logging
// call that takes the arguments below — confirm.
2387 "Not voting for myself {} due to lack of a configuration.",
2388 state->node_id);
2389 return;
2390 }
2391
2392 // Add vote for from node in each configuration where it is present
2393 for (auto const& conf : configurations)
2394 {
2395 auto const& nodes = conf.nodes;
2396 if (nodes.find(from) == nodes.end())
2397 {
2398 // from node is not a member of this particular configuration; skip it.
2399 continue;
2400 }
2401
2402 votes_for_me[conf.idx].votes.insert(from);
// NOTE(review): listing line 2403 is absent; it presumably opened the logging
// call that takes the arguments below — confirm.
2404 "Node {} voted for {} in configuration {} with quorum {}",
2405 from,
2406 state->node_id,
2407 conf.idx,
2408 votes_for_me[conf.idx].quorum);
2409 }
2410
2411 // We need a quorum of votes in _all_ configurations
2412 bool is_elected = true;
2413 for (auto const& v : votes_for_me)
2414 {
2415 auto const& quorum = v.second.quorum;
2416 auto const& votes = v.second.votes;
2417
2418 if (votes.size() < quorum)
2419 {
2420 is_elected = false;
2421 break;
2422 }
2423 }
2424
2425 if (is_elected)
2426 {
2427 switch (state->leadership_state)
2428 {
// NOTE(review): listing lines 2429 and 2432 (the two case labels) are absent.
// Judging by the exception text below they match the pre-vote-candidate and
// candidate states respectively — confirm.
2430 become_candidate();
2431 break;
2433 become_leader();
2434 break;
2435 default:
2436 throw std::logic_error(
2437 "add_vote_for_me() called while not a pre-vote candidate or "
2438 "candidate");
2439 }
2440 }
2441 }
2442
2443 // If there exists some committable idx in the current term such that idx >
2444 // commit_idx and a majority of nodes have replicated it, commit to that
2445 // idx.
//
// Throws std::logic_error when called while not leader, or when the agreed
// index computed from the followers' match indices exceeds last_idx.
2446 void update_commit()
2447 {
2448 if (state->leadership_state != ccf::kv::LeadershipState::Leader)
2449 {
2450 throw std::logic_error(
2451 "update_commit() must only be called while this node is leader");
2452 }
2453
// Stays empty only when there are no configurations at all.
2454 std::optional<Index> new_agreement_index = std::nullopt;
2455
2456 // Obtain CFT watermarks
2457 for (auto const& c : configurations)
2458 {
2459 // The majority must be checked separately for each active
2460 // configuration.
2461 std::vector<Index> match;
2462 match.reserve(c.nodes.size());
2463
2464 for (const auto& node : c.nodes)
2465 {
2466 if (node.first == state->node_id)
2467 {
// This node counts with its own last_idx.
2468 match.push_back(state->last_idx);
2469 }
2470 else
2471 {
// NOTE(review): at() presumably throws if a configured node has no entry in
// all_other_nodes — confirm the container type and that entries always exist.
2472 match.push_back(all_other_nodes.at(node.first).match_idx);
2473 }
2474 }
2475
// After an ascending sort, the element at (size - 1) / 2 is an index that at
// least floor(size / 2) + 1 nodes of this configuration have reached.
// If c.nodes were empty, size() - 1 would wrap around and at() would throw
// std::out_of_range.
2476 sort(match.begin(), match.end());
2477 auto confirmed = match.at((match.size() - 1) / 2);
2478
// Keep the smallest per-configuration value.
2479 if (
2480 !new_agreement_index.has_value() ||
2481 confirmed < new_agreement_index.value())
2482 {
2483 new_agreement_index = confirmed;
2484 }
2485 }
2486
2487 if (new_agreement_index.has_value())
2488 {
2489 if (new_agreement_index.value() > state->last_idx)
2490 {
2491 throw std::logic_error(
2492 "Followers appear to have later match indices than leader");
2493 }
2494
2495 const auto new_commit_idx =
2496 find_highest_possible_committable_index(new_agreement_index.value());
2497
2498 if (new_commit_idx.has_value())
2499 {
// NOTE(review): listing line 2500 is absent; it presumably opened the logging
// call that takes the arguments below — confirm.
2501 "In update_commit, new_commit_idx: {}, "
2502 "last_idx: {}",
2503 new_commit_idx.value(),
2504 state->last_idx);
2505
// Commit only when the candidate index belongs to the current view; otherwise
// just report and wait.
2506 const auto term_of_new = get_term_internal(new_commit_idx.value());
2507 if (term_of_new == state->current_view)
2508 {
2509 commit(new_commit_idx.value());
2510 }
2511 else
2512 {
// NOTE(review): listing line 2513 is absent; it presumably opened the logging
// call that takes the arguments below — confirm.
2514 "Ack quorum at {} resulted in proposed commit index {}, which "
2515 "is in term {}. Waiting for agreement on committable entry in "
2516 "current term {} to update commit",
2517 new_agreement_index.value(),
2518 new_commit_idx.value(),
2519 term_of_new,
2520 state->current_view);
2521 }
2522 }
2523 }
2524 }
2525
2526 // Commits at the highest committable index which is not greater than the
2527 // given idx.
//
// Nothing happens unless idx is beyond the current commit_idx and the term of
// idx is not later than the current view.
2528 void commit_if_possible(Index idx)
2529 {
// NOTE(review): listing line 2530 is absent; it presumably opened the logging
// call that takes the arguments below — confirm.
2531 "Commit if possible {} (ci: {}) (ti {})",
2532 idx,
2533 state->commit_idx,
2534 get_term_internal(idx));
2535 if (
2536 (idx > state->commit_idx) &&
2537 (get_term_internal(idx) <= state->current_view))
2538 {
// An empty result means there is nothing committable at or below idx, so no
// commit is made.
2539 const auto highest_committable =
2540 find_highest_possible_committable_index(idx);
2541 if (highest_committable.has_value())
2542 {
2543 commit(highest_committable.value());
2544 }
2545 }
2546 }
2547
2548 size_t get_quorum(size_t n) const
2549 {
2550 return (n / 2) + 1;
2551 }
2552
// Advances commit_idx to idx and passes the commit on to the store and the
// ledger, then discards configurations that the commit has superseded.
// Throws std::logic_error if idx is beyond last_idx. Returns early, after a
// single debug log, when idx is not greater than the current commit_idx.
2553 void commit(Index idx)
2554 {
2555 if (idx > state->last_idx)
2556 {
2557 throw std::logic_error(fmt::format(
2558 "Tried to commit {} but last_idx is {}", idx, state->last_idx));
2559 }
2560
2561 RAFT_DEBUG_FMT("Starting commit");
2562
2563 // This could happen if a follower becomes the leader when it
2564 // has committed fewer log entries, although it has them available.
2565 if (idx <= state->commit_idx)
2566 {
2567 return;
2568 }
2569
2570#ifdef CCF_RAFT_TRACING
2571 nlohmann::json j = {};
2572 j["function"] = "commit";
2573 j["args"] = nlohmann::json::object();
2574 j["args"]["idx"] = idx;
2575 j["state"] = *state;
2576 COMMITTABLE_INDICES(j["state"], state);
2577 j["configurations"] = configurations;
// NOTE(review): listing line 2578 is absent; it presumably emitted `j` as a
// trace record — confirm.
2579#endif
2580
2581 compact_committable_indices(idx);
2582
2583 state->commit_idx = idx;
// A retired node in the Signed phase moves to Completed once the commit has
// reached its retirement_committable_idx.
2584 if (
2585 is_retired() &&
2586 state->retirement_phase == ccf::kv::RetirementPhase::Signed &&
2587 state->retirement_committable_idx.has_value())
2588 {
2589 const auto retirement_committable =
2590 state // NOLINT(bugprone-unchecked-optional-access)
2591 ->retirement_committable_idx.value();
2592 if (idx >= retirement_committable)
2593 {
2594 become_retired(idx, ccf::kv::RetirementPhase::Completed);
2595 }
2596 }
2597
2598 RAFT_DEBUG_FMT("Compacting...");
2599 store->compact(idx);
2600 ledger->commit(idx);
2601
2602 RAFT_DEBUG_FMT("Commit on {}: {}", state->node_id, idx);
2603
2604 // Examine each configuration that is followed by a globally committed
2605 // configuration.
2606 bool changed = false;
2607
// Each pass drops the oldest configuration if the one after it has an idx at
// or below the committed idx. The loop stops when zero or one configuration
// remains, so the newest configuration is never dropped.
2608 while (true)
2609 {
2610 auto conf = configurations.begin();
2611 if (conf == configurations.end())
2612 {
2613 break;
2614 }
2615
2616 auto next = std::next(conf);
2617 if (next == configurations.end())
2618 {
2619 break;
2620 }
2621
2622 if (idx < next->idx)
2623 {
2624 break;
2625 }
2626
// NOTE(review): listing line 2627 is absent; it presumably opened the logging
// call that takes the arguments below — confirm.
2628 "Configurations: discard committed configuration at {}", conf->idx);
2629 configurations.pop_front();
2630 changed = true;
2631 }
2632
// Node state is refreshed only when a configuration was dropped. The
// retired-node cleanup additionally requires a cleanup object to be set and
// this node to be primary.
2633 if (changed)
2634 {
2635 create_and_remove_node_state();
2636 if (retired_node_cleanup && is_primary())
2637 {
2638 retired_node_cleanup->cleanup();
2639 }
2640 }
2641 }
2642
2643 bool is_self_in_latest_config()
2644 {
2645 bool present = false;
2646 if (!configurations.empty())
2647 {
2648 auto current_nodes = configurations.back().nodes;
2649 present = current_nodes.find(state->node_id) != current_nodes.end();
2650 }
2651 return present;
2652 }
2653
2654 void start_ticking_if_necessary()
2655 {
2656 if (!ticking && is_self_in_latest_config())
2657 {
2658 start_ticking();
2659 }
2660 }
2661
2662 public:
// Discards everything after idx: the store and ledger are rolled back or
// truncated, last_idx is reset to idx, and the view history, committable
// indices, retirement bookkeeping and configurations are trimmed to match.
// A request to roll back below commit_idx is ignored.
2663 void rollback(Index idx)
2664 {
2665 if (idx < state->commit_idx)
2666 {
// NOTE(review): listing line 2667 is absent; it presumably opened the logging
// call that takes the arguments below — confirm.
2668 "Asked to rollback to idx:{} but committed to commit_idx:{} - "
2669 "ignoring rollback request",
2670 idx,
2671 state->commit_idx);
2672 return;
2673 }
2674
// The store receives the {term of idx, idx} pair together with the current
// view.
2675 store->rollback({get_term_internal(idx), idx}, state->current_view);
2676
2677 RAFT_DEBUG_FMT("Setting term in store to: {}", state->current_view);
2678 ledger->truncate(idx);
2679 state->last_idx = idx;
2680 RAFT_DEBUG_FMT("Rolled back at {}", idx);
2681
2682 state->view_history.rollback(idx);
2683
// Drop committable indices beyond the rollback point.
2684 while (!state->committable_indices.empty() &&
2685 (state->committable_indices.back() > idx))
2686 {
2687 state->committable_indices.pop_back();
2688 }
2689
// Undo the Signed retirement step if it was recorded after idx: the node
// falls back to the Ordered phase. The has_value() re-check after the assert
// guards the value() call in builds where assert is compiled out.
2690 if (
2691 state->membership_state == ccf::kv::MembershipState::Retired &&
2692 state->retirement_phase == ccf::kv::RetirementPhase::Signed)
2693 {
2694 assert(state->retirement_committable_idx.has_value());
2695 if (state->retirement_committable_idx.has_value())
2696 {
2697 const auto retirement_committable =
2698 state // NOLINT(bugprone-unchecked-optional-access)
2699 ->retirement_committable_idx.value();
2700 if (retirement_committable > idx)
2701 {
2702 state->retirement_committable_idx = std::nullopt;
2703 state->retirement_phase = ccf::kv::RetirementPhase::Ordered;
2704 }
2705 }
2706 }
2707
// This check deliberately follows the previous one: a node just moved back to
// Ordered above is examined again here, so one rollback can undo both steps.
// If the Ordered step was recorded after idx, the node becomes Active again.
2708 if (
2709 state->membership_state == ccf::kv::MembershipState::Retired &&
2710 state->retirement_phase == ccf::kv::RetirementPhase::Ordered)
2711 {
2712 assert(state->retirement_idx.has_value());
2713 if (state->retirement_idx.has_value())
2714 {
2715 const auto retirement =
2716 state // NOLINT(bugprone-unchecked-optional-access)
2717 ->retirement_idx.value();
2718 if (retirement > idx)
2719 {
2720 state->retirement_idx = std::nullopt;
2721 state->retirement_phase = std::nullopt;
2722 state->membership_state = ccf::kv::MembershipState::Active;
2723 RAFT_DEBUG_FMT("Becoming Active after rollback");
2724 }
2725 }
2726 }
2727
2728 // Rollback configurations.
2729 bool changed = false;
2730
// Configurations are removed newest-first while their idx is beyond idx.
2731 while (!configurations.empty() && (configurations.back().idx > idx))
2732 {
// NOTE(review): listing line 2733 is absent; it presumably opened the logging
// call that takes the arguments below — confirm.
2734 "Configurations: rollback configuration at {}",
2735 configurations.back().idx);
2736 configurations.pop_back();
2737 changed = true;
2738 }
2739
2740 if (changed)
2741 {
2742 create_and_remove_node_state();
2743 }
2744 }
2745
2746 nlohmann::json get_state_representation() const
2747 {
2748 return *state;
2749 }
2750
// Asks a peer to stand for election by calling send_propose_request_vote().
// Does nothing beyond logging when this node is not the leader.
2751 void nominate_successor() override
2752 {
2753 if (state->leadership_state != ccf::kv::LeadershipState::Leader)
2754 {
// NOTE(review): listing line 2755 is absent; it presumably opened the logging
// call that takes the arguments below — confirm.
2756 "Not proposing request vote from {} since not leader",
2757 state->node_id);
2758 return;
2759 }
2760
2761 LOG_INFO_FMT("Nominating successor for {}", state->node_id);
2762
2763#ifdef CCF_RAFT_TRACING
// The trace record is labelled "step_down_and_nominate_successor", which
// differs from this method's name.
2764 nlohmann::json j = {};
2765 j["function"] = "step_down_and_nominate_successor";
2766 j["state"] = *state;
2767 COMMITTABLE_INDICES(j["state"], state);
2768 j["configurations"] = configurations;
// NOTE(review): listing line 2769 is absent; it presumably emitted `j` as a
// trace record — confirm.
2770#endif
2771
2772 send_propose_request_vote();
2773 }
2774
2775 private:
// Brings all_other_nodes in line with the nodes named by the current
// configurations: every configured node other than this one that has no entry
// yet gets one, plus a channel address if no channel exists. A leader also
// sends the new node append entries straight away.
2776 void create_and_remove_node_state()
2777 {
2778 // Find all nodes present in any active configuration.
2779 Configuration::Nodes active_nodes;
2780
// emplace does not overwrite: for a node listed in several configurations,
// the info from the first configuration visited is the one kept.
2781 for (auto const& conf : configurations)
2782 {
2783 for (auto const& node : conf.nodes)
2784 {
2785 active_nodes.emplace(node.first, node.second);
2786 }
2787 }
2788
2789 // Remove all nodes in the node state that are not present in any active
2790 // configuration.
// NOTE(review): to_remove is filled here but never read in the visible body,
// so nothing is actually erased from all_other_nodes by this function —
// confirm whether that is intentional.
2791 std::vector<ccf::NodeId> to_remove;
2792
2793 for (const auto& node : all_other_nodes)
2794 {
2795 if (active_nodes.find(node.first) == active_nodes.end())
2796 {
2797 to_remove.push_back(node.first);
2798 }
2799 }
2800
2801 // Add all active nodes that are not already present in the node state.
// NOTE(review): iterating by value copies each (id, info) pair; a const
// reference would avoid that.
2802 for (auto node_info : active_nodes)
2803 {
// This node never tracks itself in all_other_nodes.
2804 if (node_info.first == state->node_id)
2805 {
2806 continue;
2807 }
2808
2809 if (all_other_nodes.find(node_info.first) == all_other_nodes.end())
2810 {
2811 if (!channels->have_channel(node_info.first))
2812 {
// NOTE(review): listing line 2813 is absent; it presumably opened the logging
// call that takes the arguments below — confirm.
2814 "Configurations: create node channel with {}", node_info.first);
2815
2816 channels->associate_node_address(
2817 node_info.first,
2818 node_info.second.hostname,
2819 node_info.second.port);
2820 }
2821
2822 // A new node is sent only future entries initially. If it does not
2823 // have prior data, it will communicate that back to the leader.
// NOTE(review): the trailing 0 is presumably the node's initial match index —
// confirm against the node-state constructor.
2824 auto index = state->last_idx + 1;
2825 all_other_nodes.try_emplace(
2826 node_info.first, node_info.second, index, 0);
2827
2828 if (state->leadership_state == ccf::kv::LeadershipState::Leader)
2829 {
2830 send_append_entries(node_info.first, index);
2831 }
2832
// NOTE(review): listing line 2833 is absent; it presumably opened the logging
// call that takes the arguments below — confirm.
2834 "Added raft node {} ({}:{})",
2835 node_info.first,
2836 node_info.second.hostname,
2837 node_info.second.port);
2838 }
2839 }
2840 }
2841 };
2842}
#define CCF_ASSERT_FMT(expr,...)
Definition ccf_assert.h:10
#define CCF_ASSERT(expr, msg)
Definition ccf_assert.h:14
Definition raft.h:98
Term get_view() override
Definition raft.h:467
ccf::NodeId id() override
Definition raft.h:249
std::shared_ptr< ccf::NodeToNode > channels
Definition raft.h:209
bool is_retired_completed() const
Definition raft.h:321
std::set< ccf::NodeId > other_nodes_in_active_configs() const
Definition raft.h:502
std::vector< Index > get_view_history_since(Index idx) override
Definition raft.h:492
bool can_replicate() override
Definition raft.h:264
bool replicate(const ccf::kv::BatchVector &entries, Term term) override
Definition raft.h:618
void add_configuration(Index idx, const ccf::kv::Configuration::Nodes &conf) override
Definition raft.h:520
bool is_backup() override
Definition raft.h:300
nlohmann::json get_state_representation() const
Definition raft.h:2746
void become_follower()
Definition raft.h:2242
void compact_committable_indices(Index idx)
Definition raft.h:380
bool is_active() const
Definition raft.h:305
void enable_all_domains() override
Definition raft.h:389
std::optional< Index > find_highest_possible_committable_index(Index idx) const
Definition raft.h:364
void start_ticking()
Definition raft.h:559
std::vector< Index > get_view_history(Index idx) override
Definition raft.h:486
void recv_message(const ccf::NodeId &from, const uint8_t *data, size_t size) override
Definition raft.h:732
void nominate_successor() override
Definition raft.h:2751
void init_as_backup(Index index, Term term, const std::vector< Index > &term_history, Index recovery_start_index=0) override
Definition raft.h:436
Aft(const ccf::consensus::Configuration &settings_, std::unique_ptr< Store > store_, std::unique_ptr< LedgerProxy > ledger_, std::shared_ptr< ccf::NodeToNode > channels_, std::shared_ptr< aft::State > state_, std::shared_ptr< ccf::NodeClient > rpc_request_context_, bool public_only_=false)
Definition raft.h:211
Index last_committable_index() const
Definition raft.h:355
void rollback(Index idx)
Definition raft.h:2663
bool is_candidate() override
Definition raft.h:259
bool is_primary() override
Definition raft.h:254
~Aft() override=default
std::pair< Term, Index > get_committed_txid() override
Definition raft.h:473
void become_aware_of_new_term(Term term)
Definition raft.h:2273
ccf::kv::ConsensusDetails get_details() override
Definition raft.h:592
Configuration::Nodes get_latest_configuration() override
Definition raft.h:586
bool is_retired_committed() const
Definition raft.h:315
Term get_view(Index idx) override
Definition raft.h:480
void force_become_primary() override
Definition raft.h:397
bool is_at_max_capacity() override
Definition raft.h:275
static constexpr size_t append_entries_size_limit
Definition raft.h:207
Consensus::SignatureDisposition get_signature_disposition() override
Definition raft.h:286
Index get_committed_seqno() override
Definition raft.h:461
bool is_retired() const
Definition raft.h:310
void force_become_primary(Index index, Term term, const std::vector< Index > &terms, Index commit_idx_) override
Definition raft.h:412
Index get_last_idx()
Definition raft.h:456
void set_retired_committed(ccf::SeqNo seqno, const std::vector< ccf::kv::NodeId > &node_ids) override
Definition raft.h:327
void periodic(std::chrono::milliseconds elapsed) override
Definition raft.h:827
void reset_last_ack_timeouts()
Definition raft.h:567
std::unique_ptr< LedgerProxy > ledger
Definition raft.h:208
std::optional< ccf::NodeId > primary() override
Definition raft.h:244
Configuration::Nodes get_latest_configuration_unsafe() const override
Definition raft.h:576
static constexpr ccf::View InvalidView
Definition state.h:24
Definition node_to_node.h:22
NodeId from
Definition node_to_node.h:24
Definition kv_types.h:364
Definition serialized.h:18
const char * what() const noexcept override
Definition serialized.h:25
#define LOG_INFO_FMT
Definition internal_logger.h:15
#define LOG_DEBUG_FMT
Definition internal_logger.h:14
#define LOG_FAIL_FMT
Definition internal_logger.h:16
Definition state.h:17
ElectionType
Definition raft_types.h:185
@ PreVote
Definition raft_types.h:186
@ RegularVote
Definition raft_types.h:187
AppendEntriesResponseType
Definition raft_types.h:154
uint64_t Term
Definition raft_types.h:20
@ raft_append_entries_response
Definition raft_types.h:93
@ raft_request_pre_vote_response
Definition raft_types.h:99
@ raft_request_vote
Definition raft_types.h:95
@ raft_request_pre_vote
Definition raft_types.h:98
@ raft_request_vote_response
Definition raft_types.h:96
@ raft_append_entries
Definition raft_types.h:92
@ raft_propose_request_vote
Definition raft_types.h:97
uint64_t Index
Definition raft_types.h:19
std::vector< std::tuple< Version, std::shared_ptr< std::vector< uint8_t > >, bool, std::shared_ptr< ConsensusHookPtrs > > > BatchVector
Definition kv_types.h:206
uint64_t Version
Definition version.h:8
RetirementPhase
Definition kv_types.h:133
uint64_t ReconfigurationId
Definition kv_types.h:46
ApplyResult
Definition kv_types.h:302
@ PASS_SIGNATURE
Definition kv_types.h:304
@ FAIL
Definition kv_types.h:311
@ PASS
Definition kv_types.h:303
@ PASS_ENCRYPTED_PAST_LEDGER_SECRET
Definition kv_types.h:309
Definition app_interface.h:14
@ ONE_TRANSACTION
Definition reconfiguration_type.h:11
constexpr View VIEW_UNKNOWN
Definition tx_id.h:26
@ consensus_msg
Definition node_types.h:23
uint64_t SeqNo
Definition tx_id.h:36
Definition dl_list.h:9
STL namespace.
#define LOG_ROLLBACK_INFO_FMT
Definition raft.h:90
#define RAFT_TRACE_FMT
Definition raft.h:59
#define RAFT_TRACE_JSON_OUT(json_object)
Definition raft.h:65
#define RAFT_FAIL_FMT
Definition raft.h:62
#define RAFT_INFO_FMT
Definition raft.h:61
#define RAFT_DEBUG_FMT
Definition raft.h:60
Definition raft_types.h:166
Definition raft_types.h:128
Definition raft_types.h:244
Term term
Definition raft_types.h:247
Definition raft_types.h:233
Definition raft_types.h:209
Definition raft_types.h:222
Definition raft_types.h:197
Definition tx_id.h:44
SeqNo seqno
Definition tx_id.h:46
View view
Definition tx_id.h:45
Definition consensus_config.h:11
Definition kv_types.h:51
Definition kv_types.h:49
std::map< NodeId, NodeInfo > Nodes
Definition kv_types.h:68
Nodes nodes
Definition kv_types.h:71
Definition kv_types.h:151
bool ticking
Definition kv_types.h:168
std::optional< ccf::ReconfigurationType > reconfiguration_type
Definition kv_types.h:165
std::optional< RetirementPhase > retirement_phase
Definition kv_types.h:162
ccf::View current_view
Definition kv_types.h:167
std::optional< LeadershipState > leadership_state
Definition kv_types.h:161
MembershipState membership_state
Definition kv_types.h:160
std::unordered_map< ccf::NodeId, Ack > acks
Definition kv_types.h:159
std::optional< ccf::NodeId > primary_id
Definition kv_types.h:166
std::vector< Configuration > configs
Definition kv_types.h:158