CCF
Loading...
Searching...
No Matches
raft.h
Go to the documentation of this file.
1// Copyright (c) Microsoft Corporation. All rights reserved.
2// Licensed under the Apache 2.0 License.
3#pragma once
4
6#include "ccf/ds/logger.h"
7#include "ccf/pal/locking.h"
9#include "ccf/tx_id.h"
10#include "ccf/tx_status.h"
11#include "ds/serialized.h"
12#include "impl/state.h"
13#include "kv/kv_types.h"
14#include "node/node_client.h"
15#include "node/node_to_node.h"
16#include "node/node_types.h"
18#include "raft_types.h"
20
21#include <algorithm>
22#include <list>
23#include <random>
24#include <unordered_map>
25#include <vector>
26
// Raft logging macros. With VERBOSE_RAFT_LOGGING defined, each RAFT_*_FMT
// macro logs under the "raft" tag and prefixes the message with this node's
// id, leadership state and membership state. The expansion dereferences
// `state`, so a `state` pointer (as held by aft::Aft) must be in scope at
// every call site. `, ##__VA_ARGS__` (GNU/Clang extension) drops the
// preceding comma when only a format string is passed.
27#ifdef VERBOSE_RAFT_LOGGING
28# define RAFT_TRACE_FMT(s, ...) \
29 CCF_LOG_FMT(TRACE, "raft") \
30 ("{} | {} | {} | " s, \
31 state->node_id, \
32 state->leadership_state, \
33 state->membership_state, \
34 ##__VA_ARGS__)
35# define RAFT_DEBUG_FMT(s, ...) \
36 CCF_LOG_FMT(DEBUG, "raft") \
37 ("{} | {} | {} | " s, \
38 state->node_id, \
39 state->leadership_state, \
40 state->membership_state, \
41 ##__VA_ARGS__)
42# define RAFT_INFO_FMT(s, ...) \
43 CCF_LOG_FMT(INFO, "raft") \
44 ("{} | {} | {} | " s, \
45 state->node_id, \
46 state->leadership_state, \
47 state->membership_state, \
48 ##__VA_ARGS__)
49# define RAFT_FAIL_FMT(s, ...) \
50 CCF_LOG_FMT(FAIL, "raft") \
51 ("{} | {} | {} | " s, \
52 state->node_id, \
53 state->leadership_state, \
54 state->membership_state, \
55 ##__VA_ARGS__)
56#else
// Without VERBOSE_RAFT_LOGGING the RAFT_*_FMT macros are plain aliases for
// the generic LOG_*_FMT macros and add no node-state prefix.
57# define RAFT_TRACE_FMT LOG_TRACE_FMT
58# define RAFT_DEBUG_FMT LOG_DEBUG_FMT
59# define RAFT_INFO_FMT LOG_INFO_FMT
60# define RAFT_FAIL_FMT LOG_FAIL_FMT
61#endif
62
// Streams a JSON trace object to the log at DEBUG level under the
// "raft_trace" tag. NOTE(review): json_object is not parenthesised in the
// expansion, so callers should pass a simple expression.
63#define RAFT_TRACE_JSON_OUT(json_object) \
64 CCF_LOG_OUT(DEBUG, "raft_trace") << json_object
65
66#ifdef CCF_RAFT_TRACING
67
68static inline void add_committable_indices_start_and_end(
69 nlohmann::json& j, const std::shared_ptr<aft::State>& state)
70{
71 std::vector<aft::Index> committable_indices;
72 if (!state->committable_indices.empty())
73 {
74 committable_indices.push_back(state->committable_indices.front());
75 if (state->committable_indices.size() > 1)
76 {
77 committable_indices.push_back(state->committable_indices.back());
78 }
79 }
80 j["committable_indices"] = committable_indices;
81}
82
83# define COMMITTABLE_INDICES(event_state, state) \
84 add_committable_indices_start_and_end(event_state, state);
85
86#endif
87
// Formatted INFO-level logging under the "rollback" tag.
88#define LOG_ROLLBACK_INFO_FMT CCF_LOG_FMT(INFO, "rollback")
89
90namespace aft
91{
93
94 template <class LedgerProxy>
95 class Aft : public ccf::kv::Consensus
96 {
97 private:
98 struct NodeState
99 {
100 Configuration::NodeInfo node_info;
101
102 // the highest index sent to the node
103 Index sent_idx;
104
105 // the highest matching index with the node that was confirmed
106 Index match_idx;
107
108 // timeout tracking the last time an ack was received from the node
109 std::chrono::milliseconds last_ack_timeout;
110
111 NodeState() = default;
112
113 NodeState(
114 const Configuration::NodeInfo& node_info_,
115 Index sent_idx_,
116 Index match_idx_ = 0) :
117 node_info(node_info_),
118 sent_idx(sent_idx_),
119 match_idx(match_idx_),
120 last_ack_timeout(0)
121 {}
122 };
123
124 // Persistent
125 std::unique_ptr<Store> store;
126
127 // Volatile
128 std::optional<ccf::NodeId> voted_for = std::nullopt;
129 std::optional<ccf::NodeId> leader_id = std::nullopt;
130
131 // Keep track of votes in each active configuration
132 struct Votes
133 {
134 std::unordered_set<ccf::NodeId> votes;
135 size_t quorum;
136 };
137 std::map<Index, Votes> votes_for_me;
138
139 std::chrono::milliseconds timeout_elapsed;
140
141 // When this node receives append entries from a new primary, it may need to
142 // roll back a committable but uncommitted suffix it holds. The
143 // new primary dictates the index where this suffix begins, which
144 // following the Raft election rules must be at least as high as the highest
145 // commit index reported by the previous primary. The window in which this
146 // rollback could be accepted is minimised to avoid unnecessary
147 // retransmissions - this node only executes this rollback instruction on
148 // the first append entries after it became a follower. As with any append
149 // entries, the initial index will not advance until this node acks.
150 bool is_new_follower = false;
151
152 // When this node becomes primary, it should produce a new signature in
153 // the current view. This signature is the first thing it may commit, as
154 // it cannot confirm commit of anything from a previous view (Raft paper
155 // section 5.4.2). This bool is true from the point this node becomes
156 // primary until it sees a committable entry.
157 bool should_sign = false;
158
159 std::shared_ptr<aft::State> state;
160
161 // Timeouts
162 std::chrono::milliseconds request_timeout;
163 std::chrono::milliseconds election_timeout;
164 size_t max_uncommitted_tx_count;
165 bool ticking = false;
166
167 // Configurations
168 std::list<Configuration> configurations;
169 // Union of other nodes (i.e. all nodes but us) in each active
170 // configuration, plus those that are retired, but for which
171 // the persistence of retirement knowledge is not yet established,
172 // i.e. Completed but not RetiredCommitted
173 // This should be used for diagnostic or broadcasting
174 // messages but _not_ for counting quorums, which should be done for each
175 // active configuration.
176 std::unordered_map<ccf::NodeId, NodeState> all_other_nodes;
177 std::unordered_map<ccf::NodeId, ccf::SeqNo> retired_nodes;
178 ccf::ReconfigurationType reconfiguration_type;
179
180 // Node client to trigger submission of RPC requests
181 std::shared_ptr<ccf::NodeClient> node_client;
182
183 // Used to remove retired nodes from store
184 std::unique_ptr<ccf::RetiredNodeCleanup> retired_node_cleanup;
185
186 size_t entry_size_not_limited = 0;
187 size_t entry_count = 0;
188 Index entries_batch_size = 20;
189 static constexpr int batch_window_size = 100;
190 int batch_window_sum = 0;
191
192 // When this is set, only public domain is deserialised when receiving
193 // append entries
194 bool public_only = false;
195
196 // Randomness
197 std::uniform_int_distribution<int> distrib;
198 std::default_random_engine rand;
199
200 // AppendEntries messages are currently constrained to only contain entries
201 // from a single term, so that the receiver can know the term of each entry
202 // pre-deserialisation, without an additional header.
203 static constexpr size_t max_terms_per_append_entries = 1;
204
205 public:
206 static constexpr size_t append_entries_size_limit = 20000;
207 std::unique_ptr<LedgerProxy> ledger;
208 std::shared_ptr<ccf::NodeToNode> channels;
209
210 public:
212 const ccf::consensus::Configuration& settings_,
213 std::unique_ptr<Store> store_,
214 std::unique_ptr<LedgerProxy> ledger_,
215 std::shared_ptr<ccf::NodeToNode> channels_,
216 std::shared_ptr<aft::State> state_,
217 std::shared_ptr<ccf::NodeClient> rpc_request_context_,
218 bool public_only_ = false,
219 ccf::kv::MembershipState initial_membership_state_ =
221 ccf::ReconfigurationType reconfiguration_type_ =
223 store(std::move(store_)),
224
225 timeout_elapsed(0),
226
227 state(state_),
228
229 request_timeout(settings_.message_timeout),
230 election_timeout(settings_.election_timeout),
231 max_uncommitted_tx_count(settings_.max_uncommitted_tx_count),
232
233 reconfiguration_type(reconfiguration_type_),
234 node_client(rpc_request_context_),
235 retired_node_cleanup(
236 std::make_unique<ccf::RetiredNodeCleanup>(node_client)),
237
238 public_only(public_only_),
239
240 distrib(0, (int)election_timeout.count() / 2),
241 rand((int)(uintptr_t)this),
242
243 ledger(std::move(ledger_)),
244 channels(channels_)
245 {}
246
247 virtual ~Aft() = default;
248
249 std::optional<ccf::NodeId> primary() override
250 {
251 return leader_id;
252 }
253
254 ccf::NodeId id() override
255 {
256 return state->node_id;
257 }
258
259 bool is_primary() override
260 {
261 return state->leadership_state == ccf::kv::LeadershipState::Leader;
262 }
263
264 bool is_candidate() override
265 {
266 return state->leadership_state == ccf::kv::LeadershipState::Candidate;
267 }
268
269 bool can_replicate() override
270 {
271 std::unique_lock<ccf::pal::Mutex> guard(state->lock);
272 return can_replicate_unsafe();
273 }
274
280 bool is_at_max_capacity() override
281 {
282 if (max_uncommitted_tx_count == 0)
283 {
284 return false;
285 }
286 std::unique_lock<ccf::pal::Mutex> guard(state->lock);
287 return state->leadership_state == ccf::kv::LeadershipState::Leader &&
288 (state->last_idx - state->commit_idx >= max_uncommitted_tx_count);
289 }
290
291 Consensus::SignatureDisposition get_signature_disposition() override
292 {
293 std::unique_lock<ccf::pal::Mutex> guard(state->lock);
294 if (can_sign_unsafe())
295 {
296 if (should_sign)
297 {
298 return Consensus::SignatureDisposition::SHOULD_SIGN;
299 }
300 else
301 {
302 return Consensus::SignatureDisposition::CAN_SIGN;
303 }
304 }
305 else
306 {
307 return Consensus::SignatureDisposition::CANT_REPLICATE;
308 }
309 }
310
311 bool is_backup() override
312 {
313 return state->leadership_state == ccf::kv::LeadershipState::Follower;
314 }
315
316 bool is_active() const
317 {
318 return state->membership_state == ccf::kv::MembershipState::Active;
319 }
320
321 bool is_retired() const
322 {
323 return state->membership_state == ccf::kv::MembershipState::Retired;
324 }
325
327 {
328 return state->membership_state == ccf::kv::MembershipState::Retired &&
329 state->retirement_phase == ccf::kv::RetirementPhase::RetiredCommitted;
330 }
331
333 {
334 return state->membership_state == ccf::kv::MembershipState::Retired &&
335 state->retirement_phase == ccf::kv::RetirementPhase::Completed;
336 }
337
339 ccf::SeqNo seqno, const std::vector<ccf::kv::NodeId>& node_ids) override
340 {
341 for (auto& node_id : node_ids)
342 {
343 if (id() == node_id)
344 {
346 state->membership_state == ccf::kv::MembershipState::Retired,
347 "Node is not retired, cannot become retired committed");
349 state->retirement_phase == ccf::kv::RetirementPhase::Completed,
350 "Node is not retired, cannot become retired committed");
351 state->retired_committed_idx = seqno;
352 become_retired(seqno, ccf::kv::RetirementPhase::RetiredCommitted);
353 }
354 else
355 {
356 // Once a node's retired_committed status is itself committed, all
357 // future primaries in the network must be aware its retirement is
358 // committed, and so no longer need any communication with it to
359 // advance commit. No further communication with this node is needed.
360 all_other_nodes.erase(node_id);
361 RAFT_INFO_FMT("Removed {} from nodes known to consensus", node_id);
362 }
363 }
364 }
365
367 {
368 return state->committable_indices.empty() ?
369 state->commit_idx :
370 state->committable_indices.back();
371 }
372
373 // Returns the highest committable index which is not greater than the
374 // given idx.
376 Index idx) const
377 {
378 const auto it = std::upper_bound(
379 state->committable_indices.rbegin(),
380 state->committable_indices.rend(),
381 idx,
382 [](const auto& l, const auto& r) { return l >= r; });
383 if (it == state->committable_indices.rend())
384 {
385 return std::nullopt;
386 }
387
388 return *it;
389 }
390
392 {
393 while (!state->committable_indices.empty() &&
394 (state->committable_indices.front() <= idx))
395 {
396 state->committable_indices.pop_front();
397 }
398 }
399
400 void enable_all_domains() override
401 {
402 // When receiving append entries as a follower, all security domains will
403 // be deserialised
404 std::lock_guard<ccf::pal::Mutex> guard(state->lock);
405 public_only = false;
406 }
407
408 void force_become_primary() override
409 {
410 // This is unsafe and should only be called when the node is certain
411 // there is no leader and no other node will attempt to force leadership.
412 if (leader_id.has_value())
413 {
414 throw std::logic_error(
415 "Can't force leadership if there is already a leader");
416 }
417
418 std::lock_guard<ccf::pal::Mutex> guard(state->lock);
419 state->current_view += starting_view_change;
420 become_leader(true);
421 }
422
424 Index index,
425 Term term,
426 const std::vector<Index>& terms,
427 Index commit_idx_) override
428 {
429 // This is unsafe and should only be called when the node is certain
430 // there is no leader and no other node will attempt to force leadership.
431 if (leader_id.has_value())
432 {
433 throw std::logic_error(
434 "Can't force leadership if there is already a leader");
435 }
436
437 std::lock_guard<ccf::pal::Mutex> guard(state->lock);
438 state->current_view = term;
439 state->last_idx = index;
440 state->commit_idx = commit_idx_;
441 state->view_history.initialise(terms);
442 state->view_history.update(index, term);
443 state->current_view += starting_view_change;
444 become_leader(true);
445 }
446
448 Index index,
449 Term term,
450 const std::vector<Index>& term_history,
451 Index recovery_start_index = 0) override
452 {
453 // This should only be called when the node resumes from a snapshot and
454 // before it has received any append entries.
455 std::lock_guard<ccf::pal::Mutex> guard(state->lock);
456
457 state->last_idx = index;
458 state->commit_idx = index;
459
460 state->view_history.initialise(term_history);
461
462 ledger->init(index, recovery_start_index);
463
465 }
466
468 {
469 return state->last_idx;
470 }
471
473 {
474 std::lock_guard<ccf::pal::Mutex> guard(state->lock);
475 return get_commit_idx_unsafe();
476 }
477
478 Term get_view() override
479 {
480 std::lock_guard<ccf::pal::Mutex> guard(state->lock);
481 return state->current_view;
482 }
483
484 std::pair<Term, Index> get_committed_txid() override
485 {
486 std::lock_guard<ccf::pal::Mutex> guard(state->lock);
487 ccf::SeqNo commit_idx = get_commit_idx_unsafe();
488 return {get_term_internal(commit_idx), commit_idx};
489 }
490
491 Term get_view(Index idx) override
492 {
493 std::lock_guard<ccf::pal::Mutex> guard(state->lock);
494 return get_term_internal(idx);
495 }
496
497 std::vector<Index> get_view_history(Index idx) override
498 {
499 // This should only be called when the spin lock is held.
500 return state->view_history.get_history_until(idx);
501 }
502
503 std::vector<Index> get_view_history_since(Index idx) override
504 {
505 // This should only be called when the spin lock is held.
506 return state->view_history.get_history_since(idx);
507 }
508
509 // Same as ccfraft.tla GetServerSet/IsInServerSet
510 // Not to be confused with all_other_nodes, which includes retired completed
511 // nodes. Used to restrict sending vote requests, and when becoming a
512 // leader, to decide whether to advance commit.
513 std::set<ccf::NodeId> other_nodes_in_active_configs() const
514 {
515 std::set<ccf::NodeId> nodes;
516
517 for (auto const& conf : configurations)
518 {
519 for (auto const& [node_id, _] : conf.nodes)
520 {
521 if (node_id != state->node_id)
522 {
523 nodes.insert(node_id);
524 }
525 }
526 }
527
528 return nodes;
529 }
530
531 public:
533 Index idx,
535 const std::unordered_set<ccf::NodeId>& new_learner_nodes = {},
536 const std::unordered_set<ccf::NodeId>& new_retired_nodes = {}) override
537 {
539 "Configurations: add new configuration at {}: {{{}}}", idx, conf);
540
541 assert(new_learner_nodes.empty());
542
543#ifdef CCF_RAFT_TRACING
544 nlohmann::json j = {};
545 j["function"] = "add_configuration";
546 j["state"] = *state;
547 COMMITTABLE_INDICES(j["state"], state);
548 j["configurations"] = configurations;
549 j["args"] = nlohmann::json::object();
550 j["args"]["configuration"] = Configuration{idx, conf, idx};
552#endif
553
554 // Detect when we are retired by observing a configuration
555 // from which we are absent following a configuration in which
556 // we were included. Note that this relies on retirement being
557 // a final state, and node identities never being re-used.
558 if (
559 !configurations.empty() &&
560 configurations.back().nodes.find(state->node_id) !=
561 configurations.back().nodes.end() &&
562 conf.find(state->node_id) == conf.end())
563 {
564 become_retired(idx, ccf::kv::RetirementPhase::Ordered);
565 }
566
567 if (configurations.empty() || conf != configurations.back().nodes)
568 {
569 Configuration new_config = {idx, std::move(conf), idx};
570 configurations.push_back(new_config);
571
572 create_and_remove_node_state();
573 }
574 }
575
577 {
578 ticking = true;
579 using namespace std::chrono_literals;
580 timeout_elapsed = 0ms;
581 RAFT_INFO_FMT("Election timer has become active");
582 }
583
585 {
586 for (auto& node : all_other_nodes)
587 {
588 using namespace std::chrono_literals;
589 node.second.last_ack_timeout = 0ms;
590 }
591 }
592
594 {
595 if (configurations.empty())
596 {
597 return {};
598 }
599
600 return configurations.back().nodes;
601 }
602
604 {
605 std::lock_guard<ccf::pal::Mutex> guard(state->lock);
607 }
608
610 {
612 std::lock_guard<ccf::pal::Mutex> guard(state->lock);
613 details.primary_id = leader_id;
614 details.current_view = state->current_view;
615 details.ticking = ticking;
616 details.leadership_state = state->leadership_state;
617 details.membership_state = state->membership_state;
618 if (is_retired())
619 {
620 details.retirement_phase = state->retirement_phase;
621 }
622 for (auto const& conf : configurations)
623 {
624 details.configs.push_back(conf);
625 }
626 for (auto& [k, v] : all_other_nodes)
627 {
628 details.acks[k] = {
629 v.match_idx, static_cast<size_t>(v.last_ack_timeout.count())};
630 }
631 details.reconfiguration_type = reconfiguration_type;
632 return details;
633 }
634
635 bool replicate(const ccf::kv::BatchVector& entries, Term term) override
636 {
637 std::lock_guard<ccf::pal::Mutex> guard(state->lock);
638
639 if (state->leadership_state != ccf::kv::LeadershipState::Leader)
640 {
642 "Failed to replicate {} items: not leader", entries.size());
643 rollback(state->last_idx);
644 return false;
645 }
646
647 if (term != state->current_view)
648 {
650 "Failed to replicate {} items at term {}, current term is {}",
651 entries.size(),
652 term,
653 state->current_view);
654 return false;
655 }
656
658 {
660 "Failed to replicate {} items: node retirement is complete",
661 entries.size());
662 rollback(state->last_idx);
663 return false;
664 }
665
666 RAFT_DEBUG_FMT("Replicating {} entries", entries.size());
667
668 for (auto& [index, data, is_globally_committable, hooks] : entries)
669 {
670 bool globally_committable = is_globally_committable;
671
672 if (index != state->last_idx + 1)
673 return false;
674
676 "Replicated on leader {}: {}{} ({} hooks)",
677 state->node_id,
678 index,
679 (globally_committable ? " committable" : ""),
680 hooks->size());
681
682#ifdef CCF_RAFT_TRACING
683 nlohmann::json j = {};
684 j["function"] = "replicate";
685 j["state"] = *state;
686 COMMITTABLE_INDICES(j["state"], state);
687 j["view"] = term;
688 j["seqno"] = index;
689 j["globally_committable"] = globally_committable;
691#endif
692
693 for (auto& hook : *hooks)
694 {
695 hook->call(this);
696 }
697
698 if (globally_committable)
699 {
701 "membership: {} leadership: {}",
702 state->membership_state,
703 state->leadership_state);
704 if (
705 state->membership_state == ccf::kv::MembershipState::Retired &&
706 state->retirement_phase == ccf::kv::RetirementPhase::Ordered)
707 {
708 become_retired(index, ccf::kv::RetirementPhase::Signed);
709 }
710 state->committable_indices.push_back(index);
711 start_ticking_if_necessary();
712
713 // Reset should_sign here - whenever we see a committable entry we
714 // don't need to produce _another_ signature
715 should_sign = false;
716 }
717
718 state->last_idx = index;
719 ledger->put_entry(
720 *data, globally_committable, state->current_view, index);
721 entry_size_not_limited += data->size();
722 entry_count++;
723
724 state->view_history.update(index, state->current_view);
725 if (entry_size_not_limited >= append_entries_size_limit)
726 {
727 update_batch_size();
728 entry_count = 0;
729 entry_size_not_limited = 0;
730 for (const auto& it : all_other_nodes)
731 {
732 RAFT_DEBUG_FMT("Sending updates to follower {}", it.first);
733 send_append_entries(it.first, it.second.sent_idx + 1);
734 }
735 }
736 }
737
738 // Try to advance commit at once if there are no other nodes.
739 if (other_nodes_in_active_configs().size() == 0)
740 {
741 update_commit();
742 }
743
744 return true;
745 }
746
748 const ccf::NodeId& from, const uint8_t* data, size_t size) override
749 {
750 RaftMsgType type = serialized::peek<RaftMsgType>(data, size);
751
752 try
753 {
754 switch (type)
755 {
757 {
758 AppendEntries r =
759 channels->template recv_authenticated<AppendEntries>(
760 from, data, size);
761 recv_append_entries(from, r, data, size);
762 break;
763 }
764
766 {
768 channels->template recv_authenticated<AppendEntriesResponse>(
769 from, data, size);
770 recv_append_entries_response(from, r);
771 break;
772 }
773
775 {
776 RequestVote r = channels->template recv_authenticated<RequestVote>(
777 from, data, size);
778 recv_request_vote(from, r);
779 break;
780 }
781
783 {
785 channels->template recv_authenticated<RequestVoteResponse>(
786 from, data, size);
787 recv_request_vote_response(from, r);
788 break;
789 }
790
792 {
794 channels->template recv_authenticated<ProposeRequestVote>(
795 from, data, size);
796 recv_propose_request_vote(from, r);
797 break;
798 }
799
800 default:
801 {
802 RAFT_FAIL_FMT("Unhandled AFT message type: {}", type);
803 }
804 }
805 }
807 {
808 RAFT_INFO_FMT("Dropped invalid message from {}", e.from);
809 return;
810 }
812 {
813 RAFT_FAIL_FMT("Failed to parse message: {}", ise.what());
814 return;
815 }
816 catch (const std::exception& e)
817 {
818 LOG_FAIL_FMT("Exception in {}", __PRETTY_FUNCTION__);
819 LOG_DEBUG_FMT("Error: {}", e.what());
820 return;
821 }
822 }
823
824 void periodic(std::chrono::milliseconds elapsed) override
825 {
826 std::unique_lock<ccf::pal::Mutex> guard(state->lock);
827 timeout_elapsed += elapsed;
828
829 if (state->leadership_state == ccf::kv::LeadershipState::Leader)
830 {
831 if (timeout_elapsed >= request_timeout)
832 {
833 using namespace std::chrono_literals;
834 timeout_elapsed = 0ms;
835
836 update_batch_size();
837 // Send newly available entries to all other nodes.
838 for (const auto& node : all_other_nodes)
839 {
840 send_append_entries(node.first, node.second.sent_idx + 1);
841 }
842 }
843
844 for (auto& node : all_other_nodes)
845 {
846 node.second.last_ack_timeout += elapsed;
847 }
848
849 bool has_quorum_of_backups = false;
850 for (auto const& conf : configurations)
851 {
852 size_t backup_ack_timeout_count = 0;
853 for (auto const& node : conf.nodes)
854 {
855 auto search = all_other_nodes.find(node.first);
856 if (search == all_other_nodes.end())
857 {
858 // Ignore ourselves as primary
859 continue;
860 }
861 if (search->second.last_ack_timeout >= election_timeout)
862 {
864 "No ack received from {} in last {}",
865 node.first,
866 election_timeout);
867 backup_ack_timeout_count++;
868 }
869 }
870
871 if (backup_ack_timeout_count < get_quorum(conf.nodes.size() - 1))
872 {
873 // If primary has quorum of active backups in _any_ configuration,
874 // it should remain primary
875 has_quorum_of_backups = true;
876 break;
877 }
878 }
879
880 if (!has_quorum_of_backups)
881 {
882 // CheckQuorum: The primary automatically steps down if there are no
883 // active configuration in which it has heard back from a majority of
884 // backups within an election timeout.
885 // Also see CheckQuorum action in tla/ccfraft.tla.
887 "Stepping down as leader {}: No ack received from a majority of "
888 "backups in last {}",
889 state->node_id,
890 election_timeout);
892 }
893 }
894 else
895 {
896 if (
897 !is_retired_committed() && ticking &&
898 timeout_elapsed >= election_timeout)
899 {
900 // Start an election.
901 become_candidate();
902 }
903 }
904 }
905
906 private:
907 Index find_highest_possible_match(const ccf::TxID& tx_id)
908 {
909 // Find the highest TxID this node thinks exists, which is still
910 // compatible with the given tx_id. That is, given T.n, find largest n'
911 // such that n' <= n && term_of(n') == T' && T' <= T. This may be T.n
912 // itself, if this node holds that index. Otherwise, examine the final
913 // entry in each term, counting backwards, until we find one which is
914 // still possible.
915 Index probe_index = std::min(tx_id.seqno, state->last_idx);
916 Term term_of_probe = state->view_history.view_at(probe_index);
917 while (term_of_probe > tx_id.view)
918 {
919 // Next possible match is the end of the previous term, which is
920 // 1-before the start of the currently considered term. Anything after
921 // that must have a term which is still too high.
922 probe_index = state->view_history.start_of_view(term_of_probe);
923 if (probe_index > 0)
924 {
925 --probe_index;
926 }
927 term_of_probe = state->view_history.view_at(probe_index);
928 }
929
931 "Looking for match with {}.{}, from {}.{}, best answer is {}",
932 tx_id.view,
933 tx_id.seqno,
934 state->view_history.view_at(state->last_idx),
935 state->last_idx,
936 probe_index);
937 return probe_index;
938 }
939
940 inline void update_batch_size()
941 {
942 auto avg_entry_size = (entry_count == 0) ?
944 entry_size_not_limited / entry_count;
945
946 auto batch_size = (avg_entry_size == 0) ?
948 append_entries_size_limit / avg_entry_size;
949
950 auto batch_avg = batch_window_sum / batch_window_size;
951 // balance out total batch size across batch window
952 batch_window_sum += (batch_size - batch_avg);
953 entries_batch_size = std::max((batch_window_sum / batch_window_size), 1);
954 }
955
956 Term get_term_internal(Index idx)
957 {
958 if (idx > state->last_idx)
959 return ccf::VIEW_UNKNOWN;
960
961 return state->view_history.view_at(idx);
962 }
963
964 bool can_replicate_unsafe()
965 {
966 return state->leadership_state == ccf::kv::LeadershipState::Leader &&
968 }
969
970 bool can_sign_unsafe()
971 {
972 return state->leadership_state == ccf::kv::LeadershipState::Leader &&
974 }
975
976 Index get_commit_idx_unsafe()
977 {
978 return state->commit_idx;
979 }
980
981 void send_append_entries(const ccf::NodeId& to, Index start_idx)
982 {
984 "Sending append entries to node {} in batches of {}, covering the "
985 "range {} -> {}",
986 to,
987 entries_batch_size,
988 start_idx,
989 state->last_idx);
990
991 auto calculate_end_index = [this](Index start) {
992 // Cap the end index in 2 ways:
993 // - Must contain no more than entries_batch_size entries
994 // - Must contain entries from a single term
995 static_assert(
996 max_terms_per_append_entries == 1,
997 "AppendEntries construction logic enforces single term");
998 auto max_idx = state->last_idx;
999 const auto term_of_ae = state->view_history.view_at(start);
1000 const auto index_at_end_of_term =
1001 state->view_history.end_of_view(term_of_ae);
1002 if (index_at_end_of_term != ccf::kv::NoVersion)
1003 {
1004 max_idx = index_at_end_of_term;
1005 }
1006 return std::min(start + entries_batch_size - 1, max_idx);
1007 };
1008
1009 Index end_idx;
1010
1011 // We break _after_ sending, so that in the case where this is called
1012 // with start==last, we send a single empty heartbeat
1013 do
1014 {
1015 end_idx = calculate_end_index(start_idx);
1016 RAFT_TRACE_FMT("Sending sub range {} -> {}", start_idx, end_idx);
1017 send_append_entries_range(to, start_idx, end_idx);
1018 start_idx = std::min(end_idx + 1, state->last_idx);
1019 } while (end_idx != state->last_idx);
1020 }
1021
1022 void send_append_entries_range(
1023 const ccf::NodeId& to, Index start_idx, Index end_idx)
1024 {
1025 const auto prev_idx = start_idx - 1;
1026
1027 if (is_retired_committed() && start_idx >= end_idx)
1028 {
1029 // Continue to replicate, but do not send heartbeats if we know our
1030 // retirement is committed
1031 return;
1032 }
1033
1034 const auto prev_term = get_term_internal(prev_idx);
1035 const auto term_of_idx = get_term_internal(end_idx);
1036
1038 "Send append entries from {} to {}: ({}.{}, {}.{}] ({})",
1039 state->node_id,
1040 to,
1041 prev_term,
1042 prev_idx,
1043 term_of_idx,
1044 end_idx,
1045 state->commit_idx);
1046
1047#pragma clang diagnostic push
1048#pragma clang diagnostic ignored "-Wc99-designator"
1049 AppendEntries ae{
1050 {},
1051 {.idx = end_idx, .prev_idx = prev_idx},
1052 .term = state->current_view,
1053 .prev_term = prev_term,
1054 .leader_commit_idx = state->commit_idx,
1055 .term_of_idx = term_of_idx,
1056 };
1057#pragma clang diagnostic pop
1058
1059 auto& node = all_other_nodes.at(to);
1060
1061#ifdef CCF_RAFT_TRACING
1062 nlohmann::json j = {};
1063 j["function"] = "send_append_entries";
1064 j["packet"] = ae;
1065 j["state"] = *state;
1066 COMMITTABLE_INDICES(j["state"], state);
1067 j["to_node_id"] = to;
1068 j["match_idx"] = node.match_idx;
1069 j["sent_idx"] = node.sent_idx;
1071#endif
1072
1073 // The host will append log entries to this message when it is
1074 // sent to the destination node.
1075 if (!channels->send_authenticated(
1077 {
1078 return;
1079 }
1080
1081 // Record the most recent index we have sent to this node.
1082 node.sent_idx = end_idx;
1083 }
1084
1085 void recv_append_entries(
1086 const ccf::NodeId& from,
1087 AppendEntries r,
1088 const uint8_t* data,
1089 size_t size)
1090 {
1091 std::unique_lock<ccf::pal::Mutex> guard(state->lock);
1092
1094 "Received append entries: {}.{} to {}.{} (from {} in term {})",
1095 r.prev_term,
1096 r.prev_idx,
1097 r.term_of_idx,
1098 r.idx,
1099 from,
1100 r.term);
1101
1102#ifdef CCF_RAFT_TRACING
1103 nlohmann::json j = {};
1104 j["function"] = "recv_append_entries";
1105 j["packet"] = r;
1106 j["state"] = *state;
1107 COMMITTABLE_INDICES(j["state"], state);
1108 j["from_node_id"] = from;
1110#endif
1111
1112 // Don't check that the sender node ID is valid. Accept anything that
1113 // passes the integrity check. This way, entries containing dynamic
1114 // topology changes that include adding this new leader can be accepted.
1115
1116 // First, check append entries term against our own term, becoming
1117 // follower if necessary
1118 if (
1119 state->current_view == r.term &&
1120 state->leadership_state == ccf::kv::LeadershipState::Candidate)
1121 {
1123 }
1124 else if (state->current_view < r.term)
1125 {
1127 }
1128 else if (state->current_view > r.term)
1129 {
1130 // Reply false, since our term is later than the received term.
1132 "Recv append entries to {} from {} but our term is later ({} > {})",
1133 state->node_id,
1134 from,
1135 state->current_view,
1136 r.term);
1137 send_append_entries_response_nack(from);
1138 return;
1139 }
1140
1141 // Second, check term consistency with the entries we have so far
1142 const auto prev_term = get_term_internal(r.prev_idx);
1143 if (prev_term != r.prev_term)
1144 {
1146 "Previous term for {} should be {}", r.prev_idx, prev_term);
1147
1148 // Reply false if the log doesn't contain an entry at r.prev_idx
1149 // whose term is r.prev_term. Rejects "future" entries.
1150 if (prev_term == 0)
1151 {
1153 "Recv append entries to {} from {} but our log does not yet "
1154 "contain index {}",
1155 state->node_id,
1156 from,
1157 r.prev_idx);
1158 send_append_entries_response_nack(from);
1159 }
1160 else
1161 {
1163 "Recv append entries to {} from {} but our log at {} has the wrong "
1164 "previous term (ours: {}, theirs: {})",
1165 state->node_id,
1166 from,
1167 r.prev_idx,
1168 prev_term,
1169 r.prev_term);
1170 const ccf::TxID rejected_tx{r.prev_term, r.prev_idx};
1171 send_append_entries_response_nack(from, rejected_tx);
1172 }
1173 return;
1174 }
1175
1176 // If the terms match up, it is sufficient to convince us that the sender
1177 // is leader in our term
1178 restart_election_timeout();
1179 if (!leader_id.has_value() || leader_id.value() != from)
1180 {
1181 leader_id = from;
1183 "Node {} thinks leader is {}", state->node_id, leader_id.value());
1184 }
1185
1186 // Third, check index consistency, making sure entries are not in the past
1187 if (r.prev_idx < state->commit_idx)
1188 {
1190 "Recv append entries to {} from {} but prev_idx ({}) < commit_idx "
1191 "({})",
1192 state->node_id,
1193 from,
1194 r.prev_idx,
1195 state->commit_idx);
1196 return;
1197 }
1198 // Redundant with check on get_term_internal() at line 1149
1199 // Which captures this case in every situation, except r.prev_term == 0.
1200 // That only happens if r.prev_idx == 0 however, see line 1033,
1201 // in which case this path should not be taken either.
1202 else if (r.prev_idx > state->last_idx)
1203 {
1205 "Recv append entries to {} from {} but prev_idx ({}) > last_idx ({})",
1206 state->node_id,
1207 from,
1208 r.prev_idx,
1209 state->last_idx);
1210 return;
1211 }
1212
1214 "Recv append entries to {} from {} for index {} and previous index {}",
1215 state->node_id,
1216 from,
1217 r.idx,
1218 r.prev_idx);
1219
1220 std::vector<std::tuple<
1221 std::unique_ptr<ccf::kv::AbstractExecutionWrapper>,
1223 append_entries;
1224 // Finally, deserialise each entry in the batch
1225 for (Index i = r.prev_idx + 1; i <= r.idx; i++)
1226 {
1227 if (i <= state->last_idx)
1228 {
1229 // NB: This is only safe as long as AppendEntries only contain a
1230 // single term. If they cover multiple terms, then we would need to at
1231 // least partially deserialise each entry to establish what term it is
1232 // in (or report the terms in the header)
1233 static_assert(
1234 max_terms_per_append_entries == 1,
1235 "AppendEntries rollback logic assumes single term");
1236 const auto incoming_term = r.term_of_idx;
1237 const auto local_term = state->view_history.view_at(i);
1238 if (incoming_term != local_term)
1239 {
1240 if (is_new_follower)
1241 {
1242 auto rollback_level = i - 1;
1244 "New follower received AppendEntries with conflict. Incoming "
1245 "entry {}.{} conflicts with local {}.{}. Rolling back to {}.",
1246 incoming_term,
1247 i,
1248 local_term,
1249 i,
1250 rollback_level);
1252 "Dropping conflicting branch. Rolling back {} entries, "
1253 "beginning with {}.{}.",
1254 state->last_idx - rollback_level,
1255 local_term,
1256 i);
1257 rollback(rollback_level);
1258 is_new_follower = false;
1259 // Then continue to process this AE as normal
1260 }
1261 else
1262 {
1263 // We have a node retaining a conflicting suffix, and refusing to
1264 // roll it back. It will remain divergent (not contributing to
1265 // commit) this term, and can only be brought in-sync in a future
1266 // term.
1267 // This log is emitted as a canary, for what we hope is an
1268 // unreachable branch. If it is ever seen we should revisit this.
1270 "Ignoring conflicting AppendEntries. Retaining {} entries, "
1271 "beginning with {}.{}.",
1272 state->last_idx - (i - 1),
1273 local_term,
1274 i);
1275 return;
1276 }
1277 }
1278 else
1279 {
1280 // If the current entry has already been deserialised, skip the
1281 // payload for that entry
1282 ledger->skip_entry(data, size);
1283 continue;
1284 }
1285 }
1286
1287 std::vector<uint8_t> entry;
1288 try
1289 {
1290 entry = LedgerProxy::get_entry(data, size);
1291 }
1292 catch (const std::logic_error& e)
1293 {
1294 // This should only fail if there is malformed data.
1296 "Recv append entries to {} from {} but the data is malformed: {}",
1297 state->node_id,
1298 from,
1299 e.what());
1300 send_append_entries_response_nack(from);
1301 return;
1302 }
1303
1304 ccf::kv::TxID expected{r.term_of_idx, i};
1305 auto ds = store->deserialize(entry, public_only, expected);
1306 if (ds == nullptr)
1307 {
1309 "Recv append entries to {} from {} but the entry could not be "
1310 "deserialised",
1311 state->node_id,
1312 from);
1313 send_append_entries_response_nack(from);
1314 return;
1315 }
1316
1317 append_entries.push_back(std::make_tuple(std::move(ds), i));
1318 }
1319
1320 execute_append_entries_sync(
1321 std::move(append_entries), from, std::move(r));
1322 }
1323
// Applies a batch of already-deserialised AppendEntries payloads, in order,
// on a follower. For each (ds, i) pair it calls ds->apply(). On
// ApplyResult::FAIL it truncates the ledger to i - 1, NACKs `from` and
// returns without acknowledging the batch. Otherwise it:
// - advances state->last_idx to i;
// - runs the entry's hooks;
// - writes the entry to the ledger, marked globally committable only for
//   PASS_SIGNATURE;
// - for signatures, records i as committable, updates the view history and
//   tries to commit up to r.leader_commit_idx.
// After the whole batch it hands off to execute_append_entries_finish(),
// which commits and ACKs.
1324 void execute_append_entries_sync(
1325 std::vector<std::tuple<
1326 std::unique_ptr<ccf::kv::AbstractExecutionWrapper>,
1327 ccf::kv::Version>>&& append_entries,
1328 const ccf::NodeId& from,
1329 AppendEntries&& r)
1330 {
1331 for (auto& ae : append_entries)
1332 {
1333 auto& [ds, i] = ae;
1334 RAFT_DEBUG_FMT("Replicating on follower {}: {}", state->node_id, i);
1335
1336#ifdef CCF_RAFT_TRACING
1337 nlohmann::json j = {};
1338 j["function"] = "execute_append_entries_sync";
1339 j["state"] = *state;
1340 COMMITTABLE_INDICES(j["state"], state);
1341 j["from_node_id"] = from;
1343#endif
1344
1345 bool track_deletes_on_missing_keys = false;
1346 ccf::kv::ApplyResult apply_success =
1347 ds->apply(track_deletes_on_missing_keys);
// A failed apply aborts the rest of the batch: drop this entry (and anything
// after it) from the ledger and NACK the leader.
1348 if (apply_success == ccf::kv::ApplyResult::FAIL)
1349 {
1350 ledger->truncate(i - 1);
1351 send_append_entries_response_nack(from);
1352 return;
1353 }
1354 state->last_idx = i;
1355
1356 for (auto& hook : ds->get_hooks())
1357 {
1358 hook->call(this);
1359 }
1360
// Only signatures are globally committable. Seeing one may mean this node
// should start ticking (see start_ticking_if_necessary()).
1361 bool globally_committable =
1362 (apply_success == ccf::kv::ApplyResult::PASS_SIGNATURE);
1363 if (globally_committable)
1364 {
1365 start_ticking_if_necessary();
1366 }
1367
1368 const auto& entry = ds->get_entry();
1369
1370 ledger->put_entry(
1371 entry, globally_committable, ds->get_term(), ds->get_index());
1372
1373 switch (apply_success)
1374 {
// NOTE(review): this first branch (logging "Follower failed to apply log
// entry") appears unreachable if it is the ApplyResult::FAIL case, because
// FAIL already returned early above. Confirm before relying on its
// last_idx/truncate handling.
1376 {
1377 RAFT_FAIL_FMT("Follower failed to apply log entry: {}", i);
1378 state->last_idx--;
1379 ledger->truncate(state->last_idx);
1380 send_append_entries_response_nack(from);
1381 break;
1382 }
1383
// Signature entry: may advance a pending retirement to Signed, records i as
// committable, updates the view history, and tries to commit.
1385 {
1386 RAFT_DEBUG_FMT("Deserialising signature at {}", i);
1387 if (
1388 state->membership_state == ccf::kv::MembershipState::Retired &&
1389 state->retirement_phase == ccf::kv::RetirementPhase::Ordered)
1390 {
1391 become_retired(i, ccf::kv::RetirementPhase::Signed);
1392 }
1393 state->committable_indices.push_back(i);
1394
1395 if (ds->get_term())
1396 {
1397 // A signature for sig_term tells us that all transactions from
1398 // the previous signature onwards (at least, if not further back)
1399 // happened in sig_term. We reflect this in the history.
1400 if (r.term_of_idx == aft::ViewHistory::InvalidView)
1401 {
1402 state->view_history.update(1, r.term);
1403 }
1404 else
1405 {
1406 // NB: This is only safe as long as AppendEntries only contain a
1407 // single term. If they cover multiple terms, then we need to
1408 // know our previous signature locally.
1409 static_assert(
1410 max_terms_per_append_entries == 1,
1411 "AppendEntries processing for term updates assumes single "
1412 "term");
1413 state->view_history.update(r.prev_idx + 1, ds->get_term());
1414 }
1415
1416 commit_if_possible(r.leader_commit_idx);
1417 }
1418 break;
1419 }
1420
// The remaining successful results need no extra handling here.
1422 {
1423 break;
1424 }
1425
1427 {
1428 break;
1429 }
1430
1431 default:
1432 {
1433 throw std::logic_error("Unknown ApplyResult value");
1434 }
1435 }
1436 }
1437
// Every entry in the batch has been applied: commit what we can and ACK.
1438 execute_append_entries_finish(r, from);
1439 }
1440
1441 void execute_append_entries_finish(
1442 AppendEntries& r, const ccf::NodeId& from)
1443 {
1444 // After entries have been deserialised, try to commit the leader's
1445 // commit index and update our term history accordingly
1446 commit_if_possible(r.leader_commit_idx);
1447
1448 // The term may have changed, and we have not have seen a signature yet.
1449 auto lci = last_committable_index();
1450 if (r.term_of_idx == aft::ViewHistory::InvalidView)
1451 {
1452 // If we don't yet have a term history, then this must be happening in
1453 // the current term. This can only happen before _any_ transactions have
1454 // occurred, when processing a heartbeat at index 0, which does not
1455 // happen in a real node (due to the genesis transaction executing
1456 // before ticks start), but may happen in tests.
1457 state->view_history.update(1, r.term);
1458 }
1459 else
1460 {
1461 // The end of this append entries (r.idx) was not a signature, but may
1462 // be in a new term. If it's a new term, this term started immediately
1463 // after the previous signature we saw (lci, last committable index).
1464 if (r.idx > lci)
1465 {
1466 state->view_history.update(lci + 1, r.term_of_idx);
1467 }
1468 }
1469
1470 send_append_entries_response_ack(from, r);
1471 }
1472
1473 void send_append_entries_response_ack(
1474 ccf::NodeId to, const AppendEntries& ae)
1475 {
1476 // If we get to here, we have applied up to r.idx in this AppendEntries.
1477 // We must only ACK this far, as we know nothing about the agreement of a
1478 // suffix we may still hold _after_ r.idx with the leader's log
1479 const auto response_idx = ae.idx;
1480 send_append_entries_response(
1481 to, AppendEntriesResponseType::OK, state->current_view, response_idx);
1482 }
1483
1484 void send_append_entries_response_nack(
1485 ccf::NodeId to, const ccf::TxID& rejected)
1486 {
1487 const auto response_idx = find_highest_possible_match(rejected);
1488 const auto response_term = get_term_internal(response_idx);
1489
1490 send_append_entries_response(
1491 to, AppendEntriesResponseType::FAIL, response_term, response_idx);
1492 }
1493
// NACKs an AppendEntries to `to` without naming a specific rejected entry.
// It reports our current view and our last log index. The response type is
// presumably AppendEntriesResponseType::FAIL, given the name; confirm.
1494 void send_append_entries_response_nack(ccf::NodeId to)
1495 {
1496 send_append_entries_response(
1497 to,
1499 state->current_view,
1500 state->last_idx);
1501 }
1502
// Sends an AppendEntriesResponse {term = response_term,
// last_log_idx = response_idx, success = answer} to `to` as an authenticated
// consensus message, and logs it as an ACK or NACK.
// For NACKs, callers may pass an estimated last-matching term/index rather
// than the current view and last index (see the TxID overload of
// send_append_entries_response_nack).
1503 void send_append_entries_response(
1504 ccf::NodeId to,
1506 aft::Term response_term,
1507 aft::Index response_idx)
1508 {
1510 "Send append entries response from {} to {} for index {}: {}",
1511 state->node_id,
1512 to,
1513 response_idx,
1514 (answer == AppendEntriesResponseType::OK ? "ACK" : "NACK"));
1515
1516 AppendEntriesResponse response{
1517 .term = response_term,
1518 .last_log_idx = response_idx,
1519 .success = answer,
1520 };
1521
1522#ifdef CCF_RAFT_TRACING
1523 nlohmann::json j = {};
1524 j["function"] = "send_append_entries_response";
1525 j["packet"] = response;
1526 j["state"] = *state;
1527 COMMITTABLE_INDICES(j["state"], state);
1528 j["to_node_id"] = to;
1530#endif
1531
1532 channels->send_authenticated(
1533 to, ccf::NodeMsgType::consensus_msg, response);
1534 }
1535
// Leader-side handling of a follower's AppendEntriesResponse, under
// state->lock.
// - Ignored if `from` is not a known node, or if we are no longer leader.
// - Otherwise it resets that node's last_ack_timeout.
// - A response with a later term is logged and the handler returns
//   (presumably after adopting the newer term; confirm).
// - ACKs with a different (stale) term, or an index below the node's
//   match_idx, are discarded. NACKs in those cases still fall through.
// - NACK: moves sent_idx back to the highest possible matching index,
//   no higher than the current sent_idx and never below match_idx (max is
//   applied last), so replication resumes from there.
// - ACK: raises match_idx (never lowers it) and calls update_commit().
1536 void recv_append_entries_response(
1537 const ccf::NodeId& from, AppendEntriesResponse r)
1538 {
1539 std::lock_guard<ccf::pal::Mutex> guard(state->lock);
1540
1541 auto node = all_other_nodes.find(from);
1542 if (node == all_other_nodes.end())
1543 {
1544 // Ignore if we don't recognise the node.
1546 "Recv append entries response to {} from {}: unknown node",
1547 state->node_id,
1548 from);
1549 return;
1550 }
1551
1552#ifdef CCF_RAFT_TRACING
1553 nlohmann::json j = {};
1554 j["function"] = "recv_append_entries_response";
1555 j["packet"] = r;
1556 j["state"] = *state;
1557 COMMITTABLE_INDICES(j["state"], state);
1558 j["from_node_id"] = from;
1559 j["match_idx"] = node->second.match_idx;
1560 j["sent_idx"] = node->second.sent_idx;
1562#endif
1563
1564 // Ignore if we're not the leader.
1565 if (state->leadership_state != ccf::kv::LeadershipState::Leader)
1566 {
1568 "Recv append entries response to {} from {}: no longer leader",
1569 state->node_id,
1570 from);
1571 return;
1572 }
1573
1574 using namespace std::chrono_literals;
1575 node->second.last_ack_timeout = 0ms;
1576
1577 if (state->current_view < r.term)
1578 {
1579 // We are behind, update our state.
1581 "Recv append entries response to {} from {}: more recent term ({} "
1582 "> {})",
1583 state->node_id,
1584 from,
1585 r.term,
1586 state->current_view);
1588 return;
1589 }
1590 else if (state->current_view != r.term)
1591 {
1592 // Stale response, discard if success.
1593 // Otherwise reset sent_idx and try again.
1594 // NB: In NACKs the term may be that of an estimated matching index
1595 // in the log, rather than the current term, so it is correct for it to
1596 // be older in this case.
1597 if (r.success == AppendEntriesResponseType::OK)
1598 {
1600 "Recv append entries response to {} from {}: stale term ({} != {})",
1601 state->node_id,
1602 from,
1603 r.term,
1604 state->current_view);
1605 return;
1606 }
1607 }
1608 else if (r.last_log_idx < node->second.match_idx)
1609 {
1610 // Response about past indices, discard if success.
1611 // Otherwise reset sent_idx and try again.
1612 // NB: It is correct for this index to move backwards during NACKs
1613 // which iteratively discover the last matching index of divergent logs
1614 // after an election.
1615 if (r.success == AppendEntriesResponseType::OK)
1616 {
1618 "Recv append entries response to {} from {}: stale idx",
1619 state->node_id,
1620 from);
1621 return;
1622 }
1623 }
1624
1625 // Update sent_idx (on NACK) or match_idx (on ACK) for the responding node.
1626 if (r.success == AppendEntriesResponseType::FAIL)
1627 {
1628 // Failed due to log inconsistency. Reset sent_idx, and try again soon.
1630 "Recv append entries response to {} from {}: failed",
1631 state->node_id,
1632 from);
1633 const auto this_match =
1634 find_highest_possible_match({r.term, r.last_log_idx});
1635 node->second.sent_idx = std::max(
1636 std::min(this_match, node->second.sent_idx), node->second.match_idx);
1637 return;
1638 }
1639 else
1640 {
1641 // Take the max: a successful response must never move match_idx
1642 // backwards (e.g. when ACKs arrive out of order).
1643 node->second.match_idx =
1644 std::max(node->second.match_idx, r.last_log_idx);
1645 }
1646
1648 "Recv append entries response to {} from {} for index {}: success",
1649 state->node_id,
1650 from,
1651 r.last_log_idx);
1652 update_commit();
1653 }
1654
// Asks `to` for its vote in our current view. It advertises our last
// committable index and that index's term, which the voter compares with its
// own (see recv_request_vote). Asserts that the last committable index is not
// behind commit_idx.
1655 void send_request_vote(const ccf::NodeId& to)
1656 {
1657 auto last_committable_idx = last_committable_index();
1659 "Send request vote from {} to {} at {}",
1660 state->node_id,
1661 to,
1662 last_committable_idx);
1663 CCF_ASSERT(last_committable_idx >= state->commit_idx, "lci < ci");
1664
1665 RequestVote rv{
1666 .term = state->current_view,
1667 .last_committable_idx = last_committable_idx,
1668 .term_of_last_committable_idx = get_term_internal(last_committable_idx),
1669 };
1670
1671#ifdef CCF_RAFT_TRACING
1672 nlohmann::json j = {};
1673 j["function"] = "send_request_vote";
1674 j["packet"] = rv;
1675 j["state"] = *state;
1676 COMMITTABLE_INDICES(j["state"], state);
1677 j["to_node_id"] = to;
1679#endif
1680
1681 channels->send_authenticated(to, ccf::NodeMsgType::consensus_msg, rv);
1682 }
1683
// Voter-side handling of a RequestVote, under state->lock.
// Votes no if:
// - our view is later than r.term,
// - we already know a leader in the current view, or
// - we already voted for a different node.
// Otherwise it votes yes iff the candidate's last committable entry is at
// least as up to date as ours: a later term wins outright; with equal terms
// the candidate's index must be >= ours.
// A yes vote restarts the election timeout, forgets the known leader, and
// records the vote in voted_for.
1684 void recv_request_vote(const ccf::NodeId& from, RequestVote r)
1685 {
1686 std::lock_guard<ccf::pal::Mutex> guard(state->lock);
1687
1688 // Do not check that from is a known node. It is possible to receive
1689 // RequestVotes from nodes that this node doesn't yet know, just as it
1690 // receives AppendEntries from those nodes. These should be obeyed just
1691 // like any other RequestVote - it is possible that this node is needed to
1692 // produce a primary in the new term, who will then help this node catch
1693 // up.
1694
1695#ifdef CCF_RAFT_TRACING
1696 nlohmann::json j = {};
1697 j["function"] = "recv_request_vote";
1698 j["packet"] = r;
1699 j["state"] = *state;
1700 COMMITTABLE_INDICES(j["state"], state);
1701 j["from_node_id"] = from;
1703#endif
1704
1705 if (state->current_view > r.term)
1706 {
1707 // Reply false, since our term is later than the received term.
1709 "Recv request vote to {} from {}: our term is later ({} > {})",
1710 state->node_id,
1711 from,
1712 state->current_view,
1713 r.term);
1714 send_request_vote_response(from, false);
1715 return;
1716 }
1717 else if (state->current_view < r.term)
1718 {
// Their term is later: log it and carry on evaluating the request below
// (presumably after adopting the newer term; confirm).
1720 "Recv request vote to {} from {}: their term is later ({} < {})",
1721 state->node_id,
1722 from,
1723 state->current_view,
1724 r.term);
1726 }
1727
1728 if (leader_id.has_value())
1729 {
1730 // Reply false, since we already know the leader in the current term.
1732 "Recv request vote to {} from {}: leader {} already known in term {}",
1733 state->node_id,
1734 from,
1735 leader_id.value(),
1736 state->current_view);
1737 send_request_vote_response(from, false);
1738 return;
1739 }
1740
1741 if ((voted_for.has_value()) && (voted_for.value() != from))
1742 {
1743 // Reply false, since we already voted for someone else.
1745 "Recv request vote to {} from {}: already voted for {}",
1746 state->node_id,
1747 from,
1748 voted_for.value());
1749 send_request_vote_response(from, false);
1750 return;
1751 }
1752
1753 // If the candidate's committable log is at least as up-to-date as ours,
1754 // vote yes
1755
1756 const auto last_committable_idx = last_committable_index();
1757 const auto term_of_last_committable_idx =
1758 get_term_internal(last_committable_idx);
1759
1760 const auto answer =
1761 (r.term_of_last_committable_idx > term_of_last_committable_idx) ||
1762 ((r.term_of_last_committable_idx == term_of_last_committable_idx) &&
1763 (r.last_committable_idx >= last_committable_idx));
1764
1765 if (answer)
1766 {
1767 // If we grant our vote, we also acknowledge that an election is in
1768 // progress.
1769 restart_election_timeout();
1770 leader_id.reset();
1771 voted_for = from;
1772 }
1773 else
1774 {
1776 "Voting against candidate at {}.{} because local state is at {}.{}",
1777 r.term_of_last_committable_idx,
1778 r.last_committable_idx,
1779 term_of_last_committable_idx,
1780 last_committable_idx);
1781 }
1782
1783 send_request_vote_response(from, answer);
1784 }
1785
// Replies to a RequestVote from `to`. The reply carries our current view and
// whether the vote was granted, and is sent as an authenticated consensus
// message.
1786 void send_request_vote_response(const ccf::NodeId& to, bool answer)
1787 {
1789 "Send request vote response from {} to {}: {}",
1790 state->node_id,
1791 to,
1792 answer);
1793
1794 RequestVoteResponse response{
1795 .term = state->current_view, .vote_granted = answer};
1796
1797 channels->send_authenticated(
1798 to, ccf::NodeMsgType::consensus_msg, response);
1799 }
1800
// Candidate-side handling of a RequestVoteResponse, under state->lock.
// - Ignored unless we are still a Candidate and `from` is a known node.
// - A later term in the response is logged and the handler returns
//   (presumably after adopting that term; confirm).
// - Responses for another (stale) view, and refusals, are ignored.
// - A granted vote in our current view is tallied via add_vote_for_me(),
//   which may make this node leader.
1801 void recv_request_vote_response(
1802 const ccf::NodeId& from, RequestVoteResponse r)
1803 {
1804 std::lock_guard<ccf::pal::Mutex> guard(state->lock);
1805
1806#ifdef CCF_RAFT_TRACING
1807 nlohmann::json j = {};
1808 j["function"] = "recv_request_vote_response";
1809 j["packet"] = r;
1810 j["state"] = *state;
1811 COMMITTABLE_INDICES(j["state"], state);
1812 j["from_node_id"] = from;
1814#endif
1815
1816 if (state->leadership_state != ccf::kv::LeadershipState::Candidate)
1817 {
1819 "Recv request vote response to {} from: {}: we aren't a candidate",
1820 state->node_id,
1821 from);
1822 return;
1823 }
1824
1825 // Ignore if we don't recognise the node.
1826 auto node = all_other_nodes.find(from);
1827 if (node == all_other_nodes.end())
1828 {
1830 "Recv request vote response to {} from {}: unknown node",
1831 state->node_id,
1832 from);
1833 return;
1834 }
1835
1836 if (state->current_view < r.term)
1837 {
1839 "Recv request vote response to {} from {}: their term is more recent "
1840 "({} < {})",
1841 state->node_id,
1842 from,
1843 state->current_view,
1844 r.term);
1846 return;
1847 }
1848 else if (state->current_view != r.term)
1849 {
1850 // Ignore as it is stale.
1852 "Recv request vote response to {} from {}: stale ({} != {})",
1853 state->node_id,
1854 from,
1855 state->current_view,
1856 r.term);
1857 return;
1858 }
1859 else if (!r.vote_granted)
1860 {
1861 // Do nothing.
1863 "Recv request vote response to {} from {}: they voted no",
1864 state->node_id,
1865 from);
1866 return;
1867 }
1868
1870 "Recv request vote response to {} from {}: they voted yes",
1871 state->node_id,
1872 from);
1873
1874 add_vote_for_me(from);
1875 }
1876
// Handles a ProposeRequestVote nudge, e.g. the one a retiring leader sends
// from become_retired(). If this node is not retired-committed, is ticking,
// and the proposal is for our current view, it starts an election
// immediately via become_candidate(). Otherwise the nudge is logged and
// ignored.
1877 void recv_propose_request_vote(
1878 const ccf::NodeId& from, ProposeRequestVote r)
1879 {
1880 std::lock_guard<ccf::pal::Mutex> guard(state->lock);
1881
1882#ifdef CCF_RAFT_TRACING
1883 nlohmann::json j = {};
1884 j["function"] = "recv_propose_request_vote";
1885 j["packet"] = r;
1886 j["state"] = *state;
1887 COMMITTABLE_INDICES(j["state"], state);
1888 j["from_node_id"] = from;
1890#endif
1891 if (!is_retired_committed() && ticking && r.term == state->current_view)
1892 {
1894 "Becoming candidate early due to propose request vote from {}", from);
1895 become_candidate();
1896 }
1897 else
1898 {
1899 RAFT_INFO_FMT("Ignoring propose request vote from {}", from);
1900 }
1901 }
1902
1903 void restart_election_timeout()
1904 {
1905 // Randomise timeout_elapsed to get a random election timeout
1906 // between 0.5x and 1x the configured election timeout.
1907 timeout_elapsed = std::chrono::milliseconds(distrib(rand));
1908 }
1909
1910 void reset_votes_for_me()
1911 {
1912 votes_for_me.clear();
1913 for (auto const& conf : configurations)
1914 {
1915 votes_for_me[conf.idx].quorum = get_quorum(conf.nodes.size());
1916 votes_for_me[conf.idx].votes.clear();
1917 }
1918 }
1919
1920 // ccfraft!Timeout
// Starts an election in a new view. Without any configuration it only logs
// and returns. Otherwise it:
// - becomes Candidate and forgets the known leader;
// - votes for itself and resets the vote tallies for every configuration;
// - increments current_view and restarts the election timeout.
// Our own vote is then tallied via add_vote_for_me(). That can make this node
// leader immediately when it alone meets quorum in every configuration.
// Finally RequestVotes go to all other nodes in active configurations.
1921 void become_candidate()
1922 {
1923 if (configurations.empty())
1924 {
1925 // ccfraft!Timeout:
1926 // /\ \E c \in DOMAIN configurations[i] :
1927 // /\ i \in configurations[i][c]
1929 "Not becoming candidate {} due to lack of a configuration.",
1930 state->node_id);
1931 return;
1932 }
1933
1934 state->leadership_state = ccf::kv::LeadershipState::Candidate;
1935 leader_id.reset();
1936
1937 voted_for = state->node_id;
1938 reset_votes_for_me();
1939 state->current_view++;
1940
1941 restart_election_timeout();
1943
1945 "Becoming candidate {}: {}", state->node_id, state->current_view);
1946
1947#ifdef CCF_RAFT_TRACING
1948 nlohmann::json j = {};
1949 j["function"] = "become_candidate";
1950 j["state"] = *state;
1951 COMMITTABLE_INDICES(j["state"], state);
1952 j["configurations"] = configurations;
1954#endif
1955
1956 add_vote_for_me(state->node_id);
1957
1958 // Request votes only go to nodes in configurations, since only
1959 // their votes can be tallied towards an election quorum.
1960 for (auto const& node_id : other_nodes_in_active_configs())
1961 {
1962 // ccfraft!RequestVote
1963 send_request_vote(node_id);
1964 }
1965 }
1966
// Transitions this node to Leader for the current view.
// The initial guard returns without changing state in some cases
// (presumably when already retired and not forced via force_become_leader;
// confirm). Otherwise it:
// - rolls back to the last committable index, discarding unsigned entries,
//   once anything has been committed; at startup it only tells the store
//   the current term;
// - marks this node leader, sets should_sign, and zeroes timeout_elapsed;
// - commits immediately when there are no other nodes in active
//   configurations;
// - for every other known node, resets match_idx to 0 and sent_idx to our
//   last_idx, then sends it an AppendEntries starting at last_idx + 1;
// - runs retired-node cleanup if configured.
1967 void become_leader(bool force_become_leader = false)
1968 {
1970 {
1971 return;
1972 }
1973
1974 const auto election_index = last_committable_index();
1975
1977 "Election index is {} in term {}", election_index, state->current_view);
1978 // Discard any un-committable updates we may hold,
1979 // since we have no signature for them. Except at startup,
1980 // where we do not want to roll back the genesis transaction.
1981 if (state->commit_idx > 0)
1982 {
1983 rollback(election_index);
1984 }
1985 else
1986 {
1987 // but we still want the KV to know which term we're in
1988 store->initialise_term(state->current_view);
1989 }
1990
1991 state->leadership_state = ccf::kv::LeadershipState::Leader;
1992 leader_id = state->node_id;
1993 should_sign = true;
1994
1995 using namespace std::chrono_literals;
1996 timeout_elapsed = 0ms;
1997
1999
2001 "Becoming leader {}: {}", state->node_id, state->current_view);
2002
2003#ifdef CCF_RAFT_TRACING
2004 nlohmann::json j = {};
2005 j["function"] = "become_leader";
2006 j["state"] = *state;
2007 COMMITTABLE_INDICES(j["state"], state);
2008 j["configurations"] = configurations;
2010#endif
2011
2012 // Try to advance commit at once if there are no other nodes.
2013 if (other_nodes_in_active_configs().size() == 0)
2014 {
2015 update_commit();
2016 }
2017
2018 // Reset match and sent indices for all other nodes. `next` is the first
// index each of them will be sent.
2019 auto next = state->last_idx + 1;
2020
2021 for (auto& node : all_other_nodes)
2022 {
2023 node.second.match_idx = 0;
2024 node.second.sent_idx = next - 1;
2025
2026 // Send an empty append_entries to all nodes.
2027 send_append_entries(node.first, next);
2028 }
2029
2030 if (retired_node_cleanup)
2031 {
2032 retired_node_cleanup->cleanup();
2033 }
2034 }
2035
2036 public:
2037 // Called when a replica becomes follower in the same term, e.g. when the
2038 // primary node has not received a majority of acks (CheckQuorum)
// Visible effects: clears the known leader, restarts the election timeout,
// and sets the leadership state to Follower.
2040 {
2041 leader_id.reset();
2042 restart_election_timeout();
2044
2045 // Drop anything unsigned here, but retain all signed entries. Only do a
2046 // more aggressive rollback, potentially including signatures, when
2047 // receiving a conflicting AppendEntries
2049
2050 state->leadership_state = ccf::kv::LeadershipState::Follower;
2052 "Becoming follower {}: {}.{}",
2053 state->node_id,
2054 state->current_view,
2055 state->commit_idx);
2056
2057#ifdef CCF_RAFT_TRACING
2058 nlohmann::json j = {};
2059 j["function"] = "become_follower";
2060 j["state"] = *state;
2061 COMMITTABLE_INDICES(j["state"], state);
2062 j["configurations"] = configurations;
2064#endif
2065 }
2066
2067 // Called when a replica becomes aware of the existence of a new term
2068 // If retired already, state remains unchanged, but the replica otherwise
2069 // becomes a follower in the new term.
// Adopts `term` as current_view, forgets any vote cast, resets the vote
// tallies, and sets is_new_follower. That flag lets the next conflicting
// AppendEntries roll back our divergent suffix (see recv_append_entries).
// NOTE(review): the view/vote reset below is unconditional. The "state
// remains unchanged" wording above presumably refers only to leadership /
// membership state; confirm.
2071 {
2072 RAFT_DEBUG_FMT("Becoming aware of new term {}", term);
2073
2074 state->current_view = term;
2075 voted_for.reset();
2076 reset_votes_for_me();
2078 is_new_follower = true;
2079 }
2080
2081 private:
// Advances this node's retirement to `phase`, where `idx` is the index at
// which that phase was reached.
// - First branch (presumably phase == Ordered; confirm): records idx as
//   retirement_idx, which must not already be set.
// - Signed: records idx as retirement_committable_idx, which must be
//   >= retirement_idx.
// - Final branch (presumably phase == Completed, as requested from commit();
//   confirm): a leader first nudges the most up-to-date other node with a
//   ProposeRequestVote so it can start an election promptly. The node then
//   gives up leadership: leader_id is cleared and the leadership state
//   becomes None.
// In every case the membership state becomes Retired and retirement_phase
// is set to `phase`.
2082 void become_retired(Index idx, ccf::kv::RetirementPhase phase)
2083 {
2085 "Becoming retired, phase {} (leadership {}): {}: {} at {}",
2086 phase,
2087 state->leadership_state,
2088 state->node_id,
2089 state->current_view,
2090 idx);
2091
2093 {
2095 !state->retirement_idx.has_value(),
2096 "retirement_idx already set to {}",
2097 state->retirement_idx.value());
2098 state->retirement_idx = idx;
2099 RAFT_INFO_FMT("Node retiring at {}", idx);
2100 }
2101 else if (phase == ccf::kv::RetirementPhase::Signed)
2102 {
2103 assert(state->retirement_idx.has_value());
2105 idx >= state->retirement_idx.value(),
2106 "Index {} unexpectedly lower than retirement_idx {}",
2107 idx,
2108 state->retirement_idx.value());
2109 state->retirement_committable_idx = idx;
2110 RAFT_INFO_FMT("Node retirement committable at {}", idx);
2111 }
2113 {
2114 if (state->leadership_state == ccf::kv::LeadershipState::Leader)
2115 {
2116 ProposeRequestVote prv{.term = state->current_view};
2117
2118 std::optional<ccf::NodeId> successor = std::nullopt;
2119 Index max_match_idx = 0;
2120 ccf::kv::ReconfigurationId reconf_id_of_max_match = 0;
2121
2122 // Pick the node that has the highest match_idx, and break
2123 // ties by looking at the highest reconfiguration id they are
2124 // part of. This can lead to nudging a node that is
2125 // about to retire too, but that node will then nudge
2126 // a successor, and that seems preferable to nudging a node that
2127 // risks not being eligible if reconfiguration id is prioritised.
2128 // Alternatively, we could pick the node with the highest match idx
2129 // in the latest config, provided that its match idx is at least as high
// as a
2130 // majority. That would make them both eligible and unlikely to retire
2131 // soon.
2132 for (auto& [node, node_state] : all_other_nodes)
2133 {
2134 if (node_state.match_idx >= max_match_idx)
2135 {
// Find the most recent configuration (scanning from the back) that
// contains this node. Its id is the tie-breaker; 0 if the node is in none.
2136 ccf::kv::ReconfigurationId latest_reconf_id = 0;
2137 auto conf = configurations.rbegin();
2138 while (conf != configurations.rend())
2139 {
2140 if (conf->nodes.find(node) != conf->nodes.end())
2141 {
2142 latest_reconf_id = conf->idx;
2143 break;
2144 }
2145 conf++;
2146 }
// Take this node unless it only ties on match_idx and belongs to an
// older configuration than the current choice.
2147 if (!(node_state.match_idx == max_match_idx &&
2148 latest_reconf_id < reconf_id_of_max_match))
2149 {
2150 reconf_id_of_max_match = latest_reconf_id;
2151 successor = node;
2152 max_match_idx = node_state.match_idx;
2153 }
2154 }
2155 }
2156 if (successor.has_value())
2157 {
2158 RAFT_INFO_FMT("Node retired, nudging {}", successor.value());
2159 channels->send_authenticated(
2160 successor.value(), ccf::NodeMsgType::consensus_msg, prv);
2161 }
2162 }
2163
2164 leader_id.reset();
2165 state->leadership_state = ccf::kv::LeadershipState::None;
2166 }
2167
2168 state->membership_state = ccf::kv::MembershipState::Retired;
2169 state->retirement_phase = phase;
2170 }
2171
// Tallies a vote from `from` (which may be this node itself) in every current
// configuration that contains it. Becomes leader once every tally in
// votes_for_me has reached its quorum. With no configurations it only logs
// and returns.
2172 void add_vote_for_me(const ccf::NodeId& from)
2173 {
2174 if (configurations.empty())
2175 {
2177 "Not voting for myself {} due to lack of a configuration.",
2178 state->node_id);
2179 return;
2180 }
2181
2182 // Add vote for from node in each configuration where it is present
2183 for (auto const& conf : configurations)
2184 {
2185 auto const& nodes = conf.nodes;
2186 if (nodes.find(from) == nodes.end())
2187 {
2188 // `from` is not a member of this configuration, so its vote does not
// count towards this configuration's quorum.
2189 continue;
2190 }
2191
2192 votes_for_me[conf.idx].votes.insert(from);
2194 "Node {} voted for {} in configuration {} with quorum {}",
2195 from,
2196 state->node_id,
2197 conf.idx,
2198 votes_for_me[conf.idx].quorum);
2199 }
2200
2201 // We need a quorum of votes in _all_ configurations to become leader
2202 bool is_elected = true;
2203 for (auto const& v : votes_for_me)
2204 {
2205 auto const& quorum = v.second.quorum;
2206 auto const& votes = v.second.votes;
2207
2208 if (votes.size() < quorum)
2209 {
2210 is_elected = false;
2211 break;
2212 }
2213 }
2214
2215 if (is_elected)
2216 {
2217 become_leader();
2218 }
2219 }
2220
2221 // If there exists some committable idx in the current term such that idx >
2222 // commit_idx and a majority of nodes have replicated it, commit to that
2223 // idx.
// Leader-only: throws std::logic_error if called in any other state.
// For each configuration, the agreement index is the highest index that a
// majority of its members are known to hold. That is our own last_idx for
// this node, and match_idx for the others. The overall agreement is the
// minimum across configurations.
// We then commit at the highest committable index not above that agreement,
// but only if that index is in the current view. Entries from earlier views
// become committed only via a later commit in the current view.
2224 void update_commit()
2225 {
2226 if (state->leadership_state != ccf::kv::LeadershipState::Leader)
2227 {
2228 throw std::logic_error(
2229 "update_commit() must only be called while this node is leader");
2230 }
2231
2232 std::optional<Index> new_agreement_index = std::nullopt;
2233
2234 // Obtain CFT watermarks
2235 for (auto const& c : configurations)
2236 {
2237 // The majority must be checked separately for each active
2238 // configuration.
2239 std::vector<Index> match;
2240 match.reserve(c.nodes.size());
2241
// NOTE(review): `auto node` copies each entry of c.nodes on every iteration.
// `const auto& node` would avoid the copy.
2242 for (auto node : c.nodes)
2243 {
2244 if (node.first == state->node_id)
2245 {
2246 match.push_back(state->last_idx);
2247 }
2248 else
2249 {
2250 match.push_back(all_other_nodes.at(node.first).match_idx);
2251 }
2252 }
2253
// After an ascending sort, the element at (n - 1) / 2 is held by at least
// n - (n - 1) / 2 members, which is a majority.
// NOTE(review): for a configuration with no nodes, size() - 1 wraps and
// match.at() would throw std::out_of_range. Presumably configurations are
// never empty; confirm.
2254 sort(match.begin(), match.end());
2255 auto confirmed = match.at((match.size() - 1) / 2);
2256
2257 if (
2258 !new_agreement_index.has_value() ||
2259 confirmed < new_agreement_index.value())
2260 {
2261 new_agreement_index = confirmed;
2262 }
2263 }
2264
2265 if (new_agreement_index.has_value())
2266 {
2267 if (new_agreement_index.value() > state->last_idx)
2268 {
2269 throw std::logic_error(
2270 "Followers appear to have later match indices than leader");
2271 }
2272
2273 const auto new_commit_idx =
2274 find_highest_possible_committable_index(new_agreement_index.value());
2275
2276 if (new_commit_idx.has_value())
2277 {
2279 "In update_commit, new_commit_idx: {}, "
2280 "last_idx: {}",
2281 new_commit_idx.value(),
2282 state->last_idx);
2283
2284 const auto term_of_new = get_term_internal(new_commit_idx.value());
2285 if (term_of_new == state->current_view)
2286 {
2287 commit(new_commit_idx.value());
2288 }
2289 else
2290 {
2292 "Ack quorum at {} resulted in proposed commit index {}, which "
2293 "is in term {}. Waiting for agreement on committable entry in "
2294 "current term {} to update commit",
2295 new_agreement_index.value(),
2296 new_commit_idx.value(),
2297 term_of_new,
2298 state->current_view);
2299 }
2300 }
2301 }
2302 }
2303
2304 // Commits at the highest committable index which is not greater than the
2305 // given idx.
// Acts only if idx is beyond our commit_idx and the term at idx is not ahead
// of our current view. The committable index is presumably found via
// find_highest_possible_committable_index(idx), as in update_commit; confirm.
// If none exists, nothing is committed.
2306 void commit_if_possible(Index idx)
2307 {
2309 "Commit if possible {} (ci: {}) (ti {})",
2310 idx,
2311 state->commit_idx,
2312 get_term_internal(idx));
2313 if (
2314 (idx > state->commit_idx) &&
2315 (get_term_internal(idx) <= state->current_view))
2316 {
2317 const auto highest_committable =
2319 if (highest_committable.has_value())
2320 {
2321 commit(highest_committable.value());
2322 }
2323 }
2324 }
2325
2326 size_t get_quorum(size_t n) const
2327 {
2328 return (n / 2) + 1;
2329 }
2330
// Commits up to and including idx on this node. Throws std::logic_error if
// idx is beyond last_idx, and does nothing if idx is not beyond commit_idx.
// Otherwise it:
// - sets commit_idx;
// - completes a signed retirement once its committable index is covered;
// - compacts the store and commits the ledger at idx;
// - discards every configuration superseded by a later configuration whose
//   index is <= idx.
// If any configuration was discarded, per-node state is refreshed and, on
// the primary, retired-node cleanup runs.
2331 void commit(Index idx)
2332 {
2333 if (idx > state->last_idx)
2334 {
2335 throw std::logic_error(fmt::format(
2336 "Tried to commit {} but last_idx is {}", idx, state->last_idx));
2337 }
2338
2339 RAFT_DEBUG_FMT("Starting commit");
2340
2341 // This could happen if a follower becomes the leader when it
2342 // has committed fewer log entries, although it has them available.
2343 if (idx <= state->commit_idx)
2344 return;
2345
2346#ifdef CCF_RAFT_TRACING
2347 nlohmann::json j = {};
2348 j["function"] = "commit";
2349 j["args"] = nlohmann::json::object();
2350 j["args"]["idx"] = idx;
2351 j["state"] = *state;
2352 COMMITTABLE_INDICES(j["state"], state);
2353 j["configurations"] = configurations;
2355#endif
2356
2358
2359 state->commit_idx = idx;
2360 if (
2361 is_retired() &&
2362 state->retirement_phase == ccf::kv::RetirementPhase::Signed &&
2363 state->retirement_committable_idx.has_value() &&
2364 idx >= state->retirement_committable_idx.value())
2365 {
2366 become_retired(idx, ccf::kv::RetirementPhase::Completed);
2367 }
2368
2369 RAFT_DEBUG_FMT("Compacting...");
2370 store->compact(idx);
2371 ledger->commit(idx);
2372
2373 RAFT_DEBUG_FMT("Commit on {}: {}", state->node_id, idx);
2374
2375 // Examine each configuration that is followed by a globally committed
2376 // configuration.
// Pop the oldest configuration while its successor's index is at or below
// idx. The newest configuration is always retained.
2377 bool changed = false;
2378
2379 while (true)
2380 {
2381 auto conf = configurations.begin();
2382 if (conf == configurations.end())
2383 {
2384 break;
2385 }
2386
2387 auto next = std::next(conf);
2388 if (next == configurations.end())
2389 {
2390 break;
2391 }
2392
2393 if (idx < next->idx)
2394 {
2395 break;
2396 }
2397
2399 "Configurations: discard committed configuration at {}", conf->idx);
2400 configurations.pop_front();
2401 changed = true;
2402 }
2403
2404 if (changed)
2405 {
2406 create_and_remove_node_state();
2407 if (retired_node_cleanup && is_primary())
2408 {
2409 retired_node_cleanup->cleanup();
2410 }
2411 }
2412 }
2413
2414 bool is_self_in_latest_config()
2415 {
2416 bool present = false;
2417 if (!configurations.empty())
2418 {
2419 auto current_nodes = configurations.back().nodes;
2420 present = current_nodes.find(state->node_id) != current_nodes.end();
2421 }
2422 return present;
2423 }
2424
2425 void start_ticking_if_necessary()
2426 {
2427 if (!ticking && is_self_in_latest_config())
2428 {
2429 start_ticking();
2430 }
2431 }
2432
2433 public:
// Rolls back the store, ledger and in-memory Raft state so that idx becomes
// the last index. A request to roll back below state->commit_idx is logged
// and ignored. Committable indices, retirement progress and configurations
// recorded after idx are undone, and per-node state is resynchronised if any
// configuration was removed.
2434 void rollback(Index idx)
2435 {
2436 if (idx < state->commit_idx)
2437 {
// NOTE(review): the opening line of this logging call (original line 2438)
// is absent from this rendering.
2439 "Asked to rollback to idx:{} but committed to commit_idx:{} - "
2440 "ignoring rollback request",
2441 idx,
2442 state->commit_idx);
2443 return;
2444 }
2445
// Roll the store back to (term at idx, idx), passing state->current_view,
// which the following log line reports as the term being set in the store.
2446 store->rollback({get_term_internal(idx), idx}, state->current_view);
2447
2448 RAFT_DEBUG_FMT("Setting term in store to: {}", state->current_view);
2449 ledger->truncate(idx);
2450 state->last_idx = idx;
2451 RAFT_DEBUG_FMT("Rolled back at {}", idx);
2452
2453 state->view_history.rollback(idx);
2454
// Pop committable indices greater than idx from the back. This relies on the
// container being ordered ascending (presumably guaranteed by how entries
// are appended — confirm).
2455 while (!state->committable_indices.empty() &&
2456 (state->committable_indices.back() > idx))
2457 {
2458 state->committable_indices.pop_back();
2459 }
2460
// Undo retirement steps recorded after idx. A Signed retirement whose
// committable index was rolled back reverts to Ordered. The following check
// can then also revert an Ordered retirement (including one just downgraded
// here) to Active if its retirement_idx was rolled back as well.
2461 if (
2462 state->membership_state == ccf::kv::MembershipState::Retired &&
2463 state->retirement_phase == ccf::kv::RetirementPhase::Signed)
2464 {
2465 assert(state->retirement_committable_idx.has_value());
2466 if (state->retirement_committable_idx.value() > idx)
2467 {
2468 state->retirement_committable_idx = std::nullopt;
2469 state->retirement_phase = ccf::kv::RetirementPhase::Ordered;
2470 }
2471 }
2472
2473 if (
2474 state->membership_state == ccf::kv::MembershipState::Retired &&
2475 state->retirement_phase == ccf::kv::RetirementPhase::Ordered)
2476 {
2477 assert(state->retirement_idx.has_value());
2478 if (state->retirement_idx.value() > idx)
2479 {
2480 state->retirement_idx = std::nullopt;
2481 state->retirement_phase = std::nullopt;
2482 state->membership_state = ccf::kv::MembershipState::Active;
2483 RAFT_DEBUG_FMT("Becoming Active after rollback");
2484 }
2485 }
2486
2487 // Rollback configurations.
// Pop every configuration whose idx lies beyond the new last index.
2488 bool changed = false;
2489
2490 while (!configurations.empty() && (configurations.back().idx > idx))
2491 {
// NOTE(review): the opening line of this logging call (original line 2492)
// is absent from this rendering.
2493 "Configurations: rollback configuration at {}",
2494 configurations.back().idx);
2495 configurations.pop_back();
2496 changed = true;
2497 }
2498
2499 if (changed)
2500 {
2501 create_and_remove_node_state();
2502 }
2503 }
2504
2505 nlohmann::json get_state_representation() const
2506 {
2507 return *state;
2508 }
2509
2510 private:
// Reconciles the per-node state (all_other_nodes) with the union of nodes
// across all current configurations. Each newly present node other than this
// one gets two things:
//   - a channel address association, if no channel exists yet;
//   - a node-state entry starting at state->last_idx + 1.
// When this node is leader, send_append_entries is called for the new node
// straight away.
2511 void create_and_remove_node_state()
2512 {
2513 // Find all nodes present in any active configuration.
// If a node appears in several configurations, std::map::emplace keeps the
// first NodeInfo seen (configurations are iterated from the front).
2514 Configuration::Nodes active_nodes;
2515
2516 for (auto const& conf : configurations)
2517 {
2518 for (auto const& node : conf.nodes)
2519 {
2520 active_nodes.emplace(node.first, node.second);
2521 }
2522 }
2523
2524 // Remove all nodes in the node state that are not present in any active
2525 // configuration.
// NOTE(review): to_remove is filled below but never read in this function,
// so nothing is actually erased from all_other_nodes here. Confirm whether
// removal happens elsewhere (e.g. on retirement commit) or this is dead code.
2526 std::vector<ccf::NodeId> to_remove;
2527
2528 for (const auto& node : all_other_nodes)
2529 {
2530 if (active_nodes.find(node.first) == active_nodes.end())
2531 {
2532 to_remove.push_back(node.first);
2533 }
2534 }
2535
2536 // Add all active nodes that are not already present in the node state.
// NOTE(review): iterates by value, copying each (NodeId, NodeInfo) pair;
// `const auto&` would avoid the copies.
2537 for (auto node_info : active_nodes)
2538 {
2539 if (node_info.first == state->node_id)
2540 {
2541 continue;
2542 }
2543
2544 if (all_other_nodes.find(node_info.first) == all_other_nodes.end())
2545 {
2546 if (!channels->have_channel(node_info.first))
2547 {
// NOTE(review): the opening line of this logging call (original line 2548)
// is absent from this rendering.
2549 "Configurations: create node channel with {}", node_info.first);
2550
2551 channels->associate_node_address(
2552 node_info.first,
2553 node_info.second.hostname,
2554 node_info.second.port);
2555 }
2556
2557 // A new node is sent only future entries initially. If it does not
2558 // have prior data, it will communicate that back to the leader.
2559 auto index = state->last_idx + 1;
// Presumably constructs the node state as (node info, next index to send,
// match index 0) — confirm against the node-state type's constructor.
2560 all_other_nodes.try_emplace(
2561 node_info.first, node_info.second, index, 0);
2562
2563 if (state->leadership_state == ccf::kv::LeadershipState::Leader)
2564 {
2565 send_append_entries(node_info.first, index);
2566 }
2567
// NOTE(review): the opening line of this logging call (original line 2568)
// is absent from this rendering.
2569 "Added raft node {} ({}:{})",
2570 node_info.first,
2571 node_info.second.hostname,
2572 node_info.second.port);
2573 }
2574 }
2575 }
2576 };
2577}
#define CCF_ASSERT_FMT(expr,...)
Definition ccf_assert.h:10
#define CCF_ASSERT(expr, msg)
Definition ccf_assert.h:14
Definition raft.h:96
void add_configuration(Index idx, const ccf::kv::Configuration::Nodes &conf, const std::unordered_set< ccf::NodeId > &new_learner_nodes={}, const std::unordered_set< ccf::NodeId > &new_retired_nodes={}) override
Definition raft.h:532
Term get_view() override
Definition raft.h:478
ccf::NodeId id() override
Definition raft.h:254
std::shared_ptr< ccf::NodeToNode > channels
Definition raft.h:208
bool is_retired_completed() const
Definition raft.h:332
std::set< ccf::NodeId > other_nodes_in_active_configs() const
Definition raft.h:513
std::vector< Index > get_view_history_since(Index idx) override
Definition raft.h:503
bool can_replicate() override
Definition raft.h:269
bool replicate(const ccf::kv::BatchVector &entries, Term term) override
Definition raft.h:635
bool is_backup() override
Definition raft.h:311
nlohmann::json get_state_representation() const
Definition raft.h:2505
Aft(const ccf::consensus::Configuration &settings_, std::unique_ptr< Store > store_, std::unique_ptr< LedgerProxy > ledger_, std::shared_ptr< ccf::NodeToNode > channels_, std::shared_ptr< aft::State > state_, std::shared_ptr< ccf::NodeClient > rpc_request_context_, bool public_only_=false, ccf::kv::MembershipState initial_membership_state_=ccf::kv::MembershipState::Active, ccf::ReconfigurationType reconfiguration_type_=ccf::ReconfigurationType::ONE_TRANSACTION)
Definition raft.h:211
void become_follower()
Definition raft.h:2039
void compact_committable_indices(Index idx)
Definition raft.h:391
bool is_active() const
Definition raft.h:316
void enable_all_domains() override
Definition raft.h:400
std::optional< Index > find_highest_possible_committable_index(Index idx) const
Definition raft.h:375
void start_ticking()
Definition raft.h:576
std::vector< Index > get_view_history(Index idx) override
Definition raft.h:497
void recv_message(const ccf::NodeId &from, const uint8_t *data, size_t size) override
Definition raft.h:747
void init_as_backup(Index index, Term term, const std::vector< Index > &term_history, Index recovery_start_index=0) override
Definition raft.h:447
Index last_committable_index() const
Definition raft.h:366
void rollback(Index idx)
Definition raft.h:2434
bool is_candidate() override
Definition raft.h:264
bool is_primary() override
Definition raft.h:259
std::pair< Term, Index > get_committed_txid() override
Definition raft.h:484
void become_aware_of_new_term(Term term)
Definition raft.h:2070
ccf::kv::ConsensusDetails get_details() override
Definition raft.h:609
Configuration::Nodes get_latest_configuration() override
Definition raft.h:603
bool is_retired_committed() const
Definition raft.h:326
Term get_view(Index idx) override
Definition raft.h:491
void force_become_primary() override
Definition raft.h:408
bool is_at_max_capacity() override
Definition raft.h:280
static constexpr size_t append_entries_size_limit
Definition raft.h:206
Consensus::SignatureDisposition get_signature_disposition() override
Definition raft.h:291
Index get_committed_seqno() override
Definition raft.h:472
bool is_retired() const
Definition raft.h:321
void force_become_primary(Index index, Term term, const std::vector< Index > &terms, Index commit_idx_) override
Definition raft.h:423
Index get_last_idx()
Definition raft.h:467
virtual ~Aft()=default
void set_retired_committed(ccf::SeqNo seqno, const std::vector< ccf::kv::NodeId > &node_ids) override
Definition raft.h:338
void periodic(std::chrono::milliseconds elapsed) override
Definition raft.h:824
void reset_last_ack_timeouts()
Definition raft.h:584
std::unique_ptr< LedgerProxy > ledger
Definition raft.h:207
std::optional< ccf::NodeId > primary() override
Definition raft.h:249
Configuration::Nodes get_latest_configuration_unsafe() const override
Definition raft.h:593
static constexpr ccf::View InvalidView
Definition state.h:25
Definition node_to_node.h:22
NodeId from
Definition node_to_node.h:24
Definition kv_types.h:436
Definition serialized.h:17
const char * what() const override
Definition serialized.h:24
#define LOG_INFO_FMT
Definition logger.h:362
#define LOG_DEBUG_FMT
Definition logger.h:357
#define LOG_FAIL_FMT
Definition logger.h:363
Definition state.h:18
ccf::kv::Configuration Configuration
Definition raft.h:92
AppendEntriesResponseType
Definition raft_types.h:156
uint64_t Term
Definition raft_types.h:20
RaftMsgType
Definition raft_types.h:98
@ raft_append_entries_response
Definition raft_types.h:100
@ raft_request_vote
Definition raft_types.h:102
@ raft_request_vote_response
Definition raft_types.h:103
@ raft_append_entries
Definition raft_types.h:99
@ raft_propose_request_vote
Definition raft_types.h:104
uint64_t Index
Definition raft_types.h:19
RetirementPhase
Definition kv_types.h:166
std::vector< std::tuple< Version, std::shared_ptr< std::vector< uint8_t > >, bool, std::shared_ptr< ConsensusHookPtrs > > > BatchVector
Definition kv_types.h:245
uint64_t Version
Definition version.h:8
uint64_t ReconfigurationId
Definition kv_types.h:81
ApplyResult
Definition kv_types.h:341
@ PASS_SIGNATURE
Definition kv_types.h:343
@ FAIL
Definition kv_types.h:350
@ PASS
Definition kv_types.h:342
@ PASS_ENCRYPTED_PAST_LEDGER_SECRET
Definition kv_types.h:348
MembershipState
Definition kv_types.h:155
Definition app_interface.h:14
ReconfigurationType
Definition reconfiguration_type.h:10
@ ONE_TRANSACTION
Definition reconfiguration_type.h:11
constexpr View VIEW_UNKNOWN
Definition tx_id.h:26
@ consensus_msg
Definition node_types.h:21
uint64_t SeqNo
Definition tx_id.h:36
Definition dl_list.h:9
STL namespace.
#define LOG_ROLLBACK_INFO_FMT
Definition raft.h:88
#define RAFT_TRACE_FMT
Definition raft.h:57
#define RAFT_TRACE_JSON_OUT(json_object)
Definition raft.h:63
#define RAFT_FAIL_FMT
Definition raft.h:60
#define RAFT_INFO_FMT
Definition raft.h:59
#define RAFT_DEBUG_FMT
Definition raft.h:58
Definition raft_types.h:168
Definition raft_types.h:130
Definition raft_types.h:212
Definition raft_types.h:201
Definition raft_types.h:189
Definition tx_id.h:44
SeqNo seqno
Definition tx_id.h:46
View view
Definition tx_id.h:45
Definition consensus_config.h:11
Definition kv_types.h:86
Definition kv_types.h:84
std::map< NodeId, NodeInfo > Nodes
Definition kv_types.h:103
Definition kv_types.h:184
bool ticking
Definition kv_types.h:201
std::optional< ccf::ReconfigurationType > reconfiguration_type
Definition kv_types.h:198
std::optional< RetirementPhase > retirement_phase
Definition kv_types.h:195
ccf::View current_view
Definition kv_types.h:200
std::optional< LeadershipState > leadership_state
Definition kv_types.h:194
MembershipState membership_state
Definition kv_types.h:193
std::unordered_map< ccf::NodeId, Ack > acks
Definition kv_types.h:192
std::optional< ccf::NodeId > primary_id
Definition kv_types.h:199
std::vector< Configuration > configs
Definition kv_types.h:191
Definition kv_types.h:52