CCF
Loading...
Searching...
No Matches
raft.h
Go to the documentation of this file.
1// Copyright (c) Microsoft Corporation. All rights reserved.
2// Licensed under the Apache 2.0 License.
3#pragma once
4
6#include "ccf/ds/logger.h"
7#include "ccf/pal/locking.h"
8#include "ccf/tx_id.h"
9#include "ccf/tx_status.h"
10#include "ds/serialized.h"
12#include "impl/state.h"
13#include "kv/kv_types.h"
14#include "node/node_client.h"
15#include "node/node_to_node.h"
16#include "node/node_types.h"
18#include "raft_types.h"
20
21#include <algorithm>
22#include <list>
23#include <random>
24#include <unordered_map>
25#include <vector>
26
#ifdef VERBOSE_RAFT_LOGGING
// Verbose mode: every raft log line is prefixed with this node's id,
// leadership state and membership state. These macros expect a variable
// named `state` (pointer-like, e.g. the consensus' std::shared_ptr<aft::State>)
// to be in scope wherever they are used.
# define RAFT_STATE_LOG_PREFIX "{} | {} | {} | "
# define RAFT_STATE_LOG_ARGS \
    state->node_id, state->leadership_state, state->membership_state
# define RAFT_TRACE_FMT(s, ...) \
    CCF_LOG_FMT(TRACE, "raft") \
    (RAFT_STATE_LOG_PREFIX s, RAFT_STATE_LOG_ARGS, ##__VA_ARGS__)
# define RAFT_DEBUG_FMT(s, ...) \
    CCF_LOG_FMT(DEBUG, "raft") \
    (RAFT_STATE_LOG_PREFIX s, RAFT_STATE_LOG_ARGS, ##__VA_ARGS__)
# define RAFT_INFO_FMT(s, ...) \
    CCF_LOG_FMT(INFO, "raft") \
    (RAFT_STATE_LOG_PREFIX s, RAFT_STATE_LOG_ARGS, ##__VA_ARGS__)
# define RAFT_FAIL_FMT(s, ...) \
    CCF_LOG_FMT(FAIL, "raft") \
    (RAFT_STATE_LOG_PREFIX s, RAFT_STATE_LOG_ARGS, ##__VA_ARGS__)
#else
// Non-verbose mode: plain aliases of the generic logging macros, without the
// raft state prefix.
# define RAFT_TRACE_FMT LOG_TRACE_FMT
# define RAFT_DEBUG_FMT LOG_DEBUG_FMT
# define RAFT_INFO_FMT LOG_INFO_FMT
# define RAFT_FAIL_FMT LOG_FAIL_FMT
#endif
62
// Emit a raft trace event on the "raft_trace" log channel at DEBUG level.
// The argument is parenthesised so that a compound expression (e.g. a
// conditional) is streamed as a single operand of << rather than being
// split by operator precedence.
#define RAFT_TRACE_JSON_OUT(json_object) \
  CCF_LOG_OUT(DEBUG, "raft_trace") << (json_object)
65
66#ifdef CCF_RAFT_TRACING
67
68static inline void add_committable_indices_start_and_end(
69 nlohmann::json& j, const std::shared_ptr<aft::State>& state)
70{
71 std::vector<aft::Index> committable_indices;
72 if (!state->committable_indices.empty())
73 {
74 committable_indices.push_back(state->committable_indices.front());
75 if (state->committable_indices.size() > 1)
76 {
77 committable_indices.push_back(state->committable_indices.back());
78 }
79 }
80 j["committable_indices"] = committable_indices;
81}
82
// Attach a compact summary of state->committable_indices (first and last
// entries) to a trace event's JSON object, e.g. j["state"].
# define COMMITTABLE_INDICES(event_state, state) \
    add_committable_indices_start_and_end((event_state), (state));
85
86#endif
87
// INFO-level logger under the "rollback" tag. Expands to the
// CCF_LOG_FMT(INFO, "rollback") head; callers append the (format, args...)
// list, in the same way the RAFT_*_FMT macros do.
#define LOG_ROLLBACK_INFO_FMT CCF_LOG_FMT(INFO, "rollback")
89
90namespace aft
91{
93
94 template <class LedgerProxy>
95 class Aft : public ccf::kv::Consensus
96 {
97 private:
98 struct NodeState
99 {
100 Configuration::NodeInfo node_info;
101
102 // the highest index sent to the node
103 Index sent_idx;
104
105 // the highest matching index with the node that was confirmed
106 Index match_idx;
107
108 // timeout tracking the last time an ack was received from the node
109 std::chrono::milliseconds last_ack_timeout;
110
111 NodeState() = default;
112
113 NodeState(
114 const Configuration::NodeInfo& node_info_,
115 Index sent_idx_,
116 Index match_idx_ = 0) :
117 node_info(node_info_),
118 sent_idx(sent_idx_),
119 match_idx(match_idx_),
120 last_ack_timeout(0)
121 {}
122 };
123
124 // Persistent
125 std::unique_ptr<Store> store;
126
127 // Volatile
128 std::optional<ccf::NodeId> voted_for = std::nullopt;
129 std::optional<ccf::NodeId> leader_id = std::nullopt;
130
131 // Keep track of votes in each active configuration
132 struct Votes
133 {
134 std::unordered_set<ccf::NodeId> votes;
135 size_t quorum;
136 };
137 std::map<Index, Votes> votes_for_me;
138
139 std::chrono::milliseconds timeout_elapsed;
140
141 // When this node receives append entries from a new primary, it may need to
142 // roll back a committable but uncommitted suffix it holds. The
143 // new primary dictates the index where this suffix begins, which
144 // following the Raft election rules must be at least as high as the highest
145 // commit index reported by the previous primary. The window in which this
146 // rollback could be accepted is minimised to avoid unnecessary
147 // retransmissions - this node only executes this rollback instruction on
148 // the first append entries after it became a follower. As with any append
149 // entries, the initial index will not advance until this node acks.
150 bool is_new_follower = false;
151
152 // When this node becomes primary, they should produce a new signature in
153 // the current view. This signature is the first thing they may commit, as
154 // they cannot confirm commit of anything from a previous view (Raft paper
155 // section 5.4.2). This bool is true from the point this node becomes
156 // primary, until it sees a committable entry
157 bool should_sign = false;
158
159 std::shared_ptr<aft::State> state;
160
161 // Timeouts
162 std::chrono::milliseconds request_timeout;
163 std::chrono::milliseconds election_timeout;
164 size_t max_uncommitted_tx_count;
165 bool ticking = false;
166
167 // Configurations
168 std::list<Configuration> configurations;
169 // Union of other nodes (i.e. all nodes but us) in each active
170 // configuration, plus those that are retired, but for which
171 // the persistence of retirement knowledge is not yet established,
172 // i.e. Completed but not RetiredCommitted
173 // This should be used for diagnostic or broadcasting
174 // messages but _not_ for counting quorums, which should be done for each
175 // active configuration.
176 std::unordered_map<ccf::NodeId, NodeState> all_other_nodes;
177 std::unordered_map<ccf::NodeId, ccf::SeqNo> retired_nodes;
178 ReconfigurationType reconfiguration_type;
179
180 // Node client to trigger submission of RPC requests
181 std::shared_ptr<ccf::NodeClient> node_client;
182
183 // Used to remove retired nodes from store
184 std::unique_ptr<ccf::RetiredNodeCleanup> retired_node_cleanup;
185
186 size_t entry_size_not_limited = 0;
187 size_t entry_count = 0;
188 Index entries_batch_size = 20;
189 static constexpr int batch_window_size = 100;
190 int batch_window_sum = 0;
191
192 // When this is set, only public domain is deserialised when receiving
193 // append entries
194 bool public_only = false;
195
196 // Randomness
197 std::uniform_int_distribution<int> distrib;
198 std::default_random_engine rand;
199
200 // AppendEntries messages are currently constrained to only contain entries
201 // from a single term, so that the receiver can know the term of each entry
202 // pre-deserialisation, without an additional header.
203 static constexpr size_t max_terms_per_append_entries = 1;
204
205 public:
206 static constexpr size_t append_entries_size_limit = 20000;
207 std::unique_ptr<LedgerProxy> ledger;
208 std::shared_ptr<ccf::NodeToNode> channels;
209
210 public:
212 const ccf::consensus::Configuration& settings_,
213 std::unique_ptr<Store> store_,
214 std::unique_ptr<LedgerProxy> ledger_,
215 std::shared_ptr<ccf::NodeToNode> channels_,
216 std::shared_ptr<aft::State> state_,
217 std::shared_ptr<ccf::NodeClient> rpc_request_context_,
218 bool public_only_ = false,
219 ccf::kv::MembershipState initial_membership_state_ =
221 ReconfigurationType reconfiguration_type_ =
223 store(std::move(store_)),
224
225 timeout_elapsed(0),
226
227 state(state_),
228
229 request_timeout(settings_.message_timeout),
230 election_timeout(settings_.election_timeout),
231 max_uncommitted_tx_count(settings_.max_uncommitted_tx_count),
232
233 reconfiguration_type(reconfiguration_type_),
234 node_client(rpc_request_context_),
235 retired_node_cleanup(
236 std::make_unique<ccf::RetiredNodeCleanup>(node_client)),
237
238 public_only(public_only_),
239
240 distrib(0, (int)election_timeout.count() / 2),
241 rand((int)(uintptr_t)this),
242
243 ledger(std::move(ledger_)),
244 channels(channels_)
245 {}
246
247 virtual ~Aft() = default;
248
249 std::optional<ccf::NodeId> primary() override
250 {
251 return leader_id;
252 }
253
254 ccf::NodeId id() override
255 {
256 return state->node_id;
257 }
258
259 bool is_primary() override
260 {
261 return state->leadership_state == ccf::kv::LeadershipState::Leader;
262 }
263
264 bool is_candidate() override
265 {
266 return state->leadership_state == ccf::kv::LeadershipState::Candidate;
267 }
268
269 bool can_replicate() override
270 {
271 std::unique_lock<ccf::pal::Mutex> guard(state->lock);
272 return can_replicate_unsafe();
273 }
274
280 bool is_at_max_capacity() override
281 {
282 if (max_uncommitted_tx_count == 0)
283 {
284 return false;
285 }
286 std::unique_lock<ccf::pal::Mutex> guard(state->lock);
287 return state->leadership_state == ccf::kv::LeadershipState::Leader &&
288 (state->last_idx - state->commit_idx >= max_uncommitted_tx_count);
289 }
290
291 Consensus::SignatureDisposition get_signature_disposition() override
292 {
293 std::unique_lock<ccf::pal::Mutex> guard(state->lock);
294 if (can_sign_unsafe())
295 {
296 if (should_sign)
297 {
298 return Consensus::SignatureDisposition::SHOULD_SIGN;
299 }
300 else
301 {
302 return Consensus::SignatureDisposition::CAN_SIGN;
303 }
304 }
305 else
306 {
307 return Consensus::SignatureDisposition::CANT_REPLICATE;
308 }
309 }
310
311 bool is_backup() override
312 {
313 return state->leadership_state == ccf::kv::LeadershipState::Follower;
314 }
315
316 bool is_active() const
317 {
318 return state->membership_state == ccf::kv::MembershipState::Active;
319 }
320
321 bool is_retired() const
322 {
323 return state->membership_state == ccf::kv::MembershipState::Retired;
324 }
325
327 {
328 return state->membership_state == ccf::kv::MembershipState::Retired &&
329 state->retirement_phase == ccf::kv::RetirementPhase::RetiredCommitted;
330 }
331
333 {
334 return state->membership_state == ccf::kv::MembershipState::Retired &&
335 state->retirement_phase == ccf::kv::RetirementPhase::Completed;
336 }
337
339 ccf::SeqNo seqno, const std::vector<ccf::kv::NodeId>& node_ids) override
340 {
341 for (auto& node_id : node_ids)
342 {
343 if (id() == node_id)
344 {
346 state->membership_state == ccf::kv::MembershipState::Retired,
347 "Node is not retired, cannot become retired committed");
349 state->retirement_phase == ccf::kv::RetirementPhase::Completed,
350 "Node is not retired, cannot become retired committed");
351 state->retired_committed_idx = seqno;
352 become_retired(seqno, ccf::kv::RetirementPhase::RetiredCommitted);
353 }
354 else
355 {
356 // Once a node's retired_committed status is itself committed, all
357 // future primaries in the network must be aware its retirement is
358 // committed, and so no longer need any communication with it to
359 // advance commit. No further communication with this node is needed.
360 all_other_nodes.erase(node_id);
361 RAFT_INFO_FMT("Removed {} from nodes known to consensus", node_id);
362 }
363 }
364 }
365
367 {
368 return state->committable_indices.empty() ?
369 state->commit_idx :
370 state->committable_indices.back();
371 }
372
373 // Returns the highest committable index which is not greater than the
374 // given idx.
376 Index idx) const
377 {
378 const auto it = std::upper_bound(
379 state->committable_indices.rbegin(),
380 state->committable_indices.rend(),
381 idx,
382 [](const auto& l, const auto& r) { return l >= r; });
383 if (it == state->committable_indices.rend())
384 {
385 return std::nullopt;
386 }
387
388 return *it;
389 }
390
392 {
393 while (!state->committable_indices.empty() &&
394 (state->committable_indices.front() <= idx))
395 {
396 state->committable_indices.pop_front();
397 }
398 }
399
400 void enable_all_domains() override
401 {
402 // When receiving append entries as a follower, all security domains will
403 // be deserialised
404 std::lock_guard<ccf::pal::Mutex> guard(state->lock);
405 public_only = false;
406 }
407
408 void force_become_primary() override
409 {
410 // This is unsafe and should only be called when the node is certain
411 // there is no leader and no other node will attempt to force leadership.
412 if (leader_id.has_value())
413 {
414 throw std::logic_error(
415 "Can't force leadership if there is already a leader");
416 }
417
418 std::lock_guard<ccf::pal::Mutex> guard(state->lock);
419 state->current_view += starting_view_change;
420 become_leader(true);
421 }
422
424 Index index,
425 Term term,
426 const std::vector<Index>& terms,
427 Index commit_idx_) override
428 {
429 // This is unsafe and should only be called when the node is certain
430 // there is no leader and no other node will attempt to force leadership.
431 if (leader_id.has_value())
432 {
433 throw std::logic_error(
434 "Can't force leadership if there is already a leader");
435 }
436
437 std::lock_guard<ccf::pal::Mutex> guard(state->lock);
438 state->current_view = term;
439 state->last_idx = index;
440 state->commit_idx = commit_idx_;
441 state->view_history.initialise(terms);
442 state->view_history.update(index, term);
443 state->current_view += starting_view_change;
444 become_leader(true);
445 }
446
448 Index index,
449 Term term,
450 const std::vector<Index>& term_history,
451 Index recovery_start_index = 0) override
452 {
453 // This should only be called when the node resumes from a snapshot and
454 // before it has received any append entries.
455 std::lock_guard<ccf::pal::Mutex> guard(state->lock);
456
457 state->last_idx = index;
458 state->commit_idx = index;
459
460 state->view_history.initialise(term_history);
461
462 ledger->init(index, recovery_start_index);
463
465 }
466
468 {
469 return state->last_idx;
470 }
471
473 {
474 std::lock_guard<ccf::pal::Mutex> guard(state->lock);
475 return get_commit_idx_unsafe();
476 }
477
478 Term get_view() override
479 {
480 std::lock_guard<ccf::pal::Mutex> guard(state->lock);
481 return state->current_view;
482 }
483
484 std::pair<Term, Index> get_committed_txid() override
485 {
486 std::lock_guard<ccf::pal::Mutex> guard(state->lock);
487 ccf::SeqNo commit_idx = get_commit_idx_unsafe();
488 return {get_term_internal(commit_idx), commit_idx};
489 }
490
491 Term get_view(Index idx) override
492 {
493 std::lock_guard<ccf::pal::Mutex> guard(state->lock);
494 return get_term_internal(idx);
495 }
496
497 std::vector<Index> get_view_history(Index idx) override
498 {
499 // This should only be called when the spin lock is held.
500 return state->view_history.get_history_until(idx);
501 }
502
503 std::vector<Index> get_view_history_since(Index idx) override
504 {
505 // This should only be called when the spin lock is held.
506 return state->view_history.get_history_since(idx);
507 }
508
509 // Same as ccfraft.tla GetServerSet/IsInServerSet
510 // Not to be confused with all_other_nodes, which includes retired completed
511 // nodes. Used to restrict sending vote requests, and when becoming a
512 // leader, to decide whether to advance commit.
513 std::set<ccf::NodeId> other_nodes_in_active_configs() const
514 {
515 std::set<ccf::NodeId> nodes;
516
517 for (auto const& conf : configurations)
518 {
519 for (auto const& [node_id, _] : conf.nodes)
520 {
521 if (node_id != state->node_id)
522 {
523 nodes.insert(node_id);
524 }
525 }
526 }
527
528 return nodes;
529 }
530
531 public:
533 Index idx,
535 const std::unordered_set<ccf::NodeId>& new_learner_nodes = {},
536 const std::unordered_set<ccf::NodeId>& new_retired_nodes = {}) override
537 {
539 "Configurations: add new configuration at {}: {{{}}}", idx, conf);
540
541 assert(new_learner_nodes.empty());
542
543#ifdef CCF_RAFT_TRACING
544 nlohmann::json j = {};
545 j["function"] = "add_configuration";
546 j["state"] = *state;
547 COMMITTABLE_INDICES(j["state"], state);
548 j["configurations"] = configurations;
549 j["args"] = nlohmann::json::object();
550 j["args"]["configuration"] = Configuration{idx, conf, idx};
552#endif
553
554 // Detect when we are retired by observing a configuration
555 // from which we are absent following a configuration in which
556 // we were included. Note that this relies on retirement being
557 // a final state, and node identities never being re-used.
558 if (
559 !configurations.empty() &&
560 configurations.back().nodes.find(state->node_id) !=
561 configurations.back().nodes.end() &&
562 conf.find(state->node_id) == conf.end())
563 {
564 become_retired(idx, ccf::kv::RetirementPhase::Ordered);
565 }
566
567 if (conf != configurations.back().nodes)
568 {
569 Configuration new_config = {idx, std::move(conf), idx};
570 configurations.push_back(new_config);
571
572 create_and_remove_node_state();
573 }
574 }
575
577 {
578 ticking = true;
579 using namespace std::chrono_literals;
580 timeout_elapsed = 0ms;
581 RAFT_INFO_FMT("Election timer has become active");
582 }
583
585 {
586 for (auto& node : all_other_nodes)
587 {
588 using namespace std::chrono_literals;
589 node.second.last_ack_timeout = 0ms;
590 }
591 }
592
594 {
595 if (configurations.empty())
596 {
597 return {};
598 }
599
600 return configurations.back().nodes;
601 }
602
604 {
605 std::lock_guard<ccf::pal::Mutex> guard(state->lock);
607 }
608
610 {
612 std::lock_guard<ccf::pal::Mutex> guard(state->lock);
613 details.primary_id = leader_id;
614 details.current_view = state->current_view;
615 details.ticking = ticking;
616 details.leadership_state = state->leadership_state;
617 details.membership_state = state->membership_state;
618 if (is_retired())
619 {
620 details.retirement_phase = state->retirement_phase;
621 }
622 for (auto const& conf : configurations)
623 {
624 details.configs.push_back(conf);
625 }
626 for (auto& [k, v] : all_other_nodes)
627 {
628 details.acks[k] = {
629 v.match_idx, static_cast<size_t>(v.last_ack_timeout.count())};
630 }
631 details.reconfiguration_type = reconfiguration_type;
632 return details;
633 }
634
635 bool replicate(const ccf::kv::BatchVector& entries, Term term) override
636 {
637 std::lock_guard<ccf::pal::Mutex> guard(state->lock);
638
639 if (state->leadership_state != ccf::kv::LeadershipState::Leader)
640 {
642 "Failed to replicate {} items: not leader", entries.size());
643 rollback(state->last_idx);
644 return false;
645 }
646
647 if (term != state->current_view)
648 {
650 "Failed to replicate {} items at term {}, current term is {}",
651 entries.size(),
652 term,
653 state->current_view);
654 return false;
655 }
656
658 {
660 "Failed to replicate {} items: node retirement is complete",
661 entries.size());
662 rollback(state->last_idx);
663 return false;
664 }
665
666 RAFT_DEBUG_FMT("Replicating {} entries", entries.size());
667
668 for (auto& [index, data, is_globally_committable, hooks] : entries)
669 {
670 bool globally_committable = is_globally_committable;
671
672 if (index != state->last_idx + 1)
673 return false;
674
676 "Replicated on leader {}: {}{} ({} hooks)",
677 state->node_id,
678 index,
679 (globally_committable ? " committable" : ""),
680 hooks->size());
681
682#ifdef CCF_RAFT_TRACING
683 nlohmann::json j = {};
684 j["function"] = "replicate";
685 j["state"] = *state;
686 COMMITTABLE_INDICES(j["state"], state);
687 j["view"] = term;
688 j["seqno"] = index;
689 j["globally_committable"] = globally_committable;
691#endif
692
693 for (auto& hook : *hooks)
694 {
695 hook->call(this);
696 }
697
698 if (globally_committable)
699 {
701 "membership: {} leadership: {}",
702 state->membership_state,
703 state->leadership_state);
704 if (
705 state->membership_state == ccf::kv::MembershipState::Retired &&
706 state->retirement_phase == ccf::kv::RetirementPhase::Ordered)
707 {
708 become_retired(index, ccf::kv::RetirementPhase::Signed);
709 }
710 state->committable_indices.push_back(index);
711 start_ticking_if_necessary();
712
713 // Reset should_sign here - whenever we see a committable entry we
714 // don't need to produce _another_ signature
715 should_sign = false;
716 }
717
718 state->last_idx = index;
719 ledger->put_entry(
720 *data, globally_committable, state->current_view, index);
721 entry_size_not_limited += data->size();
722 entry_count++;
723
724 state->view_history.update(index, state->current_view);
725 if (entry_size_not_limited >= append_entries_size_limit)
726 {
727 update_batch_size();
728 entry_count = 0;
729 entry_size_not_limited = 0;
730 for (const auto& it : all_other_nodes)
731 {
732 RAFT_DEBUG_FMT("Sending updates to follower {}", it.first);
733 send_append_entries(it.first, it.second.sent_idx + 1);
734 }
735 }
736 }
737
738 // Try to advance commit at once if there are no other nodes.
739 if (other_nodes_in_active_configs().size() == 0)
740 {
741 update_commit();
742 }
743
744 return true;
745 }
746
748 const ccf::NodeId& from, const uint8_t* data, size_t size) override
749 {
750 RaftMsgType type = serialized::peek<RaftMsgType>(data, size);
751
752 try
753 {
754 switch (type)
755 {
757 {
758 AppendEntries r =
759 channels->template recv_authenticated<AppendEntries>(
760 from, data, size);
761 recv_append_entries(from, r, data, size);
762 break;
763 }
764
766 {
768 channels->template recv_authenticated<AppendEntriesResponse>(
769 from, data, size);
770 recv_append_entries_response(from, r);
771 break;
772 }
773
775 {
776 RequestVote r = channels->template recv_authenticated<RequestVote>(
777 from, data, size);
778 recv_request_vote(from, r);
779 break;
780 }
781
783 {
785 channels->template recv_authenticated<RequestVoteResponse>(
786 from, data, size);
787 recv_request_vote_response(from, r);
788 break;
789 }
790
792 {
794 channels->template recv_authenticated<ProposeRequestVote>(
795 from, data, size);
796 recv_propose_request_vote(from, r);
797 break;
798 }
799
800 default:
801 {
802 RAFT_FAIL_FMT("Unhandled AFT message type: {}", type);
803 }
804 }
805 }
807 {
808 RAFT_INFO_FMT("Dropped invalid message from {}", e.from);
809 return;
810 }
812 {
813 RAFT_FAIL_FMT("Failed to parse message: {}", ise.what());
814 return;
815 }
816 catch (const std::exception& e)
817 {
818 LOG_FAIL_EXC(e.what());
819 return;
820 }
821 }
822
823 void periodic(std::chrono::milliseconds elapsed) override
824 {
825 std::unique_lock<ccf::pal::Mutex> guard(state->lock);
826 timeout_elapsed += elapsed;
827
828 if (state->leadership_state == ccf::kv::LeadershipState::Leader)
829 {
830 if (timeout_elapsed >= request_timeout)
831 {
832 using namespace std::chrono_literals;
833 timeout_elapsed = 0ms;
834
835 update_batch_size();
836 // Send newly available entries to all other nodes.
837 for (const auto& node : all_other_nodes)
838 {
839 send_append_entries(node.first, node.second.sent_idx + 1);
840 }
841 }
842
843 for (auto& node : all_other_nodes)
844 {
845 node.second.last_ack_timeout += elapsed;
846 }
847
848 bool has_quorum_of_backups = false;
849 for (auto const& conf : configurations)
850 {
851 size_t backup_ack_timeout_count = 0;
852 for (auto const& node : conf.nodes)
853 {
854 auto search = all_other_nodes.find(node.first);
855 if (search == all_other_nodes.end())
856 {
857 // Ignore ourselves as primary
858 continue;
859 }
860 if (search->second.last_ack_timeout >= election_timeout)
861 {
863 "No ack received from {} in last {}",
864 node.first,
865 election_timeout);
866 backup_ack_timeout_count++;
867 }
868 }
869
870 if (backup_ack_timeout_count < get_quorum(conf.nodes.size() - 1))
871 {
872 // If primary has quorum of active backups in _any_ configuration,
873 // it should remain primary
874 has_quorum_of_backups = true;
875 break;
876 }
877 }
878
879 if (!has_quorum_of_backups)
880 {
881 // CheckQuorum: The primary automatically steps down if there are no
882 // active configuration in which it has heard back from a majority of
883 // backups within an election timeout.
884 // Also see CheckQuorum action in tla/ccfraft.tla.
886 "Stepping down as leader {}: No ack received from a majority of "
887 "backups in last {}",
888 state->node_id,
889 election_timeout);
891 }
892 }
893 else
894 {
895 if (
896 !is_retired_committed() && ticking &&
897 timeout_elapsed >= election_timeout)
898 {
899 // Start an election.
900 become_candidate();
901 }
902 }
903 }
904
905 private:
906 Index find_highest_possible_match(const ccf::TxID& tx_id)
907 {
908 // Find the highest TxID this node thinks exists, which is still
909 // compatible with the given tx_id. That is, given T.n, find largest n'
910 // such that n' <= n && term_of(n') == T' && T' <= T. This may be T.n
911 // itself, if this node holds that index. Otherwise, examine the final
912 // entry in each term, counting backwards, until we find one which is
913 // still possible.
914 Index probe_index = std::min(tx_id.seqno, state->last_idx);
915 Term term_of_probe = state->view_history.view_at(probe_index);
916 while (term_of_probe > tx_id.view)
917 {
918 // Next possible match is the end of the previous term, which is
919 // 1-before the start of the currently considered term. Anything after
920 // that must have a term which is still too high.
921 probe_index = state->view_history.start_of_view(term_of_probe);
922 if (probe_index > 0)
923 {
924 --probe_index;
925 }
926 term_of_probe = state->view_history.view_at(probe_index);
927 }
928
930 "Looking for match with {}.{}, from {}.{}, best answer is {}",
931 tx_id.view,
932 tx_id.seqno,
933 state->view_history.view_at(state->last_idx),
934 state->last_idx,
935 probe_index);
936 return probe_index;
937 }
938
939 inline void update_batch_size()
940 {
941 auto avg_entry_size = (entry_count == 0) ?
943 entry_size_not_limited / entry_count;
944
945 auto batch_size = (avg_entry_size == 0) ?
947 append_entries_size_limit / avg_entry_size;
948
949 auto batch_avg = batch_window_sum / batch_window_size;
950 // balance out total batch size across batch window
951 batch_window_sum += (batch_size - batch_avg);
952 entries_batch_size = std::max((batch_window_sum / batch_window_size), 1);
953 }
954
955 Term get_term_internal(Index idx)
956 {
957 if (idx > state->last_idx)
958 return ccf::VIEW_UNKNOWN;
959
960 return state->view_history.view_at(idx);
961 }
962
963 bool can_replicate_unsafe()
964 {
965 return state->leadership_state == ccf::kv::LeadershipState::Leader &&
967 }
968
969 bool can_sign_unsafe()
970 {
971 return state->leadership_state == ccf::kv::LeadershipState::Leader &&
973 }
974
975 Index get_commit_idx_unsafe()
976 {
977 return state->commit_idx;
978 }
979
980 void send_append_entries(const ccf::NodeId& to, Index start_idx)
981 {
983 "Sending append entries to node {} in batches of {}, covering the "
984 "range {} -> {}",
985 to,
986 entries_batch_size,
987 start_idx,
988 state->last_idx);
989
990 auto calculate_end_index = [this](Index start) {
991 // Cap the end index in 2 ways:
992 // - Must contain no more than entries_batch_size entries
993 // - Must contain entries from a single term
994 static_assert(
995 max_terms_per_append_entries == 1,
996 "AppendEntries construction logic enforces single term");
997 auto max_idx = state->last_idx;
998 const auto term_of_ae = state->view_history.view_at(start);
999 const auto index_at_end_of_term =
1000 state->view_history.end_of_view(term_of_ae);
1001 if (index_at_end_of_term != ccf::kv::NoVersion)
1002 {
1003 max_idx = index_at_end_of_term;
1004 }
1005 return std::min(start + entries_batch_size - 1, max_idx);
1006 };
1007
1008 Index end_idx;
1009
1010 // We break _after_ sending, so that in the case where this is called
1011 // with start==last, we send a single empty heartbeat
1012 do
1013 {
1014 end_idx = calculate_end_index(start_idx);
1015 RAFT_TRACE_FMT("Sending sub range {} -> {}", start_idx, end_idx);
1016 send_append_entries_range(to, start_idx, end_idx);
1017 start_idx = std::min(end_idx + 1, state->last_idx);
1018 } while (end_idx != state->last_idx);
1019 }
1020
1021 void send_append_entries_range(
1022 const ccf::NodeId& to, Index start_idx, Index end_idx)
1023 {
1024 const auto prev_idx = start_idx - 1;
1025
1026 if (is_retired_committed() && start_idx >= end_idx)
1027 {
1028 // Continue to replicate, but do not send heartbeats if we know our
1029 // retirement is committed
1030 return;
1031 }
1032
1033 const auto prev_term = get_term_internal(prev_idx);
1034 const auto term_of_idx = get_term_internal(end_idx);
1035
1037 "Send append entries from {} to {}: ({}.{}, {}.{}] ({})",
1038 state->node_id,
1039 to,
1040 prev_term,
1041 prev_idx,
1042 term_of_idx,
1043 end_idx,
1044 state->commit_idx);
1045
1046#pragma clang diagnostic push
1047#pragma clang diagnostic ignored "-Wc99-designator"
1048 AppendEntries ae{
1049 {},
1050 {.idx = end_idx, .prev_idx = prev_idx},
1051 .term = state->current_view,
1052 .prev_term = prev_term,
1053 .leader_commit_idx = state->commit_idx,
1054 .term_of_idx = term_of_idx,
1055 };
1056#pragma clang diagnostic pop
1057
1058 auto& node = all_other_nodes.at(to);
1059
1060#ifdef CCF_RAFT_TRACING
1061 nlohmann::json j = {};
1062 j["function"] = "send_append_entries";
1063 j["packet"] = ae;
1064 j["state"] = *state;
1065 COMMITTABLE_INDICES(j["state"], state);
1066 j["to_node_id"] = to;
1067 j["match_idx"] = node.match_idx;
1068 j["sent_idx"] = node.sent_idx;
1070#endif
1071
1072 // The host will append log entries to this message when it is
1073 // sent to the destination node.
1074 if (!channels->send_authenticated(
1076 {
1077 return;
1078 }
1079
1080 // Record the most recent index we have sent to this node.
1081 node.sent_idx = end_idx;
1082 }
1083
1084 void recv_append_entries(
1085 const ccf::NodeId& from,
1086 AppendEntries r,
1087 const uint8_t* data,
1088 size_t size)
1089 {
1090 std::unique_lock<ccf::pal::Mutex> guard(state->lock);
1091
1093 "Received append entries: {}.{} to {}.{} (from {} in term {})",
1094 r.prev_term,
1095 r.prev_idx,
1096 r.term_of_idx,
1097 r.idx,
1098 from,
1099 r.term);
1100
1101#ifdef CCF_RAFT_TRACING
1102 nlohmann::json j = {};
1103 j["function"] = "recv_append_entries";
1104 j["packet"] = r;
1105 j["state"] = *state;
1106 COMMITTABLE_INDICES(j["state"], state);
1107 j["from_node_id"] = from;
1109#endif
1110
1111 // Don't check that the sender node ID is valid. Accept anything that
1112 // passes the integrity check. This way, entries containing dynamic
1113 // topology changes that include adding this new leader can be accepted.
1114
1115 // First, check append entries term against our own term, becoming
1116 // follower if necessary
1117 if (
1118 state->current_view == r.term &&
1119 state->leadership_state == ccf::kv::LeadershipState::Candidate)
1120 {
1122 }
1123 else if (state->current_view < r.term)
1124 {
1126 }
1127 else if (state->current_view > r.term)
1128 {
1129 // Reply false, since our term is later than the received term.
1131 "Recv append entries to {} from {} but our term is later ({} > {})",
1132 state->node_id,
1133 from,
1134 state->current_view,
1135 r.term);
1136 send_append_entries_response_nack(from);
1137 return;
1138 }
1139
1140 // Second, check term consistency with the entries we have so far
1141 const auto prev_term = get_term_internal(r.prev_idx);
1142 if (prev_term != r.prev_term)
1143 {
1145 "Previous term for {} should be {}", r.prev_idx, prev_term);
1146
1147 // Reply false if the log doesn't contain an entry at r.prev_idx
1148 // whose term is r.prev_term. Rejects "future" entries.
1149 if (prev_term == 0)
1150 {
1152 "Recv append entries to {} from {} but our log does not yet "
1153 "contain index {}",
1154 state->node_id,
1155 from,
1156 r.prev_idx);
1157 send_append_entries_response_nack(from);
1158 }
1159 else
1160 {
1162 "Recv append entries to {} from {} but our log at {} has the wrong "
1163 "previous term (ours: {}, theirs: {})",
1164 state->node_id,
1165 from,
1166 r.prev_idx,
1167 prev_term,
1168 r.prev_term);
1169 const ccf::TxID rejected_tx{r.prev_term, r.prev_idx};
1170 send_append_entries_response_nack(from, rejected_tx);
1171 }
1172 return;
1173 }
1174
1175 // If the terms match up, it is sufficient to convince us that the sender
1176 // is leader in our term
1177 restart_election_timeout();
1178 if (!leader_id.has_value() || leader_id.value() != from)
1179 {
1180 leader_id = from;
1182 "Node {} thinks leader is {}", state->node_id, leader_id.value());
1183 }
1184
1185 // Third, check index consistency, making sure entries are not in the past
1186 if (r.prev_idx < state->commit_idx)
1187 {
1189 "Recv append entries to {} from {} but prev_idx ({}) < commit_idx "
1190 "({})",
1191 state->node_id,
1192 from,
1193 r.prev_idx,
1194 state->commit_idx);
1195 return;
1196 }
1197 // Redundant with check on get_term_internal() at line 1149
1198 // Which captures this case in every situation, except r.prev_term == 0.
1199 // That only happens if r.prev_idx == 0 however, see line 1033,
1200 // in which case this path should not be taken either.
1201 else if (r.prev_idx > state->last_idx)
1202 {
1204 "Recv append entries to {} from {} but prev_idx ({}) > last_idx ({})",
1205 state->node_id,
1206 from,
1207 r.prev_idx,
1208 state->last_idx);
1209 return;
1210 }
1211
1213 "Recv append entries to {} from {} for index {} and previous index {}",
1214 state->node_id,
1215 from,
1216 r.idx,
1217 r.prev_idx);
1218
1219 std::vector<std::tuple<
1220 std::unique_ptr<ccf::kv::AbstractExecutionWrapper>,
1222 append_entries;
1223 // Finally, deserialise each entry in the batch
1224 for (Index i = r.prev_idx + 1; i <= r.idx; i++)
1225 {
1226 if (i <= state->last_idx)
1227 {
1228 // NB: This is only safe as long as AppendEntries only contain a
1229 // single term. If they cover multiple terms, then we would need to at
1230 // least partially deserialise each entry to establish what term it is
1231 // in (or report the terms in the header)
1232 static_assert(
1233 max_terms_per_append_entries == 1,
1234 "AppendEntries rollback logic assumes single term");
1235 const auto incoming_term = r.term_of_idx;
1236 const auto local_term = state->view_history.view_at(i);
1237 if (incoming_term != local_term)
1238 {
1239 if (is_new_follower)
1240 {
1241 auto rollback_level = i - 1;
1243 "New follower received AppendEntries with conflict. Incoming "
1244 "entry {}.{} conflicts with local {}.{}. Rolling back to {}.",
1245 incoming_term,
1246 i,
1247 local_term,
1248 i,
1249 rollback_level);
1251 "Dropping conflicting branch. Rolling back {} entries, "
1252 "beginning with {}.{}.",
1253 state->last_idx - rollback_level,
1254 local_term,
1255 i);
1256 rollback(rollback_level);
1257 is_new_follower = false;
1258 // Then continue to process this AE as normal
1259 }
1260 else
1261 {
1262 // We have a node retaining a conflicting suffix, and refusing to
1263 // roll it back. It will remain divergent (not contributing to
1264 // commit) this term, and can only be brought in-sync in a future
1265 // term.
1266 // This log is emitted as a canary, for what we hope is an
1267 // unreachable branch. If it is ever seen we should revisit this.
1269 "Ignoring conflicting AppendEntries. Retaining {} entries, "
1270 "beginning with {}.{}.",
1271 state->last_idx - (i - 1),
1272 local_term,
1273 i);
1274 return;
1275 }
1276 }
1277 else
1278 {
1279 // If the current entry has already been deserialised, skip the
1280 // payload for that entry
1281 ledger->skip_entry(data, size);
1282 continue;
1283 }
1284 }
1285
1286 std::vector<uint8_t> entry;
1287 try
1288 {
1289 entry = LedgerProxy::get_entry(data, size);
1290 }
1291 catch (const std::logic_error& e)
1292 {
1293 // This should only fail if there is malformed data.
1295 "Recv append entries to {} from {} but the data is malformed: {}",
1296 state->node_id,
1297 from,
1298 e.what());
1299 send_append_entries_response_nack(from);
1300 return;
1301 }
1302
1303 ccf::kv::TxID expected{r.term_of_idx, i};
1304 auto ds = store->deserialize(entry, public_only, expected);
1305 if (ds == nullptr)
1306 {
1308 "Recv append entries to {} from {} but the entry could not be "
1309 "deserialised",
1310 state->node_id,
1311 from);
1312 send_append_entries_response_nack(from);
1313 return;
1314 }
1315
1316 append_entries.push_back(std::make_tuple(std::move(ds), i));
1317 }
1318
1319 execute_append_entries_sync(
1320 std::move(append_entries), from, std::move(r));
1321 }
1322
1323 void execute_append_entries_sync(
1324 std::vector<std::tuple<
1325 std::unique_ptr<ccf::kv::AbstractExecutionWrapper>,
1326 ccf::kv::Version>>&& append_entries,
1327 const ccf::NodeId& from,
1328 AppendEntries&& r)
1329 {
1330 for (auto& ae : append_entries)
1331 {
1332 auto& [ds, i] = ae;
1333 RAFT_DEBUG_FMT("Replicating on follower {}: {}", state->node_id, i);
1334
1335#ifdef CCF_RAFT_TRACING
1336 nlohmann::json j = {};
1337 j["function"] = "execute_append_entries_sync";
1338 j["state"] = *state;
1339 COMMITTABLE_INDICES(j["state"], state);
1340 j["from_node_id"] = from;
1342#endif
1343
1344 bool track_deletes_on_missing_keys = false;
1345 ccf::kv::ApplyResult apply_success =
1346 ds->apply(track_deletes_on_missing_keys);
1347 if (apply_success == ccf::kv::ApplyResult::FAIL)
1348 {
1349 ledger->truncate(i - 1);
1350 send_append_entries_response_nack(from);
1351 return;
1352 }
1353 state->last_idx = i;
1354
1355 for (auto& hook : ds->get_hooks())
1356 {
1357 hook->call(this);
1358 }
1359
1360 bool globally_committable =
1361 (apply_success == ccf::kv::ApplyResult::PASS_SIGNATURE);
1362 if (globally_committable)
1363 {
1364 start_ticking_if_necessary();
1365 }
1366
1367 const auto& entry = ds->get_entry();
1368
1369 ledger->put_entry(
1370 entry, globally_committable, ds->get_term(), ds->get_index());
1371
1372 switch (apply_success)
1373 {
1375 {
1376 RAFT_FAIL_FMT("Follower failed to apply log entry: {}", i);
1377 state->last_idx--;
1378 ledger->truncate(state->last_idx);
1379 send_append_entries_response_nack(from);
1380 break;
1381 }
1382
1384 {
1385 RAFT_DEBUG_FMT("Deserialising signature at {}", i);
1386 if (
1387 state->membership_state == ccf::kv::MembershipState::Retired &&
1388 state->retirement_phase == ccf::kv::RetirementPhase::Ordered)
1389 {
1390 become_retired(i, ccf::kv::RetirementPhase::Signed);
1391 }
1392 state->committable_indices.push_back(i);
1393
1394 if (ds->get_term())
1395 {
1396 // A signature for sig_term tells us that all transactions from
1397 // the previous signature onwards (at least, if not further back)
1398 // happened in sig_term. We reflect this in the history.
1399 if (r.term_of_idx == aft::ViewHistory::InvalidView)
1400 {
1401 state->view_history.update(1, r.term);
1402 }
1403 else
1404 {
1405 // NB: This is only safe as long as AppendEntries only contain a
1406 // single term. If they cover multiple terms, then we need to
1407 // know our previous signature locally.
1408 static_assert(
1409 max_terms_per_append_entries == 1,
1410 "AppendEntries processing for term updates assumes single "
1411 "term");
1412 state->view_history.update(r.prev_idx + 1, ds->get_term());
1413 }
1414
1415 commit_if_possible(r.leader_commit_idx);
1416 }
1417 break;
1418 }
1419
1421 {
1422 break;
1423 }
1424
1426 {
1427 break;
1428 }
1429
1430 default:
1431 {
1432 throw std::logic_error("Unknown ApplyResult value");
1433 }
1434 }
1435 }
1436
1437 execute_append_entries_finish(r, from);
1438 }
1439
1440 void execute_append_entries_finish(
1441 AppendEntries& r, const ccf::NodeId& from)
1442 {
1443 // After entries have been deserialised, try to commit the leader's
1444 // commit index and update our term history accordingly
1445 commit_if_possible(r.leader_commit_idx);
1446
1447 // The term may have changed, and we have not have seen a signature yet.
1448 auto lci = last_committable_index();
1449 if (r.term_of_idx == aft::ViewHistory::InvalidView)
1450 {
1451 // If we don't yet have a term history, then this must be happening in
1452 // the current term. This can only happen before _any_ transactions have
1453 // occurred, when processing a heartbeat at index 0, which does not
1454 // happen in a real node (due to the genesis transaction executing
1455 // before ticks start), but may happen in tests.
1456 state->view_history.update(1, r.term);
1457 }
1458 else
1459 {
1460 // The end of this append entries (r.idx) was not a signature, but may
1461 // be in a new term. If it's a new term, this term started immediately
1462 // after the previous signature we saw (lci, last committable index).
1463 if (r.idx > lci)
1464 {
1465 state->view_history.update(lci + 1, r.term_of_idx);
1466 }
1467 }
1468
1469 send_append_entries_response_ack(from, r);
1470 }
1471
1472 void send_append_entries_response_ack(
1473 ccf::NodeId to, const AppendEntries& ae)
1474 {
1475 // If we get to here, we have applied up to r.idx in this AppendEntries.
1476 // We must only ACK this far, as we know nothing about the agreement of a
1477 // suffix we may still hold _after_ r.idx with the leader's log
1478 const auto response_idx = ae.idx;
1479 send_append_entries_response(
1480 to, AppendEntriesResponseType::OK, state->current_view, response_idx);
1481 }
1482
1483 void send_append_entries_response_nack(
1484 ccf::NodeId to, const ccf::TxID& rejected)
1485 {
1486 const auto response_idx = find_highest_possible_match(rejected);
1487 const auto response_term = get_term_internal(response_idx);
1488
1489 send_append_entries_response(
1490 to, AppendEntriesResponseType::FAIL, response_term, response_idx);
1491 }
1492
1493 void send_append_entries_response_nack(ccf::NodeId to)
1494 {
1495 send_append_entries_response(
1496 to,
1498 state->current_view,
1499 state->last_idx);
1500 }
1501
1502 void send_append_entries_response(
1503 ccf::NodeId to,
1505 aft::Term response_term,
1506 aft::Index response_idx)
1507 {
1509 "Send append entries response from {} to {} for index {}: {}",
1510 state->node_id,
1511 to,
1512 response_idx,
1513 (answer == AppendEntriesResponseType::OK ? "ACK" : "NACK"));
1514
1515 AppendEntriesResponse response{
1516 .term = response_term,
1517 .last_log_idx = response_idx,
1518 .success = answer,
1519 };
1520
1521#ifdef CCF_RAFT_TRACING
1522 nlohmann::json j = {};
1523 j["function"] = "send_append_entries_response";
1524 j["packet"] = response;
1525 j["state"] = *state;
1526 COMMITTABLE_INDICES(j["state"], state);
1527 j["to_node_id"] = to;
1529#endif
1530
1531 channels->send_authenticated(
1532 to, ccf::NodeMsgType::consensus_msg, response);
1533 }
1534
1535 void recv_append_entries_response(
1536 const ccf::NodeId& from, AppendEntriesResponse r)
1537 {
1538 std::lock_guard<ccf::pal::Mutex> guard(state->lock);
1539
1540 auto node = all_other_nodes.find(from);
1541 if (node == all_other_nodes.end())
1542 {
1543 // Ignore if we don't recognise the node.
1545 "Recv append entries response to {} from {}: unknown node",
1546 state->node_id,
1547 from);
1548 return;
1549 }
1550
1551#ifdef CCF_RAFT_TRACING
1552 nlohmann::json j = {};
1553 j["function"] = "recv_append_entries_response";
1554 j["packet"] = r;
1555 j["state"] = *state;
1556 COMMITTABLE_INDICES(j["state"], state);
1557 j["from_node_id"] = from;
1558 j["match_idx"] = node->second.match_idx;
1559 j["sent_idx"] = node->second.sent_idx;
1561#endif
1562
1563 // Ignore if we're not the leader.
1564 if (state->leadership_state != ccf::kv::LeadershipState::Leader)
1565 {
1567 "Recv append entries response to {} from {}: no longer leader",
1568 state->node_id,
1569 from);
1570 return;
1571 }
1572
1573 using namespace std::chrono_literals;
1574 node->second.last_ack_timeout = 0ms;
1575
1576 if (state->current_view < r.term)
1577 {
1578 // We are behind, update our state.
1580 "Recv append entries response to {} from {}: more recent term ({} "
1581 "> {})",
1582 state->node_id,
1583 from,
1584 r.term,
1585 state->current_view);
1587 return;
1588 }
1589 else if (state->current_view != r.term)
1590 {
1591 // Stale response, discard if success.
1592 // Otherwise reset sent_idx and try again.
1593 // NB: In NACKs the term may be that of an estimated matching index
1594 // in the log, rather than the current term, so it is correct for it to
1595 // be older in this case.
1596 if (r.success == AppendEntriesResponseType::OK)
1597 {
1599 "Recv append entries response to {} from {}: stale term ({} != {})",
1600 state->node_id,
1601 from,
1602 r.term,
1603 state->current_view);
1604 return;
1605 }
1606 }
1607 else if (r.last_log_idx < node->second.match_idx)
1608 {
1609 // Response about past indices, discard if success.
1610 // Otherwise reset sent_idx and try again.
1611 // NB: It is correct for this index to move backwards during NACKs
1612 // which iteratively discover the last matching index of divergent logs
1613 // after an election.
1614 if (r.success == AppendEntriesResponseType::OK)
1615 {
1617 "Recv append entries response to {} from {}: stale idx",
1618 state->node_id,
1619 from);
1620 return;
1621 }
1622 }
1623
1624 // Update next or match for the responding node.
1625 if (r.success == AppendEntriesResponseType::FAIL)
1626 {
1627 // Failed due to log inconsistency. Reset sent_idx, and try again soon.
1629 "Recv append entries response to {} from {}: failed",
1630 state->node_id,
1631 from);
1632 const auto this_match =
1633 find_highest_possible_match({r.term, r.last_log_idx});
1634 node->second.sent_idx = std::max(
1635 std::min(this_match, node->second.sent_idx), node->second.match_idx);
1636 return;
1637 }
1638 else
1639 {
1640 // max(...) because why would we ever want to go backwards on a success
1641 // response?!
1642 node->second.match_idx =
1643 std::max(node->second.match_idx, r.last_log_idx);
1644 }
1645
1647 "Recv append entries response to {} from {} for index {}: success",
1648 state->node_id,
1649 from,
1650 r.last_log_idx);
1651 update_commit();
1652 }
1653
1654 void send_request_vote(const ccf::NodeId& to)
1655 {
1656 auto last_committable_idx = last_committable_index();
1658 "Send request vote from {} to {} at {}",
1659 state->node_id,
1660 to,
1661 last_committable_idx);
1662 CCF_ASSERT(last_committable_idx >= state->commit_idx, "lci < ci");
1663
1664 RequestVote rv{
1665 .term = state->current_view,
1666 .last_committable_idx = last_committable_idx,
1667 .term_of_last_committable_idx = get_term_internal(last_committable_idx),
1668 };
1669
1670#ifdef CCF_RAFT_TRACING
1671 nlohmann::json j = {};
1672 j["function"] = "send_request_vote";
1673 j["packet"] = rv;
1674 j["state"] = *state;
1675 COMMITTABLE_INDICES(j["state"], state);
1676 j["to_node_id"] = to;
1678#endif
1679
1680 channels->send_authenticated(to, ccf::NodeMsgType::consensus_msg, rv);
1681 }
1682
1683 void recv_request_vote(const ccf::NodeId& from, RequestVote r)
1684 {
1685 std::lock_guard<ccf::pal::Mutex> guard(state->lock);
1686
1687 // Do not check that from is a known node. It is possible to receive
1688 // RequestVotes from nodes that this node doesn't yet know, just as it
1689 // receives AppendEntries from those nodes. These should be obeyed just
1690 // like any other RequestVote - it is possible that this node is needed to
1691 // produce a primary in the new term, who will then help this node catch
1692 // up.
1693
1694#ifdef CCF_RAFT_TRACING
1695 nlohmann::json j = {};
1696 j["function"] = "recv_request_vote";
1697 j["packet"] = r;
1698 j["state"] = *state;
1699 COMMITTABLE_INDICES(j["state"], state);
1700 j["from_node_id"] = from;
1702#endif
1703
1704 if (state->current_view > r.term)
1705 {
1706 // Reply false, since our term is later than the received term.
1708 "Recv request vote to {} from {}: our term is later ({} > {})",
1709 state->node_id,
1710 from,
1711 state->current_view,
1712 r.term);
1713 send_request_vote_response(from, false);
1714 return;
1715 }
1716 else if (state->current_view < r.term)
1717 {
1719 "Recv request vote to {} from {}: their term is later ({} < {})",
1720 state->node_id,
1721 from,
1722 state->current_view,
1723 r.term);
1725 }
1726
1727 if (leader_id.has_value())
1728 {
1729 // Reply false, since we already know the leader in the current term.
1731 "Recv request vote to {} from {}: leader {} already known in term {}",
1732 state->node_id,
1733 from,
1734 leader_id.value(),
1735 state->current_view);
1736 send_request_vote_response(from, false);
1737 return;
1738 }
1739
1740 if ((voted_for.has_value()) && (voted_for.value() != from))
1741 {
1742 // Reply false, since we already voted for someone else.
1744 "Recv request vote to {} from {}: already voted for {}",
1745 state->node_id,
1746 from,
1747 voted_for.value());
1748 send_request_vote_response(from, false);
1749 return;
1750 }
1751
1752 // If the candidate's committable log is at least as up-to-date as ours,
1753 // vote yes
1754
1755 const auto last_committable_idx = last_committable_index();
1756 const auto term_of_last_committable_idx =
1757 get_term_internal(last_committable_idx);
1758
1759 const auto answer =
1760 (r.term_of_last_committable_idx > term_of_last_committable_idx) ||
1761 ((r.term_of_last_committable_idx == term_of_last_committable_idx) &&
1762 (r.last_committable_idx >= last_committable_idx));
1763
1764 if (answer)
1765 {
1766 // If we grant our vote, we also acknowledge that an election is in
1767 // progress.
1768 restart_election_timeout();
1769 leader_id.reset();
1770 voted_for = from;
1771 }
1772 else
1773 {
1775 "Voting against candidate at {}.{} because local state is at {}.{}",
1776 r.term_of_last_committable_idx,
1777 r.last_committable_idx,
1778 term_of_last_committable_idx,
1779 last_committable_idx);
1780 }
1781
1782 send_request_vote_response(from, answer);
1783 }
1784
1785 void send_request_vote_response(const ccf::NodeId& to, bool answer)
1786 {
1788 "Send request vote response from {} to {}: {}",
1789 state->node_id,
1790 to,
1791 answer);
1792
1793 RequestVoteResponse response{
1794 .term = state->current_view, .vote_granted = answer};
1795
1796 channels->send_authenticated(
1797 to, ccf::NodeMsgType::consensus_msg, response);
1798 }
1799
1800 void recv_request_vote_response(
1801 const ccf::NodeId& from, RequestVoteResponse r)
1802 {
1803 std::lock_guard<ccf::pal::Mutex> guard(state->lock);
1804
1805#ifdef CCF_RAFT_TRACING
1806 nlohmann::json j = {};
1807 j["function"] = "recv_request_vote_response";
1808 j["packet"] = r;
1809 j["state"] = *state;
1810 COMMITTABLE_INDICES(j["state"], state);
1811 j["from_node_id"] = from;
1813#endif
1814
1815 if (state->leadership_state != ccf::kv::LeadershipState::Candidate)
1816 {
1818 "Recv request vote response to {} from: {}: we aren't a candidate",
1819 state->node_id,
1820 from);
1821 return;
1822 }
1823
1824 // Ignore if we don't recognise the node.
1825 auto node = all_other_nodes.find(from);
1826 if (node == all_other_nodes.end())
1827 {
1829 "Recv request vote response to {} from {}: unknown node",
1830 state->node_id,
1831 from);
1832 return;
1833 }
1834
1835 if (state->current_view < r.term)
1836 {
1838 "Recv request vote response to {} from {}: their term is more recent "
1839 "({} < {})",
1840 state->node_id,
1841 from,
1842 state->current_view,
1843 r.term);
1845 return;
1846 }
1847 else if (state->current_view != r.term)
1848 {
1849 // Ignore as it is stale.
1851 "Recv request vote response to {} from {}: stale ({} != {})",
1852 state->node_id,
1853 from,
1854 state->current_view,
1855 r.term);
1856 return;
1857 }
1858 else if (!r.vote_granted)
1859 {
1860 // Do nothing.
1862 "Recv request vote response to {} from {}: they voted no",
1863 state->node_id,
1864 from);
1865 return;
1866 }
1867
1869 "Recv request vote response to {} from {}: they voted yes",
1870 state->node_id,
1871 from);
1872
1873 add_vote_for_me(from);
1874 }
1875
1876 void recv_propose_request_vote(
1877 const ccf::NodeId& from, ProposeRequestVote r)
1878 {
1879 std::lock_guard<ccf::pal::Mutex> guard(state->lock);
1880
1881#ifdef CCF_RAFT_TRACING
1882 nlohmann::json j = {};
1883 j["function"] = "recv_propose_request_vote";
1884 j["packet"] = r;
1885 j["state"] = *state;
1886 COMMITTABLE_INDICES(j["state"], state);
1887 j["from_node_id"] = from;
1889#endif
1890 if (!is_retired_committed() && ticking && r.term == state->current_view)
1891 {
1893 "Becoming candidate early due to propose request vote from {}", from);
1894 become_candidate();
1895 }
1896 else
1897 {
1898 RAFT_INFO_FMT("Ignoring propose request vote from {}", from);
1899 }
1900 }
1901
1902 void restart_election_timeout()
1903 {
1904 // Randomise timeout_elapsed to get a random election timeout
1905 // between 0.5x and 1x the configured election timeout.
1906 timeout_elapsed = std::chrono::milliseconds(distrib(rand));
1907 }
1908
1909 void reset_votes_for_me()
1910 {
1911 votes_for_me.clear();
1912 for (auto const& conf : configurations)
1913 {
1914 votes_for_me[conf.idx].quorum = get_quorum(conf.nodes.size());
1915 votes_for_me[conf.idx].votes.clear();
1916 }
1917 }
1918
1919 // ccfraft!Timeout
1920 void become_candidate()
1921 {
1922 if (configurations.empty())
1923 {
1924 // ccfraft!Timeout:
1925 // /\ \E c \in DOMAIN configurations[i] :
1926 // /\ i \in configurations[i][c]
1928 "Not becoming candidate {} due to lack of a configuration.",
1929 state->node_id);
1930 return;
1931 }
1932
1933 state->leadership_state = ccf::kv::LeadershipState::Candidate;
1934 leader_id.reset();
1935
1936 voted_for = state->node_id;
1937 reset_votes_for_me();
1938 state->current_view++;
1939
1940 restart_election_timeout();
1942
1944 "Becoming candidate {}: {}", state->node_id, state->current_view);
1945
1946#ifdef CCF_RAFT_TRACING
1947 nlohmann::json j = {};
1948 j["function"] = "become_candidate";
1949 j["state"] = *state;
1950 COMMITTABLE_INDICES(j["state"], state);
1951 j["configurations"] = configurations;
1953#endif
1954
1955 add_vote_for_me(state->node_id);
1956
1957 // Request votes only go to nodes in configurations, since only
1958 // their votes can be tallied towards an election quorum.
1959 for (auto const& node_id : other_nodes_in_active_configs())
1960 {
1961 // ccfraft!RequestVote
1962 send_request_vote(node_id);
1963 }
1964 }
1965
1966 void become_leader(bool force_become_leader = false)
1967 {
1969 {
1970 return;
1971 }
1972
1973 const auto election_index = last_committable_index();
1974
1976 "Election index is {} in term {}", election_index, state->current_view);
1977 // Discard any un-committable updates we may hold,
1978 // since we have no signature for them. Except at startup,
1979 // where we do not want to roll back the genesis transaction.
1980 if (state->commit_idx > 0)
1981 {
1982 rollback(election_index);
1983 }
1984 else
1985 {
1986 // but we still want the KV to know which term we're in
1987 store->initialise_term(state->current_view);
1988 }
1989
1990 state->leadership_state = ccf::kv::LeadershipState::Leader;
1991 leader_id = state->node_id;
1992 should_sign = true;
1993
1994 using namespace std::chrono_literals;
1995 timeout_elapsed = 0ms;
1996
1998
2000 "Becoming leader {}: {}", state->node_id, state->current_view);
2001
2002#ifdef CCF_RAFT_TRACING
2003 nlohmann::json j = {};
2004 j["function"] = "become_leader";
2005 j["state"] = *state;
2006 COMMITTABLE_INDICES(j["state"], state);
2007 j["configurations"] = configurations;
2009#endif
2010
2011 // Try to advance commit at once if there are no other nodes.
2012 if (other_nodes_in_active_configs().size() == 0)
2013 {
2014 update_commit();
2015 }
2016
2017 // Reset next, match, and sent indices for all nodes.
2018 auto next = state->last_idx + 1;
2019
2020 for (auto& node : all_other_nodes)
2021 {
2022 node.second.match_idx = 0;
2023 node.second.sent_idx = next - 1;
2024
2025 // Send an empty append_entries to all nodes.
2026 send_append_entries(node.first, next);
2027 }
2028
2029 if (retired_node_cleanup)
2030 {
2031 retired_node_cleanup->cleanup();
2032 }
2033 }
2034
2035 public:
2036 // Called when a replica becomes follower in the same term, e.g. when the
2037 // primary node has not received a majority of acks (CheckQuorum)
2039 {
2040 leader_id.reset();
2041 restart_election_timeout();
2043
2044 // Drop anything unsigned here, but retain all signed entries. Only do a
2045 // more aggressive rollback, potentially including signatures, when
2046 // receiving a conflicting AppendEntries
2048
2049 state->leadership_state = ccf::kv::LeadershipState::Follower;
2051 "Becoming follower {}: {}.{}",
2052 state->node_id,
2053 state->current_view,
2054 state->commit_idx);
2055
2056#ifdef CCF_RAFT_TRACING
2057 nlohmann::json j = {};
2058 j["function"] = "become_follower";
2059 j["state"] = *state;
2060 COMMITTABLE_INDICES(j["state"], state);
2061 j["configurations"] = configurations;
2063#endif
2064 }
2065
2066 // Called when a replica becomes aware of the existence of a new term
2067 // If retired already, state remains unchanged, but the replica otherwise
2068 // becomes a follower in the new term.
2070 {
2071 RAFT_DEBUG_FMT("Becoming aware of new term {}", term);
2072
2073 state->current_view = term;
2074 voted_for.reset();
2075 reset_votes_for_me();
2077 is_new_follower = true;
2078 }
2079
2080 private:
2081 void become_retired(Index idx, ccf::kv::RetirementPhase phase)
2082 {
2084 "Becoming retired, phase {} (leadership {}): {}: {} at {}",
2085 phase,
2086 state->leadership_state,
2087 state->node_id,
2088 state->current_view,
2089 idx);
2090
2092 {
2094 !state->retirement_idx.has_value(),
2095 "retirement_idx already set to {}",
2096 state->retirement_idx.value());
2097 state->retirement_idx = idx;
2098 RAFT_INFO_FMT("Node retiring at {}", idx);
2099 }
2100 else if (phase == ccf::kv::RetirementPhase::Signed)
2101 {
2102 assert(state->retirement_idx.has_value());
2104 idx >= state->retirement_idx.value(),
2105 "Index {} unexpectedly lower than retirement_idx {}",
2106 idx,
2107 state->retirement_idx.value());
2108 state->retirement_committable_idx = idx;
2109 RAFT_INFO_FMT("Node retirement committable at {}", idx);
2110 }
2112 {
2113 if (state->leadership_state == ccf::kv::LeadershipState::Leader)
2114 {
2115 ProposeRequestVote prv{.term = state->current_view};
2116
2117 std::optional<ccf::NodeId> successor = std::nullopt;
2118 Index max_match_idx = 0;
2119 ccf::kv::ReconfigurationId reconf_id_of_max_match = 0;
2120
2121 // Pick the node that has the highest match_idx, and break
2122 // ties by looking at the highest reconfiguration id they are
2123 // part of. This can lead to nudging a node that is
2124 // about to retire too, but that node will then nudge
2125 // a successor, and that seems preferable to nudging a node that
2126 // risks not being eligible if reconfiguration id is prioritised.
2127 // Alternatively, we could pick the node with the higest match idx
2128 // in the latest config, provided that match idx at least as high as a
2129 // majority. That would make them both eligible and unlikely to retire
2130 // soon.
2131 for (auto& [node, node_state] : all_other_nodes)
2132 {
2133 if (node_state.match_idx >= max_match_idx)
2134 {
2135 ccf::kv::ReconfigurationId latest_reconf_id = 0;
2136 auto conf = configurations.rbegin();
2137 while (conf != configurations.rend())
2138 {
2139 if (conf->nodes.find(node) != conf->nodes.end())
2140 {
2141 latest_reconf_id = conf->idx;
2142 break;
2143 }
2144 conf++;
2145 }
2146 if (!(node_state.match_idx == max_match_idx &&
2147 latest_reconf_id < reconf_id_of_max_match))
2148 {
2149 reconf_id_of_max_match = latest_reconf_id;
2150 successor = node;
2151 max_match_idx = node_state.match_idx;
2152 }
2153 }
2154 }
2155 if (successor.has_value())
2156 {
2157 RAFT_INFO_FMT("Node retired, nudging {}", successor.value());
2158 channels->send_authenticated(
2159 successor.value(), ccf::NodeMsgType::consensus_msg, prv);
2160 }
2161 }
2162
2163 leader_id.reset();
2164 state->leadership_state = ccf::kv::LeadershipState::None;
2165 }
2166
2167 state->membership_state = ccf::kv::MembershipState::Retired;
2168 state->retirement_phase = phase;
2169 }
2170
2171 void add_vote_for_me(const ccf::NodeId& from)
2172 {
2173 if (configurations.empty())
2174 {
2176 "Not voting for myself {} due to lack of a configuration.",
2177 state->node_id);
2178 return;
2179 }
2180
2181 // Add vote for from node in each configuration where it is present
2182 for (auto const& conf : configurations)
2183 {
2184 auto const& nodes = conf.nodes;
2185 if (nodes.find(from) == nodes.end())
2186 {
2187 // from node is no longer in any active configuration.
2188 continue;
2189 }
2190
2191 votes_for_me[conf.idx].votes.insert(from);
2193 "Node {} voted for {} in configuration {} with quorum {}",
2194 from,
2195 state->node_id,
2196 conf.idx,
2197 votes_for_me[conf.idx].quorum);
2198 }
2199
2200 // We need a quorum of votes in _all_ configurations to become leader
2201 bool is_elected = true;
2202 for (auto const& v : votes_for_me)
2203 {
2204 auto const& quorum = v.second.quorum;
2205 auto const& votes = v.second.votes;
2206
2207 if (votes.size() < quorum)
2208 {
2209 is_elected = false;
2210 break;
2211 }
2212 }
2213
2214 if (is_elected)
2215 {
2216 become_leader();
2217 }
2218 }
2219
2220 // If there exists some committable idx in the current term such that idx >
2221 // commit_idx and a majority of nodes have replicated it, commit to that
2222 // idx.
2223 void update_commit()
2224 {
2225 if (state->leadership_state != ccf::kv::LeadershipState::Leader)
2226 {
2227 throw std::logic_error(
2228 "update_commit() must only be called while this node is leader");
2229 }
2230
2231 std::optional<Index> new_agreement_index = std::nullopt;
2232
2233 // Obtain CFT watermarks
2234 for (auto const& c : configurations)
2235 {
2236 // The majority must be checked separately for each active
2237 // configuration.
2238 std::vector<Index> match;
2239 match.reserve(c.nodes.size());
2240
2241 for (auto node : c.nodes)
2242 {
2243 if (node.first == state->node_id)
2244 {
2245 match.push_back(state->last_idx);
2246 }
2247 else
2248 {
2249 match.push_back(all_other_nodes.at(node.first).match_idx);
2250 }
2251 }
2252
2253 sort(match.begin(), match.end());
2254 auto confirmed = match.at((match.size() - 1) / 2);
2255
2256 if (
2257 !new_agreement_index.has_value() ||
2258 confirmed < new_agreement_index.value())
2259 {
2260 new_agreement_index = confirmed;
2261 }
2262 }
2263
2264 if (new_agreement_index.has_value())
2265 {
2266 if (new_agreement_index.value() > state->last_idx)
2267 {
2268 throw std::logic_error(
2269 "Followers appear to have later match indices than leader");
2270 }
2271
2272 const auto new_commit_idx =
2273 find_highest_possible_committable_index(new_agreement_index.value());
2274
2275 if (new_commit_idx.has_value())
2276 {
2278 "In update_commit, new_commit_idx: {}, "
2279 "last_idx: {}",
2280 new_commit_idx.value(),
2281 state->last_idx);
2282
2283 const auto term_of_new = get_term_internal(new_commit_idx.value());
2284 if (term_of_new == state->current_view)
2285 {
2286 commit(new_commit_idx.value());
2287 }
2288 else
2289 {
2291 "Ack quorum at {} resulted in proposed commit index {}, which "
2292 "is in term {}. Waiting for agreement on committable entry in "
2293 "current term {} to update commit",
2294 new_agreement_index.value(),
2295 new_commit_idx.value(),
2296 term_of_new,
2297 state->current_view);
2298 }
2299 }
2300 }
2301 }
2302
2303 // Commits at the highest committable index which is not greater than the
2304 // given idx.
2305 void commit_if_possible(Index idx)
2306 {
2308 "Commit if possible {} (ci: {}) (ti {})",
2309 idx,
2310 state->commit_idx,
2311 get_term_internal(idx));
2312 if (
2313 (idx > state->commit_idx) &&
2314 (get_term_internal(idx) <= state->current_view))
2315 {
2316 const auto highest_committable =
2318 if (highest_committable.has_value())
2319 {
2320 commit(highest_committable.value());
2321 }
2322 }
2323 }
2324
2325 size_t get_quorum(size_t n) const
2326 {
2327 return (n / 2) + 1;
2328 }
2329
// Advances the commit index to idx and applies its consequences:
// - completes a signed retirement once its committable index is reached;
// - compacts the store and commits the ledger up to idx;
// - discards configurations that are followed by a configuration starting at
//   or before idx, then reconciles per-node state.
// Throws std::logic_error if idx is beyond state->last_idx. Does nothing
// (beyond logging) if idx does not advance state->commit_idx.
// NOTE(review): this rendered listing omits original lines 2353, 2356 and
// 2397. Line 2353 presumably emits the trace record `j` built under
// CCF_RAFT_TRACING, and 2397 appears to open the log call whose arguments
// follow it. Line 2356 cannot be inferred from here; confirm against raft.h.
2330 void commit(Index idx)
2331 {
2332 if (idx > state->last_idx)
2333 {
2334 throw std::logic_error(fmt::format(
2335 "Tried to commit {} but last_idx is {}", idx, state->last_idx));
2336 }
2337
2338 RAFT_DEBUG_FMT("Starting commit");
2339
2340 // This could happen if a follower becomes the leader when it
2341 // has committed fewer log entries, although it has them available.
2342 if (idx <= state->commit_idx)
2343 return;
2344
2345#ifdef CCF_RAFT_TRACING
2346 nlohmann::json j = {};
2347 j["function"] = "commit";
2348 j["args"] = nlohmann::json::object();
2349 j["args"]["idx"] = idx;
2350 j["state"] = *state;
2351 COMMITTABLE_INDICES(j["state"], state);
2352 j["configurations"] = configurations;
2354#endif
2355
2357
2358 state->commit_idx = idx;
// A retiring node whose retirement has been signed moves to the Completed
// phase once the commit index reaches retirement_committable_idx.
2359 if (
2360 is_retired() &&
2361 state->retirement_phase == ccf::kv::RetirementPhase::Signed &&
2362 state->retirement_committable_idx.has_value() &&
2363 idx >= state->retirement_committable_idx.value())
2364 {
2365 become_retired(idx, ccf::kv::RetirementPhase::Completed);
2366 }
2367
2368 RAFT_DEBUG_FMT("Compacting...");
2369 store->compact(idx);
2370 ledger->commit(idx);
2371
2372 RAFT_DEBUG_FMT("Commit on {}: {}", state->node_id, idx);
2373
2374 // Examine each configuration that is followed by a globally committed
2375 // configuration.
// The oldest configuration is popped while its successor starts at or before
// idx. The loop stops when there is at most one configuration left, or when
// the front configuration's successor starts after idx.
2376 bool changed = false;
2377
2378 while (true)
2379 {
2380 auto conf = configurations.begin();
2381 if (conf == configurations.end())
2382 {
2383 break;
2384 }
2385
2386 auto next = std::next(conf);
2387 if (next == configurations.end())
2388 {
2389 break;
2390 }
2391
2392 if (idx < next->idx)
2393 {
2394 break;
2395 }
2396
2398 "Configurations: discard committed configuration at {}", conf->idx);
2399 configurations.pop_front();
2400 changed = true;
2401 }
2402
// Per-node state is reconciled only if a configuration was dropped. The
// retired-node cleanup hook runs only when one is set and this node is
// primary.
2403 if (changed)
2404 {
2405 create_and_remove_node_state();
2406 if (retired_node_cleanup && is_primary())
2407 {
2408 retired_node_cleanup->cleanup();
2409 }
2410 }
2411 }
2412
2413 bool is_self_in_latest_config()
2414 {
2415 bool present = false;
2416 if (!configurations.empty())
2417 {
2418 auto current_nodes = configurations.back().nodes;
2419 present = current_nodes.find(state->node_id) != current_nodes.end();
2420 }
2421 return present;
2422 }
2423
2424 void start_ticking_if_necessary()
2425 {
2426 if (!ticking && is_self_in_latest_config())
2427 {
2428 start_ticking();
2429 }
2430 }
2431
2432 public:
// Rolls replicated state back so that idx becomes the last index.
// A request to roll back below the current commit index is logged and
// ignored. Otherwise, in order:
// - the store is rolled back and the ledger truncated to idx;
// - view history and committable indices beyond idx are dropped;
// - retirement progress recorded beyond idx is undone;
// - configurations whose idx is beyond idx are discarded.
// NOTE(review): this rendered listing omits original lines 2437 and 2491,
// which appear to open the logging calls whose arguments follow them.
2433 void rollback(Index idx)
2434 {
2435 if (idx < state->commit_idx)
2436 {
2438 "Asked to rollback to idx:{} but committed to commit_idx:{} - "
2439 "ignoring rollback request",
2440 idx,
2441 state->commit_idx);
2442 return;
2443 }
2444
// The target TxID is {term recorded at idx, idx}. The current view is passed
// alongside it and is also what the debug message below reports.
2445 store->rollback({get_term_internal(idx), idx}, state->current_view);
2446
2447 RAFT_DEBUG_FMT("Setting term in store to: {}", state->current_view);
2448 ledger->truncate(idx);
2449 state->last_idx = idx;
2450 RAFT_DEBUG_FMT("Rolled back at {}", idx);
2451
2452 state->view_history.rollback(idx);
2453
// Drop committable indices beyond idx (popped from the back of the list).
2454 while (!state->committable_indices.empty() &&
2455 (state->committable_indices.back() > idx))
2456 {
2457 state->committable_indices.pop_back();
2458 }
2459
// A signed retirement whose committable index lies beyond idx is demoted back
// to Ordered. assert() is compiled out under NDEBUG; in that case .value() on
// an empty optional would throw std::bad_optional_access.
2460 if (
2461 state->membership_state == ccf::kv::MembershipState::Retired &&
2462 state->retirement_phase == ccf::kv::RetirementPhase::Signed)
2463 {
2464 assert(state->retirement_committable_idx.has_value());
2465 if (state->retirement_committable_idx.value() > idx)
2466 {
2467 state->retirement_committable_idx = std::nullopt;
2468 state->retirement_phase = ccf::kv::RetirementPhase::Ordered;
2469 }
2470 }
2471
// Checked after the block above. A retirement just demoted to Ordered is
// therefore also undone here (node becomes Active again) when its
// retirement_idx lies beyond idx.
2472 if (
2473 state->membership_state == ccf::kv::MembershipState::Retired &&
2474 state->retirement_phase == ccf::kv::RetirementPhase::Ordered)
2475 {
2476 assert(state->retirement_idx.has_value());
2477 if (state->retirement_idx.value() > idx)
2478 {
2479 state->retirement_idx = std::nullopt;
2480 state->retirement_phase = std::nullopt;
2481 state->membership_state = ccf::kv::MembershipState::Active;
2482 RAFT_DEBUG_FMT("Becoming Active after rollback");
2483 }
2484 }
2485
2486 // Rollback configurations.
// Configurations are popped from the back while their idx is beyond idx.
// Per-node state is reconciled only if at least one was removed.
2487 bool changed = false;
2488
2489 while (!configurations.empty() && (configurations.back().idx > idx))
2490 {
2492 "Configurations: rollback configuration at {}",
2493 configurations.back().idx);
2494 configurations.pop_back();
2495 changed = true;
2496 }
2497
2498 if (changed)
2499 {
2500 create_and_remove_node_state();
2501 }
2502 }
2503
2504 nlohmann::json get_state_representation() const
2505 {
2506 return *state;
2507 }
2508
2509 private:
// Reconciles per-peer state (all_other_nodes) and node channels with the
// union of nodes across all tracked configurations. Each peer that is present
// in some configuration but has no entry yet:
// - gets a channel address association if no channel exists;
// - gets a new all_other_nodes entry;
// - is sent append-entries immediately if this node is leader.
// NOTE(review): `to_remove` is computed below but never used in this
// function, so peers absent from every configuration are NOT erased here,
// despite the comment and the function's name. Confirm whether removal is
// deliberately handled elsewhere or is a missing step.
// NOTE(review): this rendered listing omits original lines 2547 and 2567,
// which appear to open the logging calls whose arguments follow them.
2510 void create_and_remove_node_state()
2511 {
2512 // Find all nodes present in any active configuration.
// Configuration::Nodes is a std::map, and emplace does not overwrite. If an
// id appears in several configurations, the NodeInfo from the earliest
// configuration (front of `configurations`) is kept.
2513 Configuration::Nodes active_nodes;
2514
2515 for (auto const& conf : configurations)
2516 {
2517 for (auto const& node : conf.nodes)
2518 {
2519 active_nodes.emplace(node.first, node.second);
2520 }
2521 }
2522
2523 // Remove all nodes in the node state that are not present in any active
2524 // configuration.
2525 std::vector<ccf::NodeId> to_remove;
2526
2527 for (const auto& node : all_other_nodes)
2528 {
2529 if (active_nodes.find(node.first) == active_nodes.end())
2530 {
2531 to_remove.push_back(node.first);
2532 }
2533 }
2534
2535 // Add all active nodes that are not already present in the node state.
// NOTE(review): iterates by value, copying each (id, NodeInfo) pair;
// `const auto&` would avoid the copy.
2536 for (auto node_info : active_nodes)
2537 {
2538 if (node_info.first == state->node_id)
2539 {
2540 continue;
2541 }
2542
2543 if (all_other_nodes.find(node_info.first) == all_other_nodes.end())
2544 {
2545 if (!channels->have_channel(node_info.first))
2546 {
2548 "Configurations: create node channel with {}", node_info.first);
2549
2550 channels->associate_node_address(
2551 node_info.first,
2552 node_info.second.hostname,
2553 node_info.second.port);
2554 }
2555
2556 // A new node is sent only future entries initially. If it does not
2557 // have prior data, it will communicate that back to the leader.
// The arguments after the id are forwarded to the per-peer state
// constructor: the node info, `index` (also the start index passed to
// send_append_entries below) and 0. The 0 is presumably the initial
// match/ack index; confirm against the peer-state type.
2558 auto index = state->last_idx + 1;
2559 all_other_nodes.try_emplace(
2560 node_info.first, node_info.second, index, 0);
2561
2562 if (state->leadership_state == ccf::kv::LeadershipState::Leader)
2563 {
2564 send_append_entries(node_info.first, index);
2565 }
2566
2568 "Added raft node {} ({}:{})",
2569 node_info.first,
2570 node_info.second.hostname,
2571 node_info.second.port);
2572 }
2573 }
2574 }
2575 };
2576}
#define CCF_ASSERT_FMT(expr,...)
Definition ccf_assert.h:10
#define CCF_ASSERT(expr, msg)
Definition ccf_assert.h:14
#define LOG_FAIL_EXC(msg)
Definition ccf_exception.h:62
Definition raft.h:96
void add_configuration(Index idx, const ccf::kv::Configuration::Nodes &conf, const std::unordered_set< ccf::NodeId > &new_learner_nodes={}, const std::unordered_set< ccf::NodeId > &new_retired_nodes={}) override
Definition raft.h:532
Term get_view() override
Definition raft.h:478
ccf::NodeId id() override
Definition raft.h:254
std::shared_ptr< ccf::NodeToNode > channels
Definition raft.h:208
bool is_retired_completed() const
Definition raft.h:332
std::set< ccf::NodeId > other_nodes_in_active_configs() const
Definition raft.h:513
std::vector< Index > get_view_history_since(Index idx) override
Definition raft.h:503
bool can_replicate() override
Definition raft.h:269
bool replicate(const ccf::kv::BatchVector &entries, Term term) override
Definition raft.h:635
bool is_backup() override
Definition raft.h:311
nlohmann::json get_state_representation() const
Definition raft.h:2504
void become_follower()
Definition raft.h:2038
void compact_committable_indices(Index idx)
Definition raft.h:391
bool is_active() const
Definition raft.h:316
void enable_all_domains() override
Definition raft.h:400
std::optional< Index > find_highest_possible_committable_index(Index idx) const
Definition raft.h:375
void start_ticking()
Definition raft.h:576
std::vector< Index > get_view_history(Index idx) override
Definition raft.h:497
void recv_message(const ccf::NodeId &from, const uint8_t *data, size_t size) override
Definition raft.h:747
void init_as_backup(Index index, Term term, const std::vector< Index > &term_history, Index recovery_start_index=0) override
Definition raft.h:447
Index last_committable_index() const
Definition raft.h:366
Aft(const ccf::consensus::Configuration &settings_, std::unique_ptr< Store > store_, std::unique_ptr< LedgerProxy > ledger_, std::shared_ptr< ccf::NodeToNode > channels_, std::shared_ptr< aft::State > state_, std::shared_ptr< ccf::NodeClient > rpc_request_context_, bool public_only_=false, ccf::kv::MembershipState initial_membership_state_=ccf::kv::MembershipState::Active, ReconfigurationType reconfiguration_type_=ReconfigurationType::ONE_TRANSACTION)
Definition raft.h:211
void rollback(Index idx)
Definition raft.h:2433
bool is_candidate() override
Definition raft.h:264
bool is_primary() override
Definition raft.h:259
std::pair< Term, Index > get_committed_txid() override
Definition raft.h:484
void become_aware_of_new_term(Term term)
Definition raft.h:2069
ccf::kv::ConsensusDetails get_details() override
Definition raft.h:609
Configuration::Nodes get_latest_configuration() override
Definition raft.h:603
bool is_retired_committed() const
Definition raft.h:326
Term get_view(Index idx) override
Definition raft.h:491
void force_become_primary() override
Definition raft.h:408
bool is_at_max_capacity() override
Definition raft.h:280
static constexpr size_t append_entries_size_limit
Definition raft.h:206
Consensus::SignatureDisposition get_signature_disposition() override
Definition raft.h:291
Index get_committed_seqno() override
Definition raft.h:472
bool is_retired() const
Definition raft.h:321
void force_become_primary(Index index, Term term, const std::vector< Index > &terms, Index commit_idx_) override
Definition raft.h:423
Index get_last_idx()
Definition raft.h:467
virtual ~Aft()=default
void set_retired_committed(ccf::SeqNo seqno, const std::vector< ccf::kv::NodeId > &node_ids) override
Definition raft.h:338
void periodic(std::chrono::milliseconds elapsed) override
Definition raft.h:823
void reset_last_ack_timeouts()
Definition raft.h:584
std::unique_ptr< LedgerProxy > ledger
Definition raft.h:207
std::optional< ccf::NodeId > primary() override
Definition raft.h:249
Configuration::Nodes get_latest_configuration_unsafe() const override
Definition raft.h:593
static constexpr ccf::View InvalidView
Definition state.h:25
Definition node_to_node.h:22
NodeId from
Definition node_to_node.h:24
Definition kv_types.h:430
Definition serialized.h:17
const char * what() const override
Definition serialized.h:24
ReconfigurationType
Definition reconfiguration_type.h:6
@ ONE_TRANSACTION
Definition reconfiguration_type.h:7
#define LOG_INFO_FMT
Definition logger.h:395
Definition state.h:18
ccf::kv::Configuration Configuration
Definition raft.h:92
AppendEntriesResponseType
Definition raft_types.h:156
uint64_t Term
Definition raft_types.h:20
RaftMsgType
Definition raft_types.h:98
@ raft_append_entries_response
Definition raft_types.h:100
@ raft_request_vote
Definition raft_types.h:102
@ raft_request_vote_response
Definition raft_types.h:103
@ raft_append_entries
Definition raft_types.h:99
@ raft_propose_request_vote
Definition raft_types.h:104
uint64_t Index
Definition raft_types.h:19
RetirementPhase
Definition kv_types.h:164
std::vector< std::tuple< Version, std::shared_ptr< std::vector< uint8_t > >, bool, std::shared_ptr< ConsensusHookPtrs > > > BatchVector
Definition kv_types.h:243
uint64_t Version
Definition version.h:8
uint64_t ReconfigurationId
Definition kv_types.h:79
ApplyResult
Definition kv_types.h:339
@ PASS_SIGNATURE
Definition kv_types.h:341
@ FAIL
Definition kv_types.h:348
@ PASS
Definition kv_types.h:340
@ PASS_ENCRYPTED_PAST_LEDGER_SECRET
Definition kv_types.h:346
MembershipState
Definition kv_types.h:153
Definition app_interface.h:15
constexpr View VIEW_UNKNOWN
Definition tx_id.h:26
@ consensus_msg
Definition node_types.h:21
uint64_t SeqNo
Definition tx_id.h:36
Definition dl_list.h:9
STL namespace.
#define LOG_ROLLBACK_INFO_FMT
Definition raft.h:88
#define RAFT_TRACE_FMT
Definition raft.h:57
#define RAFT_TRACE_JSON_OUT(json_object)
Definition raft.h:63
#define RAFT_FAIL_FMT
Definition raft.h:60
#define RAFT_INFO_FMT
Definition raft.h:59
#define RAFT_DEBUG_FMT
Definition raft.h:58
Definition raft_types.h:168
Definition raft_types.h:130
Definition raft_types.h:212
Definition raft_types.h:201
Definition raft_types.h:189
Definition tx_id.h:44
SeqNo seqno
Definition tx_id.h:46
View view
Definition tx_id.h:45
Definition consensus_config.h:11
Definition kv_types.h:84
Definition kv_types.h:82
std::map< NodeId, NodeInfo > Nodes
Definition kv_types.h:101
Definition kv_types.h:182
bool ticking
Definition kv_types.h:199
std::optional< RetirementPhase > retirement_phase
Definition kv_types.h:193
ccf::View current_view
Definition kv_types.h:198
std::optional< LeadershipState > leadership_state
Definition kv_types.h:192
MembershipState membership_state
Definition kv_types.h:191
std::optional< ReconfigurationType > reconfiguration_type
Definition kv_types.h:196
std::unordered_map< ccf::NodeId, Ack > acks
Definition kv_types.h:190
std::optional< ccf::NodeId > primary_id
Definition kv_types.h:197
std::vector< Configuration > configs
Definition kv_types.h:189
Definition kv_types.h:50