CCF
Loading...
Searching...
No Matches
raft.h
Go to the documentation of this file.
1// Copyright (c) Microsoft Corporation. All rights reserved.
2// Licensed under the Apache 2.0 License.
3#pragma once
4
5#include "ccf/pal/locking.h"
7#include "ccf/tx_id.h"
8#include "ccf/tx_status.h"
10#include "ds/ccf_assert.h"
11#include "ds/internal_logger.h"
12#include "ds/serialized.h"
13#include "impl/state.h"
14#include "kv/kv_types.h"
16#include "node/node_client.h"
17#include "node/node_to_node.h"
18#include "node/node_types.h"
20#include "raft_types.h"
22
23#include <algorithm>
24#include <list>
25#include <random>
26#include <unordered_map>
27#include <vector>
28
#ifdef VERBOSE_RAFT_LOGGING
// Verbose Raft logging: every message is prefixed with
// "<node_id> | <leadership_state> | <membership_state> | ".
// NB: these macros dereference a name `state` at the expansion site, so they
// are only usable where such a name is in scope (presumably the
// std::shared_ptr<aft::State> member of Aft).
#  define RAFT_TRACE_FMT(s, ...) \
    CCF_LOG_FMT(TRACE, "raft")( \
      "{} | {} | {} | " s, \
      state->node_id, \
      state->leadership_state, \
      state->membership_state, \
      ##__VA_ARGS__)
#  define RAFT_DEBUG_FMT(s, ...) \
    CCF_LOG_FMT(DEBUG, "raft")( \
      "{} | {} | {} | " s, \
      state->node_id, \
      state->leadership_state, \
      state->membership_state, \
      ##__VA_ARGS__)
#  define RAFT_INFO_FMT(s, ...) \
    CCF_LOG_FMT(INFO, "raft")( \
      "{} | {} | {} | " s, \
      state->node_id, \
      state->leadership_state, \
      state->membership_state, \
      ##__VA_ARGS__)
#  define RAFT_FAIL_FMT(s, ...) \
    CCF_LOG_FMT(FAIL, "raft")( \
      "{} | {} | {} | " s, \
      state->node_id, \
      state->leadership_state, \
      state->membership_state, \
      ##__VA_ARGS__)
#else
// Non-verbose builds: plain aliases for the standard logging macros.
#  define RAFT_TRACE_FMT LOG_TRACE_FMT
#  define RAFT_DEBUG_FMT LOG_DEBUG_FMT
#  define RAFT_INFO_FMT LOG_INFO_FMT
#  define RAFT_FAIL_FMT LOG_FAIL_FMT
#endif

// Streams json_object to the log at DEBUG level under the "raft_trace" tag.
#define RAFT_TRACE_JSON_OUT(json_object) \
  CCF_LOG_OUT(DEBUG, "raft_trace") << json_object
67
#ifdef CCF_RAFT_TRACING

// Writes a compact summary of state->committable_indices into
// j["committable_indices"]: an empty list if there are none, the single
// element if there is exactly one, otherwise just the first and last
// elements. This keeps trace output bounded however many are pending.
static inline void add_committable_indices_start_and_end(
  nlohmann::json& j, const std::shared_ptr<aft::State>& state)
{
  const auto& pending = state->committable_indices;

  std::vector<aft::Index> endpoints;
  if (!pending.empty())
  {
    endpoints.push_back(pending.front());
  }
  if (pending.size() > 1)
  {
    endpoints.push_back(pending.back());
  }

  j["committable_indices"] = endpoints;
}

// NB: the expansion already ends in ';', so `COMMITTABLE_INDICES(x, y);`
// produces an additional empty statement.
#  define COMMITTABLE_INDICES(event_state, state) \
    add_committable_indices_start_and_end(event_state, state);

#endif

#define LOG_ROLLBACK_INFO_FMT CCF_LOG_FMT(INFO, "rollback")
91
92namespace aft
93{
95
96 template <class LedgerProxy>
97 class Aft : public ccf::kv::Consensus
98 {
99 private:
100 struct NodeState
101 {
102 Configuration::NodeInfo node_info;
103
104 // the highest index sent to the node
105 Index sent_idx = 0;
106
107 // the highest matching index with the node that was confirmed
108 Index match_idx = 0;
109
110 // timeout tracking the last time an ack was received from the node
111 std::chrono::milliseconds last_ack_timeout{0};
112
113 NodeState() = default;
114
115 NodeState(
116 Configuration::NodeInfo node_info_,
117 Index sent_idx_,
118 Index match_idx_ = 0) :
119 node_info(std::move(node_info_)),
120 sent_idx(sent_idx_),
121 match_idx(match_idx_),
122 last_ack_timeout(0)
123 {}
124 };
125
126 // Persistent
127 std::unique_ptr<Store> store;
128
129 // Volatile
130 std::optional<ccf::NodeId> voted_for = std::nullopt;
131 std::optional<ccf::NodeId> leader_id = std::nullopt;
132
133 // Keep track of votes in each active configuration
134 struct Votes
135 {
136 std::unordered_set<ccf::NodeId> votes;
137 size_t quorum = 0;
138 };
139 std::map<Index, Votes> votes_for_me;
140
141 std::chrono::milliseconds timeout_elapsed;
142
143 // When this node receives append entries from a new primary, it may need to
144 // roll back a committable but uncommitted suffix it holds. The
145 // new primary dictates the index where this suffix begins, which
146 // following the Raft election rules must be at least as high as the highest
147 // commit index reported by the previous primary. The window in which this
148 // rollback could be accepted is minimised to avoid unnecessary
149 // retransmissions - this node only executes this rollback instruction on
150 // the first append entries after it became a follower. As with any append
151 // entries, the initial index will not advance until this node acks.
152 bool is_new_follower = false;
153
154 // When this node becomes primary, they should produce a new signature in
155 // the current view. This signature is the first thing they may commit, as
156 // they cannot confirm commit of anything from a previous view (Raft paper
157 // section 5.4.2). This bool is true from the point this node becomes
158 // primary, until it sees a committable entry
159 bool should_sign = false;
160
161 std::shared_ptr<aft::State> state;
162
163 // Timeouts
164 std::chrono::milliseconds request_timeout;
165 std::chrono::milliseconds election_timeout;
166 size_t max_uncommitted_tx_count;
167 bool ticking = false;
168
169 // Configurations
170 std::list<Configuration> configurations;
171 // Union of other nodes (i.e. all nodes but us) in each active
172 // configuration, plus those that are retired, but for which
173 // the persistence of retirement knowledge is not yet established,
174 // i.e. Completed but not RetiredCommitted
175 // This should be used for diagnostic or broadcasting
176 // messages but _not_ for counting quorums, which should be done for each
177 // active configuration.
178 std::unordered_map<ccf::NodeId, NodeState> all_other_nodes;
179 std::unordered_map<ccf::NodeId, ccf::SeqNo> retired_nodes;
180
181 // Node client to trigger submission of RPC requests
182 std::shared_ptr<ccf::NodeClient> node_client;
183
184 // Used to remove retired nodes from store
185 std::unique_ptr<ccf::RetiredNodeCleanup> retired_node_cleanup;
186
187 std::shared_ptr<ccf::CommitCallbackSubsystem> commit_callbacks;
188
189 size_t entry_size_not_limited = 0;
190 size_t entry_count = 0;
191 Index entries_batch_size = 20;
192 static constexpr int batch_window_size = 100;
193 int batch_window_sum = 0;
194
195 // When this is set, only public domain is deserialised when receiving
196 // append entries
197 bool public_only = false;
198
199 // Randomness
200 std::uniform_int_distribution<int> distrib;
201 std::default_random_engine rand;
202
203 // AppendEntries messages are currently constrained to only contain entries
204 // from a single term, so that the receiver can know the term of each entry
205 // pre-deserialisation, without an additional header.
206 static constexpr size_t max_terms_per_append_entries = 1;
207
208 public:
209 static constexpr size_t append_entries_size_limit = 20000;
210 std::unique_ptr<LedgerProxy> ledger;
211 std::shared_ptr<ccf::NodeToNode> channels;
212
214 const ccf::consensus::Configuration& settings_,
215 std::unique_ptr<Store> store_,
216 std::unique_ptr<LedgerProxy> ledger_,
217 std::shared_ptr<ccf::NodeToNode> channels_,
218 std::shared_ptr<aft::State> state_,
219 std::shared_ptr<ccf::NodeClient> rpc_request_context_,
220 std::shared_ptr<ccf::CommitCallbackSubsystem>
221 commit_callbacks_subsystem_ = nullptr,
222 bool public_only_ = false) :
223 store(std::move(store_)),
224
225 timeout_elapsed(0),
226
227 state(std::move(state_)),
228
229 request_timeout(settings_.message_timeout),
230 election_timeout(settings_.election_timeout),
231 max_uncommitted_tx_count(settings_.max_uncommitted_tx_count),
232
233 node_client(std::move(rpc_request_context_)),
234 retired_node_cleanup(
235 std::make_unique<ccf::RetiredNodeCleanup>(node_client)),
236 commit_callbacks(std::move(commit_callbacks_subsystem_)),
237
238 public_only(public_only_),
239
240 distrib(0, (int)election_timeout.count() / 2),
241 rand((int)(uintptr_t)this),
242
243 ledger(std::move(ledger_)),
244 channels(std::move(channels_))
245 {
246 if (commit_callbacks != nullptr)
247 {
248 commit_callbacks->set_consensus(this);
249 }
250 }
251
252 ~Aft() override = default;
253
254 std::optional<ccf::NodeId> primary() override
255 {
256 return leader_id;
257 }
258
259 ccf::NodeId id() override
260 {
261 return state->node_id;
262 }
263
264 bool is_primary() override
265 {
266 return state->leadership_state == ccf::kv::LeadershipState::Leader;
267 }
268
269 bool is_candidate() override
270 {
271 return state->leadership_state == ccf::kv::LeadershipState::Candidate;
272 }
273
274 bool can_replicate() override
275 {
276 std::unique_lock<ccf::pal::Mutex> guard(state->lock);
277 return can_replicate_unsafe();
278 }
279
285 bool is_at_max_capacity() override
286 {
287 if (max_uncommitted_tx_count == 0)
288 {
289 return false;
290 }
291 std::unique_lock<ccf::pal::Mutex> guard(state->lock);
292 return state->leadership_state == ccf::kv::LeadershipState::Leader &&
293 (state->last_idx - state->commit_idx >= max_uncommitted_tx_count);
294 }
295
296 Consensus::SignatureDisposition get_signature_disposition() override
297 {
298 std::unique_lock<ccf::pal::Mutex> guard(state->lock);
299 if (can_sign_unsafe())
300 {
301 if (should_sign)
302 {
303 return Consensus::SignatureDisposition::SHOULD_SIGN;
304 }
305 return Consensus::SignatureDisposition::CAN_SIGN;
306 }
307 return Consensus::SignatureDisposition::CANT_REPLICATE;
308 }
309
310 bool is_backup() override
311 {
312 return state->leadership_state == ccf::kv::LeadershipState::Follower;
313 }
314
315 bool is_active() const
316 {
317 return state->membership_state == ccf::kv::MembershipState::Active;
318 }
319
320 bool is_retired() const
321 {
322 return state->membership_state == ccf::kv::MembershipState::Retired;
323 }
324
326 {
327 return state->membership_state == ccf::kv::MembershipState::Retired &&
328 state->retirement_phase == ccf::kv::RetirementPhase::RetiredCommitted;
329 }
330
332 {
333 return state->membership_state == ccf::kv::MembershipState::Retired &&
334 state->retirement_phase == ccf::kv::RetirementPhase::Completed;
335 }
336
338 ccf::SeqNo seqno, const std::vector<ccf::kv::NodeId>& node_ids) override
339 {
340 for (const auto& node_id : node_ids)
341 {
342 if (id() == node_id)
343 {
345 state->membership_state == ccf::kv::MembershipState::Retired,
346 "Node is not retired, cannot become retired committed");
348 state->retirement_phase == ccf::kv::RetirementPhase::Completed,
349 "Node is not retired, cannot become retired committed");
350 state->retired_committed_idx = seqno;
351 become_retired(seqno, ccf::kv::RetirementPhase::RetiredCommitted);
352 }
353 else
354 {
355 // Once a node's retired_committed status is itself committed, all
356 // future primaries in the network must be aware its retirement is
357 // committed, and so no longer need any communication with it to
358 // advance commit. No further communication with this node is needed.
359 all_other_nodes.erase(node_id);
360 RAFT_INFO_FMT("Removed {} from nodes known to consensus", node_id);
361 }
362 }
363 }
364
366 {
367 return state->committable_indices.empty() ?
368 state->commit_idx :
369 state->committable_indices.back();
370 }
371
372 // Returns the highest committable index which is not greater than the
373 // given idx.
375 Index idx) const
376 {
377 const auto it = std::upper_bound(
378 state->committable_indices.rbegin(),
379 state->committable_indices.rend(),
380 idx,
381 [](const auto& l, const auto& r) { return l >= r; });
382 if (it == state->committable_indices.rend())
383 {
384 return std::nullopt;
385 }
386
387 return *it;
388 }
389
391 {
392 while (!state->committable_indices.empty() &&
393 (state->committable_indices.front() <= idx))
394 {
395 state->committable_indices.pop_front();
396 }
397 }
398
399 void enable_all_domains() override
400 {
401 // When receiving append entries as a follower, all security domains will
402 // be deserialised
403 std::lock_guard<ccf::pal::Mutex> guard(state->lock);
404 public_only = false;
405 }
406
407 void force_become_primary() override
408 {
409 // This is unsafe and should only be called when the node is certain
410 // there is no leader and no other node will attempt to force leadership.
411 if (leader_id.has_value())
412 {
413 throw std::logic_error(
414 "Can't force leadership if there is already a leader");
415 }
416
417 std::lock_guard<ccf::pal::Mutex> guard(state->lock);
418 state->current_view += starting_view_change;
419 become_leader(true);
420 }
421
423 Index index,
424 Term term,
425 const std::vector<Index>& terms,
426 Index commit_idx_) override
427 {
428 // This is unsafe and should only be called when the node is certain
429 // there is no leader and no other node will attempt to force leadership.
430 if (leader_id.has_value())
431 {
432 throw std::logic_error(
433 "Can't force leadership if there is already a leader");
434 }
435
436 std::lock_guard<ccf::pal::Mutex> guard(state->lock);
437 state->current_view = term;
438 state->last_idx = index;
439 state->commit_idx = commit_idx_;
440 state->view_history.initialise(terms);
441 state->view_history.update(index, term);
442 state->current_view += starting_view_change;
443 become_leader(true);
444 }
445
447 Index index,
448 Term term,
449 const std::vector<Index>& term_history,
450 Index recovery_start_index = 0) override
451 {
452 // This should only be called when the node resumes from a snapshot and
453 // before it has received any append entries.
454 std::lock_guard<ccf::pal::Mutex> guard(state->lock);
455
456 state->last_idx = index;
457 state->commit_idx = index;
458
459 state->view_history.initialise(term_history);
460
461 ledger->init(index, recovery_start_index);
462
464 }
465
467 {
468 return state->last_idx;
469 }
470
472 {
473 std::lock_guard<ccf::pal::Mutex> guard(state->lock);
474 return get_commit_idx_unsafe();
475 }
476
477 Term get_view() override
478 {
479 std::lock_guard<ccf::pal::Mutex> guard(state->lock);
480 return state->current_view;
481 }
482
483 std::pair<Term, Index> get_committed_txid() override
484 {
485 std::lock_guard<ccf::pal::Mutex> guard(state->lock);
486 ccf::SeqNo commit_idx = get_commit_idx_unsafe();
487 return {get_term_internal(commit_idx), commit_idx};
488 }
489
490 Term get_view(Index idx) override
491 {
492 std::lock_guard<ccf::pal::Mutex> guard(state->lock);
493 return get_term_internal(idx);
494 }
495
496 std::vector<Index> get_view_history(Index idx) override
497 {
498 // This should only be called when the spin lock is held.
499 return state->view_history.get_history_until(idx);
500 }
501
502 std::vector<Index> get_view_history_since(Index idx) override
503 {
504 // This should only be called when the spin lock is held.
505 return state->view_history.get_history_since(idx);
506 }
507
508 // Same as ccfraft.tla GetServerSet/IsInServerSet
509 // Not to be confused with all_other_nodes, which includes retired completed
510 // nodes. Used to restrict sending vote requests, and when becoming a
511 // leader, to decide whether to advance commit.
512 std::set<ccf::NodeId> other_nodes_in_active_configs() const
513 {
514 std::set<ccf::NodeId> nodes;
515
516 for (auto const& conf : configurations)
517 {
518 for (auto const& [node_id, _] : conf.nodes)
519 {
520 if (node_id != state->node_id)
521 {
522 nodes.insert(node_id);
523 }
524 }
525 }
526
527 return nodes;
528 }
529
531 Index idx, const ccf::kv::Configuration::Nodes& conf) override
532 {
534 "Configurations: add new configuration at {}: {{{}}}", idx, conf);
535
536#ifdef CCF_RAFT_TRACING
537 nlohmann::json j = {};
538 j["function"] = "add_configuration";
539 j["state"] = *state;
540 COMMITTABLE_INDICES(j["state"], state);
541 j["configurations"] = configurations;
542 j["args"] = nlohmann::json::object();
543 j["args"]["configuration"] = Configuration{idx, conf, idx};
545#endif
546
547 // Detect when we are retired by observing a configuration
548 // from which we are absent following a configuration in which
549 // we were included. Note that this relies on retirement being
550 // a final state, and node identities never being re-used.
551 if (
552 !configurations.empty() &&
553 configurations.back().nodes.find(state->node_id) !=
554 configurations.back().nodes.end() &&
555 conf.find(state->node_id) == conf.end())
556 {
557 become_retired(idx, ccf::kv::RetirementPhase::Ordered);
558 }
559
560 if (configurations.empty() || conf != configurations.back().nodes)
561 {
562 Configuration new_config = {idx, conf, idx};
563 configurations.push_back(new_config);
564
565 create_and_remove_node_state();
566 }
567 }
568
570 {
571 ticking = true;
572 using namespace std::chrono_literals;
573 timeout_elapsed = 0ms;
574 RAFT_INFO_FMT("Election timer has become active");
575 }
576
578 {
579 for (auto& node : all_other_nodes)
580 {
581 using namespace std::chrono_literals;
582 node.second.last_ack_timeout = 0ms;
583 }
584 }
585
587 {
588 if (configurations.empty())
589 {
590 return {};
591 }
592
593 return configurations.back().nodes;
594 }
595
597 {
598 std::lock_guard<ccf::pal::Mutex> guard(state->lock);
600 }
601
603 {
605 std::lock_guard<ccf::pal::Mutex> guard(state->lock);
606 details.primary_id = leader_id;
607 details.current_view = state->current_view;
608 details.ticking = ticking;
609 details.leadership_state = state->leadership_state;
610 details.membership_state = state->membership_state;
611 if (is_retired())
612 {
613 details.retirement_phase = state->retirement_phase;
614 }
615 for (auto const& conf : configurations)
616 {
617 details.configs.push_back(conf);
618 }
619 for (auto& [k, v] : all_other_nodes)
620 {
621 details.acks[k] = {
622 v.match_idx, static_cast<size_t>(v.last_ack_timeout.count())};
623 }
625 return details;
626 }
627
628 bool replicate(const ccf::kv::BatchVector& entries, Term term) override
629 {
630 std::lock_guard<ccf::pal::Mutex> guard(state->lock);
631
632 if (state->leadership_state != ccf::kv::LeadershipState::Leader)
633 {
635 "Failed to replicate {} items: not leader", entries.size());
636 rollback(state->last_idx);
637 return false;
638 }
639
640 if (term != state->current_view)
641 {
643 "Failed to replicate {} items at term {}, current term is {}",
644 entries.size(),
645 term,
646 state->current_view);
647 return false;
648 }
649
651 {
653 "Failed to replicate {} items: node retirement is complete",
654 entries.size());
655 rollback(state->last_idx);
656 return false;
657 }
658
659 RAFT_DEBUG_FMT("Replicating {} entries", entries.size());
660
661 for (const auto& [index, data, is_globally_committable, hooks] : entries)
662 {
663 bool globally_committable = is_globally_committable;
664
665 if (index != state->last_idx + 1)
666 {
667 return false;
668 }
669
671 "Replicated on leader {}: {}{} ({} hooks)",
672 state->node_id,
673 index,
674 (globally_committable ? " committable" : ""),
675 hooks->size());
676
677#ifdef CCF_RAFT_TRACING
678 nlohmann::json j = {};
679 j["function"] = "replicate";
680 j["state"] = *state;
681 COMMITTABLE_INDICES(j["state"], state);
682 j["view"] = term;
683 j["seqno"] = index;
684 j["globally_committable"] = globally_committable;
686#endif
687
688 for (auto& hook : *hooks)
689 {
690 hook->call(this);
691 }
692
693 if (globally_committable)
694 {
696 "membership: {} leadership: {}",
697 state->membership_state,
698 state->leadership_state);
699 if (
700 state->membership_state == ccf::kv::MembershipState::Retired &&
701 state->retirement_phase == ccf::kv::RetirementPhase::Ordered)
702 {
703 become_retired(index, ccf::kv::RetirementPhase::Signed);
704 }
705 state->committable_indices.push_back(index);
706 start_ticking_if_necessary();
707
708 // Reset should_sign here - whenever we see a committable entry we
709 // don't need to produce _another_ signature
710 should_sign = false;
711 }
712
713 state->last_idx = index;
714 ledger->put_entry(
715 *data, globally_committable, state->current_view, index);
716 entry_size_not_limited += data->size();
717 entry_count++;
718
719 state->view_history.update(index, state->current_view);
720 if (entry_size_not_limited >= append_entries_size_limit)
721 {
722 update_batch_size();
723 entry_count = 0;
724 entry_size_not_limited = 0;
725 for (const auto& it : all_other_nodes)
726 {
727 RAFT_DEBUG_FMT("Sending updates to follower {}", it.first);
728 send_append_entries(it.first, it.second.sent_idx + 1);
729 }
730 }
731 }
732
733 // Try to advance commit at once if there are no other nodes.
734 if (other_nodes_in_active_configs().size() == 0)
735 {
736 update_commit();
737 }
738
739 return true;
740 }
741
743 const ccf::NodeId& from, const uint8_t* data, size_t size) override
744 {
745 auto type = serialized::peek<RaftMsgType>(data, size);
746
747 try
748 {
749 switch (type)
750 {
752 {
753 AppendEntries r =
754 channels->template recv_authenticated<AppendEntries>(
755 from, data, size);
756 recv_append_entries(from, r, data, size);
757 break;
758 }
759
761 {
763 channels->template recv_authenticated<AppendEntriesResponse>(
764 from, data, size);
765 recv_append_entries_response(from, r);
766 break;
767 }
768
770 {
772 channels->template recv_authenticated<RequestPreVote>(
773 from, data, size);
774 recv_request_pre_vote(from, r);
775 break;
776 }
777
779 {
780 RequestVote r = channels->template recv_authenticated<RequestVote>(
781 from, data, size);
782 recv_request_vote(from, r);
783 break;
784 }
785
787 {
789 channels->template recv_authenticated<RequestPreVoteResponse>(
790 from, data, size);
791 recv_request_pre_vote_response(from, r);
792 break;
793 }
794
796 {
798 channels->template recv_authenticated<RequestVoteResponse>(
799 from, data, size);
800 recv_request_vote_response(from, r);
801 break;
802 }
803
805 {
807 channels->template recv_authenticated<ProposeRequestVote>(
808 from, data, size);
809 recv_propose_request_vote(from, r);
810 break;
811 }
812
814 default:
815 {
816 RAFT_FAIL_FMT("Received unhandled AFT message type: {}", type);
817 return;
818 }
819 }
820 }
822 {
823 RAFT_INFO_FMT("Dropped invalid message from {}", e.from);
824 return;
825 }
827 {
828 RAFT_FAIL_FMT("Failed to parse message: {}", ise.what());
829 return;
830 }
831 catch (const std::exception& e)
832 {
833 LOG_FAIL_FMT("Exception in {}", __PRETTY_FUNCTION__);
834 LOG_DEBUG_FMT("Error: {}", e.what());
835 return;
836 }
837 }
838
839 void periodic(std::chrono::milliseconds elapsed) override
840 {
841 std::unique_lock<ccf::pal::Mutex> guard(state->lock);
842 timeout_elapsed += elapsed;
843
844 if (state->leadership_state == ccf::kv::LeadershipState::Leader)
845 {
846 if (timeout_elapsed >= request_timeout)
847 {
848 using namespace std::chrono_literals;
849 timeout_elapsed = 0ms;
850
851 update_batch_size();
852 // Send newly available entries to all other nodes.
853 for (const auto& node : all_other_nodes)
854 {
855 send_append_entries(node.first, node.second.sent_idx + 1);
856 }
857 }
858
859 for (auto& node : all_other_nodes)
860 {
861 node.second.last_ack_timeout += elapsed;
862 }
863
864 bool every_active_config_has_a_quorum = std::all_of(
865 configurations.begin(),
866 configurations.end(),
867 [this](const Configuration& conf) {
868 size_t live_nodes_in_config = 0;
869 for (auto const& node : conf.nodes)
870 {
871 auto search = all_other_nodes.find(node.first);
872 if (
873 // if a (non-self) node is in a configuration, then it is in
874 // all_other_nodes. So if a node in a configuration is not found
875 // in all_other_nodes, it must be self, and hence is live
876 search == all_other_nodes.end() ||
877 // Otherwise we use the most recent ack as a failure probe
878 search->second.last_ack_timeout < election_timeout)
879 {
880 ++live_nodes_in_config;
881 }
882 else
883 {
884 RAFT_DEBUG_FMT(
885 "No ack received from {} in last {}",
886 node.first,
887 election_timeout);
888 }
889 }
890 return live_nodes_in_config >= get_quorum(conf.nodes.size());
891 });
892
893 if (!every_active_config_has_a_quorum)
894 {
895 // CheckQuorum: The primary automatically steps down if there are no
896 // active configuration in which it has heard back from a majority of
897 // backups within an election timeout.
898 // Also see CheckQuorum action in tla/ccfraft.tla.
900 "Stepping down as leader {}: No ack received from a majority of "
901 "backups in last {}",
902 state->node_id,
903 election_timeout);
905 }
906 }
907 else
908 {
909 if (
910 !is_retired_committed() && ticking &&
911 timeout_elapsed >= election_timeout)
912 {
913 // Start an election.
914 if (state->pre_vote_enabled)
915 {
916 become_pre_vote_candidate();
917 }
918 else
919 {
920 become_candidate();
921 }
922 }
923 }
924 }
925
926 private:
927 Index find_highest_possible_match(const ccf::TxID& tx_id)
928 {
929 // Find the highest TxID this node thinks exists, which is still
930 // compatible with the given tx_id. That is, given T.n, find largest n'
931 // such that n' <= n && term_of(n') == T' && T' <= T. This may be T.n
932 // itself, if this node holds that index. Otherwise, examine the final
933 // entry in each term, counting backwards, until we find one which is
934 // still possible.
935 Index probe_index = std::min(tx_id.seqno, state->last_idx);
936 Term term_of_probe = state->view_history.view_at(probe_index);
937 while (term_of_probe > tx_id.view)
938 {
939 // Next possible match is the end of the previous term, which is
940 // 1-before the start of the currently considered term. Anything after
941 // that must have a term which is still too high.
942 probe_index = state->view_history.start_of_view(term_of_probe);
943 if (probe_index > 0)
944 {
945 --probe_index;
946 }
947 term_of_probe = state->view_history.view_at(probe_index);
948 }
949
951 "Looking for match with {}.{}, from {}.{}, best answer is {}",
952 tx_id.view,
953 tx_id.seqno,
954 state->view_history.view_at(state->last_idx),
955 state->last_idx,
956 probe_index);
957 return probe_index;
958 }
959
960 void update_batch_size()
961 {
962 auto avg_entry_size = (entry_count == 0) ?
963 append_entries_size_limit :
964 entry_size_not_limited / entry_count;
965
966 auto batch_size = (avg_entry_size == 0) ?
967 append_entries_size_limit / 2 :
968 append_entries_size_limit / avg_entry_size;
969
970 auto batch_avg = batch_window_sum / batch_window_size;
971 // balance out total batch size across batch window
972 batch_window_sum += (batch_size - batch_avg);
973 entries_batch_size = std::max((batch_window_sum / batch_window_size), 1);
974 }
975
976 Term get_term_internal(Index idx)
977 {
978 if (idx > state->last_idx)
979 {
980 return ccf::VIEW_UNKNOWN;
981 }
982
983 return state->view_history.view_at(idx);
984 }
985
986 bool can_replicate_unsafe()
987 {
988 return state->leadership_state == ccf::kv::LeadershipState::Leader &&
989 !is_retired_committed();
990 }
991
992 bool can_sign_unsafe()
993 {
994 return state->leadership_state == ccf::kv::LeadershipState::Leader &&
995 !is_retired_committed();
996 }
997
998 Index get_commit_idx_unsafe()
999 {
1000 return state->commit_idx;
1001 }
1002
1003 void send_append_entries(const ccf::NodeId& to, Index start_idx)
1004 {
1006 "Sending append entries to node {} in batches of {}, covering the "
1007 "range {} -> {}",
1008 to,
1009 entries_batch_size,
1010 start_idx,
1011 state->last_idx);
1012
1013 auto calculate_end_index = [this](Index start) {
1014 // Cap the end index in 2 ways:
1015 // - Must contain no more than entries_batch_size entries
1016 // - Must contain entries from a single term
1017 static_assert(
1018 max_terms_per_append_entries == 1,
1019 "AppendEntries construction logic enforces single term");
1020 auto max_idx = state->last_idx;
1021 const auto term_of_ae = state->view_history.view_at(start);
1022 const auto index_at_end_of_term =
1023 state->view_history.end_of_view(term_of_ae);
1024 if (index_at_end_of_term != ccf::kv::NoVersion)
1025 {
1026 max_idx = index_at_end_of_term;
1027 }
1028 return std::min(start + entries_batch_size - 1, max_idx);
1029 };
1030
1031 Index end_idx = 0;
1032
1033 // We break _after_ sending, so that in the case where this is called
1034 // with start==last, we send a single empty heartbeat
1035 do
1036 {
1037 end_idx = calculate_end_index(start_idx);
1038 RAFT_TRACE_FMT("Sending sub range {} -> {}", start_idx, end_idx);
1039 send_append_entries_range(to, start_idx, end_idx);
1040 start_idx = std::min(end_idx + 1, state->last_idx);
1041 } while (end_idx != state->last_idx);
1042 }
1043
1044 void send_append_entries_range(
1045 const ccf::NodeId& to, Index start_idx, Index end_idx)
1046 {
1047 const auto prev_idx = start_idx - 1;
1048
1049 if (is_retired_committed() && start_idx >= end_idx)
1050 {
1051 // Continue to replicate, but do not send heartbeats if we know our
1052 // retirement is committed
1053 return;
1054 }
1055
1056 const auto prev_term = get_term_internal(prev_idx);
1057 const auto term_of_idx = get_term_internal(end_idx);
1058
1059#pragma clang diagnostic push
1060#pragma clang diagnostic ignored "-Wc99-designator"
1061 AppendEntries ae{
1062 {},
1063 {.idx = end_idx, .prev_idx = prev_idx},
1064 .term = state->current_view,
1065 .prev_term = prev_term,
1066 .leader_commit_idx = state->commit_idx,
1067 .term_of_idx = term_of_idx,
1068 };
1069#pragma clang diagnostic pop
1070
1072 "Send {} from {} to {}: ({}.{}, {}.{}] ({})",
1073 ae.msg,
1074 state->node_id,
1075 to,
1076 prev_term,
1077 prev_idx,
1078 term_of_idx,
1079 end_idx,
1080 state->commit_idx);
1081
1082 auto& node = all_other_nodes.at(to);
1083
1084#ifdef CCF_RAFT_TRACING
1085 nlohmann::json j = {};
1086 j["function"] = "send_append_entries";
1087 j["packet"] = ae;
1088 j["state"] = *state;
1089 COMMITTABLE_INDICES(j["state"], state);
1090 j["to_node_id"] = to;
1091 j["match_idx"] = node.match_idx;
1092 j["sent_idx"] = node.sent_idx;
1094#endif
1095
1096 // The host will append log entries to this message when it is
1097 // sent to the destination node.
1098 if (!channels->send_authenticated(
1100 {
1101 return;
1102 }
1103
1104 // Record the most recent index we have sent to this node.
1105 node.sent_idx = end_idx;
1106 }
1107
1108 void recv_append_entries(
1109 const ccf::NodeId& from,
1110 AppendEntries r,
1111 const uint8_t* data,
1112 size_t size)
1113 {
1114 std::unique_lock<ccf::pal::Mutex> guard(state->lock);
1115
1117 "Recv {} to {} from {}: {}.{} to {}.{} in term {}",
1118 r.msg,
1119 state->node_id,
1120 from,
1121 r.prev_term,
1122 r.prev_idx,
1123 r.term_of_idx,
1124 r.idx,
1125 r.term);
1126
1127#ifdef CCF_RAFT_TRACING
1128 nlohmann::json j = {};
1129 j["function"] = "recv_append_entries";
1130 j["packet"] = r;
1131 j["state"] = *state;
1132 COMMITTABLE_INDICES(j["state"], state);
1133 j["from_node_id"] = from;
1135#endif
1136
1137 // Don't check that the sender node ID is valid. Accept anything that
1138 // passes the integrity check. This way, entries containing dynamic
1139 // topology changes that include adding this new leader can be accepted.
1140
1141 // First, check append entries term against our own term, becoming
1142 // follower if necessary
1143 if (
1144 state->current_view == r.term &&
1145 (state->leadership_state == ccf::kv::LeadershipState::Candidate ||
1146 state->leadership_state == ccf::kv::LeadershipState::PreVoteCandidate))
1147 {
1148 become_aware_of_new_term(r.term);
1149 }
1150 else if (state->current_view < r.term)
1151 {
1152 become_aware_of_new_term(r.term);
1153 }
1154 else if (state->current_view > r.term)
1155 {
1156 // Reply false, since our term is later than the received term.
1158 "Recv {} to {} from {} but our term is later ({} > {})",
1159 r.msg,
1160 state->node_id,
1161 from,
1162 state->current_view,
1163 r.term);
1164 send_append_entries_response_nack(from);
1165 return;
1166 }
1167
1168 // Second, check term consistency with the entries we have so far
1169 const auto prev_term = get_term_internal(r.prev_idx);
1170 if (prev_term != r.prev_term)
1171 {
1173 "Previous term for {} should be {}", r.prev_idx, prev_term);
1174
1175 // Reply false if the log doesn't contain an entry at r.prev_idx
1176 // whose term is r.prev_term. Rejects "future" entries.
1177 if (prev_term == ccf::VIEW_UNKNOWN)
1178 {
1180 "Recv {} to {} from {} but our log does not yet "
1181 "contain index {}",
1182 r.msg,
1183 state->node_id,
1184 from,
1185 r.prev_idx);
1186 send_append_entries_response_nack(from);
1187 }
1188 else
1189 {
1191 "Recv {} to {} from {} but our log at {} has the wrong "
1192 "previous term (ours: {}, theirs: {})",
1193 r.msg,
1194 state->node_id,
1195 from,
1196 r.prev_idx,
1197 prev_term,
1198 r.prev_term);
1199 const ccf::TxID rejected_tx{r.prev_term, r.prev_idx};
1200 send_append_entries_response_nack(from, rejected_tx);
1201 }
1202 return;
1203 }
1204
1205 // If the terms match up, it is sufficient to convince us that the sender
1206 // is leader in our term
1207 restart_election_timeout();
1208 if (!leader_id.has_value() || leader_id.value() != from)
1209 {
1210 leader_id = from;
1212 "Node {} thinks leader is {}", state->node_id, leader_id.value());
1213 }
1214
1215 // Third, check index consistency, making sure entries are not in the past
1216 if (r.prev_idx < state->commit_idx)
1217 {
1219 "Recv {} to {} from {} but prev_idx ({}) < commit_idx "
1220 "({})",
1221 r.msg,
1222 state->node_id,
1223 from,
1224 r.prev_idx,
1225 state->commit_idx);
1226 return;
1227 }
1228 // This block is redundant - the checks above cover this case, so the code
1229 // inside this block should be unreachable. It is retained out of
1230 // abundance of caution, in case future rewrites of the above conditions
1231 // allow a fallthrough.
1232 if (r.prev_idx > state->last_idx)
1233 {
1235 "Recv {} to {} from {} but prev_idx ({}) > last_idx ({})",
1236 r.msg,
1237 state->node_id,
1238 from,
1239 r.prev_idx,
1240 state->last_idx);
1241 return;
1242 }
1243
1245 "Recv {} to {} from {} for index {} and previous index {}",
1246 r.msg,
1247 state->node_id,
1248 from,
1249 r.idx,
1250 r.prev_idx);
1251
1252 std::vector<std::tuple<
1253 std::unique_ptr<ccf::kv::AbstractExecutionWrapper>,
1255 append_entries;
1256 // Finally, deserialise each entry in the batch
1257 for (Index i = r.prev_idx + 1; i <= r.idx; i++)
1258 {
1259 if (i <= state->last_idx)
1260 {
1261 // NB: This is only safe as long as AppendEntries only contain a
1262 // single term. If they cover multiple terms, then we would need to at
1263 // least partially deserialise each entry to establish what term it is
1264 // in (or report the terms in the header)
1265 static_assert(
1266 max_terms_per_append_entries == 1,
1267 "AppendEntries rollback logic assumes single term");
1268 const auto incoming_term = r.term_of_idx;
1269 const auto local_term = state->view_history.view_at(i);
1270 if (incoming_term != local_term)
1271 {
1272 if (is_new_follower)
1273 {
1274 auto rollback_level = i - 1;
1276 "New follower received AppendEntries with conflict. Incoming "
1277 "entry {}.{} conflicts with local {}.{}. Rolling back to {}.",
1278 incoming_term,
1279 i,
1280 local_term,
1281 i,
1282 rollback_level);
1284 "Dropping conflicting branch. Rolling back {} entries, "
1285 "beginning with {}.{}.",
1286 state->last_idx - rollback_level,
1287 local_term,
1288 i);
1289 rollback(rollback_level);
1290 is_new_follower = false;
1291 // Then continue to process this AE as normal
1292 }
1293 else
1294 {
1295 // We have a node retaining a conflicting suffix, and refusing to
1296 // roll it back. It will remain divergent (not contributing to
1297 // commit) this term, and can only be brought in-sync in a future
1298 // term.
1299 // This log is emitted as a canary, for what we hope is an
1300 // unreachable branch. If it is ever seen we should revisit this.
1302 "Ignoring conflicting AppendEntries. Retaining {} entries, "
1303 "beginning with {}.{}.",
1304 state->last_idx - (i - 1),
1305 local_term,
1306 i);
1307 return;
1308 }
1309 }
1310 else
1311 {
1312 // If the current entry has already been deserialised, skip the
1313 // payload for that entry
1314 ledger->skip_entry(data, size);
1315 continue;
1316 }
1317 }
1318
1319 std::vector<uint8_t> entry;
1320 try
1321 {
1322 entry = LedgerProxy::get_entry(data, size);
1323 }
1324 catch (const std::logic_error& e)
1325 {
1326 // This should only fail if there is malformed data.
1328 "Recv {} to {} from {} but the data is malformed: {}",
1329 r.msg,
1330 state->node_id,
1331 from,
1332 e.what());
1333 send_append_entries_response_nack(from);
1334 return;
1335 }
1336
1337 ccf::TxID expected{r.term_of_idx, i};
1338 auto ds = store->deserialize(entry, public_only, expected);
1339 if (ds == nullptr)
1340 {
1342 "Recv {} to {} from {} but the entry could not be "
1343 "deserialised",
1344 r.msg,
1345 state->node_id,
1346 from);
1347 send_append_entries_response_nack(from);
1348 return;
1349 }
1350
1351 append_entries.emplace_back(std::move(ds), i);
1352 }
1353
1354 execute_append_entries_sync(std::move(append_entries), from, r);
1355 }
1356
// Applies a batch of already-deserialised entries received in an
// AppendEntries from `from`, in the ascending index order in which
// recv_append_entries built the batch.
// For each (wrapper, index) pair: apply it to the store, advance
// state->last_idx, run the hooks it produced, and write it to the local
// ledger (flagged globally committable when apply() reports PASS_SIGNATURE).
// Signature entries are recorded in committable_indices, may advance the
// retirement phase from Ordered to Signed, extend the view history and call
// commit_if_possible(r.leader_commit_idx).
// If apply() reports FAIL, the ledger is truncated to just before the failing
// index, a NACK is sent to `from`, and the function returns without applying
// the rest of the batch or acknowledging.
// Otherwise execute_append_entries_finish(r, from) runs once the batch is done.
// @param append_entries (store wrapper, index) pairs to apply, consumed here
// @param from           id of the sending node (NACK/ACK are addressed to it)
// @param r              the AppendEntries header the batch came from
1357 void execute_append_entries_sync(
1358 std::vector<std::tuple<
1359 std::unique_ptr<ccf::kv::AbstractExecutionWrapper>,
1360 ccf::kv::Version>>&& append_entries,
1361 const ccf::NodeId& from,
1362 const AppendEntries& r)
1363 {
1364 for (auto& ae : append_entries)
1365 {
1366 auto& [ds, i] = ae;
1367 RAFT_DEBUG_FMT("Replicating on follower {}: {}", state->node_id, i);
1368
1369#ifdef CCF_RAFT_TRACING
1370 nlohmann::json j = {};
1371 j["function"] = "execute_append_entries_sync";
1372 j["state"] = *state;
1373 COMMITTABLE_INDICES(j["state"], state);
1374 j["from_node_id"] = from;
1376#endif
1377
1378 bool track_deletes_on_missing_keys = false;
1379 ccf::kv::ApplyResult apply_success =
1380 ds->apply(track_deletes_on_missing_keys);
// A failed apply keeps nothing from this entry: cut the ledger back to the
// previous index, reject, and abandon the rest of the batch.
1381 if (apply_success == ccf::kv::ApplyResult::FAIL)
1382 {
1383 ledger->truncate(i - 1);
1384 send_append_entries_response_nack(from);
1385 return;
1386 }
1387 state->last_idx = i;
1388
1389 for (auto& hook : ds->get_hooks())
1390 {
1391 hook->call(this);
1392 }
1393
1394 bool globally_committable =
1395 (apply_success == ccf::kv::ApplyResult::PASS_SIGNATURE);
1396 if (globally_committable)
1397 {
1398 start_ticking_if_necessary();
1399 }
1400
1401 const auto& entry = ds->get_entry();
1402
1403 ledger->put_entry(
1404 entry, globally_committable, ds->get_term(), ds->get_index());
1405
// NOTE(review): several case labels (Doxygen lines 1408, 1417, 1454-1455 and
// 1460-1464) are missing from this listing. Judging by the bodies, the first
// case looks like FAIL, the second PASS_SIGNATURE (it logs "Deserialising
// signature"), the third a no-op group and the last a throwing fallback.
// Confirm against raft.h.
1406 switch (apply_success)
1407 {
// NOTE(review): if this is the FAIL case it is unreachable, because
// ApplyResult::FAIL already returned early above.
1409 {
1410 RAFT_FAIL_FMT("Follower failed to apply log entry: {}", i);
1411 state->last_idx--;
1412 ledger->truncate(state->last_idx);
1413 send_append_entries_response_nack(from);
1414 break;
1415 }
1416
1418 {
1419 RAFT_DEBUG_FMT("Deserialising signature at {}", i);
1420 if (
1421 state->membership_state == ccf::kv::MembershipState::Retired &&
1422 state->retirement_phase == ccf::kv::RetirementPhase::Ordered)
1423 {
1424 become_retired(i, ccf::kv::RetirementPhase::Signed);
1425 }
1426 state->committable_indices.push_back(i);
1427
1428 if (ds->get_term() != 0u)
1429 {
1430 // A signature for sig_term tells us that all transactions from
1431 // the previous signature onwards (at least, if not further back)
1432 // happened in sig_term. We reflect this in the history.
1433 if (r.term_of_idx == aft::ViewHistory::InvalidView)
1434 {
1435 state->view_history.update(1, r.term);
1436 }
1437 else
1438 {
1439 // NB: This is only safe as long as AppendEntries only contain a
1440 // single term. If they cover multiple terms, then we need to
1441 // know our previous signature locally.
1442 static_assert(
1443 max_terms_per_append_entries == 1,
1444 "AppendEntries processing for term updates assumes single "
1445 "term");
1446 state->view_history.update(r.prev_idx + 1, ds->get_term());
1447 }
1448
1449 commit_if_possible(r.leader_commit_idx);
1450 }
1451 break;
1452 }
1453
1456 {
1457 break;
1458 }
1459
1465 {
1466 throw std::logic_error("Unknown ApplyResult value");
1467 }
1468 }
1469 }
1470
// The whole batch applied: commit, extend the view history and ACK.
1471 execute_append_entries_finish(r, from);
1472 }
1473
1474 void execute_append_entries_finish(
1475 const AppendEntries& r, const ccf::NodeId& from)
1476 {
1477 // After entries have been deserialised, try to commit the leader's
1478 // commit index and update our term history accordingly
1479 commit_if_possible(r.leader_commit_idx);
1480
1481 // The term may have changed, and we have not have seen a signature yet.
1482 auto lci = last_committable_index();
1483 if (r.term_of_idx == aft::ViewHistory::InvalidView)
1484 {
1485 // If we don't yet have a term history, then this must be happening in
1486 // the current term. This can only happen before _any_ transactions have
1487 // occurred, when processing a heartbeat at index 0, which does not
1488 // happen in a real node (due to the genesis transaction executing
1489 // before ticks start), but may happen in tests.
1490 state->view_history.update(1, r.term);
1491 }
1492 else
1493 {
1494 // The end of this append entries (r.idx) was not a signature, but may
1495 // be in a new term. If it's a new term, this term started immediately
1496 // after the previous signature we saw (lci, last committable index).
1497 if (r.idx > lci)
1498 {
1499 state->view_history.update(lci + 1, r.term_of_idx);
1500 }
1501 }
1502
1503 send_append_entries_response_ack(from, r);
1504 }
1505
1506 void send_append_entries_response_ack(
1507 ccf::NodeId to, const AppendEntries& ae)
1508 {
1509 // If we get to here, we have applied up to r.idx in this AppendEntries.
1510 // We must only ACK this far, as we know nothing about the agreement of a
1511 // suffix we may still hold _after_ r.idx with the leader's log
1512 const auto response_idx = ae.idx;
1513 send_append_entries_response(
1514 to, AppendEntriesResponseType::OK, state->current_view, response_idx);
1515 }
1516
1517 void send_append_entries_response_nack(
1518 ccf::NodeId to, const ccf::TxID& rejected)
1519 {
1520 const auto response_idx = find_highest_possible_match(rejected);
1521 const auto response_term = get_term_internal(response_idx);
1522
1523 send_append_entries_response(
1524 to, AppendEntriesResponseType::FAIL, response_term, response_idx);
1525 }
1526
1527 void send_append_entries_response_nack(ccf::NodeId to)
1528 {
1529 send_append_entries_response(
1530 to,
1532 state->current_view,
1533 state->last_idx);
1534 }
1535
// Builds an AppendEntriesResponse carrying `answer` as its success field,
// `response_term` as its term and `response_idx` as last_log_idx. It logs the
// response as ACK or NACK, optionally traces it, and sends it to `to` over the
// authenticated node-to-node channel as a consensus_msg.
// NOTE(review): this listing is missing the line declaring `answer` (used
// below as `.success`). Presumably it is an AppendEntriesResponseType
// parameter; confirm against raft.h.
1536 void send_append_entries_response(
1537 ccf::NodeId to,
1539 aft::Term response_term,
1540 aft::Index response_idx)
1541 {
1542 AppendEntriesResponse response{
1543 .term = response_term,
1544 .last_log_idx = response_idx,
1545 .success = answer,
1546 };
1547
1549 "Send {} from {} to {} for index {}: {}",
1550 response.msg,
1551 state->node_id,
1552 to,
1553 response_idx,
1554 (answer == AppendEntriesResponseType::OK ? "ACK" : "NACK"));
1555
1556#ifdef CCF_RAFT_TRACING
1557 nlohmann::json j = {};
1558 j["function"] = "send_append_entries_response";
1559 j["packet"] = response;
1560 j["state"] = *state;
1561 COMMITTABLE_INDICES(j["state"], state);
1562 j["to_node_id"] = to;
1564#endif
1565
1566 channels->send_authenticated(
1567 to, ccf::NodeMsgType::consensus_msg, response);
1568 }
1569
// Leader-side handler for an AppendEntriesResponse from `from`. It takes
// state->lock for the whole call.
// Responses from unknown nodes, or received when we are no longer leader,
// are ignored. Otherwise the handler:
// - resets the node's last_ack_timeout;
// - steps down via become_aware_of_new_term on a newer term;
// - drops stale ACKs (older term, or below the node's match_idx);
// - on a NACK, rewinds sent_idx;
// - on an ACK, raises match_idx and calls update_commit().
1570 void recv_append_entries_response(
1571 const ccf::NodeId& from, AppendEntriesResponse r)
1572 {
1573 std::lock_guard<ccf::pal::Mutex> guard(state->lock);
1574
1575 auto node = all_other_nodes.find(from);
1576 if (node == all_other_nodes.end())
1577 {
1578 // Ignore if we don't recognise the node.
1580 "Recv append entries response to {} from {}: unknown node",
1581 state->node_id,
1582 from);
1583 return;
1584 }
1585
1586#ifdef CCF_RAFT_TRACING
1587 nlohmann::json j = {};
1588 j["function"] = "recv_append_entries_response";
1589 j["packet"] = r;
1590 j["state"] = *state;
1591 COMMITTABLE_INDICES(j["state"], state);
1592 j["from_node_id"] = from;
1593 j["match_idx"] = node->second.match_idx;
1594 j["sent_idx"] = node->second.sent_idx;
1596#endif
1597
1598 // Ignore if we're not the leader.
1599 if (state->leadership_state != ccf::kv::LeadershipState::Leader)
1600 {
1602 "Recv {} to {} from {}: no longer leader",
1603 r.msg,
1604 state->node_id,
1605 from);
1606 return;
1607 }
1608
// Any response from a known node while we are leader counts as contact, so
// its ack timer is reset. Presumably this is consumed by the CheckQuorum
// logic; confirm.
1609 using namespace std::chrono_literals;
1610 node->second.last_ack_timeout = 0ms;
1611
1612 if (state->current_view < r.term)
1613 {
1614 // We are behind, update our state.
1616 "Recv {} to {} from {}: more recent term ({} "
1617 "> {})",
1618 r.msg,
1619 state->node_id,
1620 from,
1621 r.term,
1622 state->current_view);
1623 become_aware_of_new_term(r.term);
1624 return;
1625 }
1626 if (state->current_view != r.term)
1627 {
1628 // Stale response, discard if success.
1629 // Otherwise reset sent_idx and try again.
1630 // NB: In NACKs the term may be that of an estimated matching index
1631 // in the log, rather than the current term, so it is correct for it to
1632 // be older in this case.
1633 if (r.success == AppendEntriesResponseType::OK)
1634 {
1636 "Recv {} to {} from {}: stale term ({} != {})",
1637 r.msg,
1638 state->node_id,
1639 from,
1640 r.term,
1641 state->current_view);
1642 return;
1643 }
1644 }
1645 else if (r.last_log_idx < node->second.match_idx)
1646 {
1647 // Response about past indices, discard if success.
1648 // Otherwise reset sent_idx and try again.
1649 // NB: It is correct for this index to move backwards during NACKs
1650 // which iteratively discover the last matching index of divergent logs
1651 // after an election.
1652 if (r.success == AppendEntriesResponseType::OK)
1653 {
1655 "Recv {} to {} from {}: stale idx", r.msg, state->node_id, from);
1656 return;
1657 }
1658 }
1659
1660 // Update next or match for the responding node.
1661 if (r.success == AppendEntriesResponseType::FAIL)
1662 {
1663 // Failed due to log inconsistency. Reset sent_idx, and try again soon.
1665 "Recv {} to {} from {}: failed", r.msg, state->node_id, from);
// Clamp the retry point: never beyond the previous sent_idx, and never
// below the index this node has already confirmed (match_idx).
1666 const auto this_match =
1667 find_highest_possible_match({r.term, r.last_log_idx});
1668 node->second.sent_idx = std::max(
1669 std::min(this_match, node->second.sent_idx), node->second.match_idx);
1670 return;
1671 }
1672
1673 // max(...) because why would we ever want to go backwards on a success
1674 // response?!
1675 node->second.match_idx = std::max(node->second.match_idx, r.last_log_idx);
1676
1678 "Recv {} to {} from {} for index {}: success",
1679 r.msg,
1680 state->node_id,
1681 from,
1682 r.last_log_idx);
1683 update_commit();
1684 }
1685
1686 void send_request_pre_vote(const ccf::NodeId& to)
1687 {
1688 auto last_committable_idx = last_committable_index();
1689 CCF_ASSERT(last_committable_idx >= state->commit_idx, "lci < ci");
1690
1691 RequestPreVote rpv{
1692 .term = state->current_view,
1693 .last_committable_idx = last_committable_idx,
1694 .term_of_last_committable_idx =
1695 get_term_internal(last_committable_idx)};
1696
1697#ifdef CCF_RAFT_TRACING
1698 nlohmann::json j = {};
1699 j["function"] = "send_request_vote";
1700 j["packet"] = rpv;
1701 j["state"] = *state;
1702 COMMITTABLE_INDICES(j["state"], state);
1703 j["to_node_id"] = to;
1705#endif
1706
1707 channels->send_authenticated(to, ccf::NodeMsgType::consensus_msg, rpv);
1708 }
1709
1710 void send_request_vote(const ccf::NodeId& to)
1711 {
1712 auto last_committable_idx = last_committable_index();
1713 CCF_ASSERT(last_committable_idx >= state->commit_idx, "lci < ci");
1714
1715 RequestVote rv{
1716 .term = state->current_view,
1717 .last_committable_idx = last_committable_idx,
1718 .term_of_last_committable_idx =
1719 get_term_internal(last_committable_idx)};
1720
1721#ifdef CCF_RAFT_TRACING
1722 nlohmann::json j = {};
1723 j["function"] = "send_request_vote";
1724 j["packet"] = rv;
1725 j["state"] = *state;
1726 COMMITTABLE_INDICES(j["state"], state);
1727 j["to_node_id"] = to;
1729#endif
1730
1731 channels->send_authenticated(to, ccf::NodeMsgType::consensus_msg, rv);
1732 }
1733
// Shared handler for RequestVote and (translated) RequestPreVote messages.
// "_unsafe": it does not take state->lock itself. Both visible callers,
// recv_request_vote and recv_request_pre_vote, hold it when calling.
// The vote is denied when any of these hold:
// - our term is later than the request's;
// - for a RegularVote only, we already know a leader or voted for another
//   node;
// - the candidate's last committable (term, index) is behind ours, compared
//   term first, then index.
// A request from a later term first moves us to that term
// (become_aware_of_new_term), even for pre-votes.
// Only a granted RegularVote records voted_for, clears leader_id and restarts
// the election timeout; pre-votes leave that state untouched.
// A response is always sent back to `from`.
1734 void recv_request_vote_unsafe(
1735 const ccf::NodeId& from, RequestVote r, ElectionType election_type)
1736 {
1737 // Do not check that from is a known node. It is possible to receive
1738 // RequestVotes from nodes that this node doesn't yet know, just as it
1739 // receives AppendEntries from those nodes. These should be obeyed just
1740 // like any other RequestVote - it is possible that this node is needed to
1741 // produce a primary in the new term, who will then help this node catch
1742 // up.
1743
1744 if (state->current_view > r.term)
1745 {
1746 // Reply false, since our term is later than the received term.
1748 "Recv {} to {} from {}: our term is later ({} > {})",
1749 r.msg,
1750 state->node_id,
1751 from,
1752 state->current_view,
1753 r.term);
1754 send_request_vote_response(from, false, election_type);
1755 return;
1756 }
1757 if (state->current_view < r.term)
1758 {
1760 "Recv {} to {} from {}: their term is later ({} < {})",
1761 r.msg,
1762 state->node_id,
1763 from,
1764 state->current_view,
1765 r.term);
1766
1767 // Even if ElectionType::PreVote, we should still update the term.
1768 // A pre-vote-candidate does not update its term until it becomes a
1769 // candidate. So a pre-vote request from a higher term indicates that we
1770 // should catch up to the term that had a candidate in it.
1771 become_aware_of_new_term(r.term);
1772 }
1773
// Start from "grant" and let each check below veto it. All checks run, so
// every reason for a denial is logged.
1774 bool grant_vote = true;
1775
1776 if ((election_type == ElectionType::RegularVote) && leader_id.has_value())
1777 {
1778 // Reply false, since we already know the leader in the current term.
1780 "Recv {} to {} from {}: leader {} already known in term {}",
1781 r.msg,
1782 state->node_id,
1783 from,
1784 leader_id.value(),
1785 state->current_view);
1786 grant_vote = false;
1787 }
1788
1789 auto voted_for_other =
1790 (voted_for.has_value()) && (voted_for.value() != from);
1791 if ((election_type == ElectionType::RegularVote) && voted_for_other)
1792 {
1793 // Reply false, since we already voted for someone else.
1795 "Recv {} to {} from {}: already voted for {}",
1796 r.msg,
1797 state->node_id,
1798 from,
1799 voted_for.value());
1800 grant_vote = false;
1801 }
1802
1803 // If the candidate's committable log is at least as up-to-date as ours,
1804 // vote yes
1805
1806 const auto last_committable_idx = last_committable_index();
1807 const auto term_of_last_committable_idx =
1808 get_term_internal(last_committable_idx);
1809 const auto log_up_to_date =
1810 (r.term_of_last_committable_idx > term_of_last_committable_idx) ||
1811 ((r.term_of_last_committable_idx == term_of_last_committable_idx) &&
1812 (r.last_committable_idx >= last_committable_idx));
1813 if (!log_up_to_date)
1814 {
1816 "Recv {} to {} from {}: candidate log {}.{} is not up-to-date "
1817 "with ours {}.{}",
1818 r.msg,
1819 state->node_id,
1820 from,
1821 r.term_of_last_committable_idx,
1822 r.last_committable_idx,
1823 term_of_last_committable_idx,
1824 last_committable_idx);
1825 grant_vote = false;
1826 }
1827
1828 if (grant_vote && election_type == ElectionType::RegularVote)
1829 {
1830 // If we grant our vote to a candidate, then an election is in progress
1831 restart_election_timeout();
1832 leader_id.reset();
1833 voted_for = from;
1834 }
1835
1837 "Recv {} to {} from {}: {} vote to candidate at {}.{} with "
1838 "local state at {}.{}",
1839 r.msg,
1840 state->node_id,
1841 from,
1842 grant_vote ? "granted" : "denied",
1843 r.term_of_last_committable_idx,
1844 r.last_committable_idx,
1845 term_of_last_committable_idx,
1846 last_committable_idx);
1847
1848 send_request_vote_response(from, grant_vote, election_type);
1849 }
1850
1851 void recv_request_vote(const ccf::NodeId& from, RequestVote r)
1852 {
1853 std::lock_guard<ccf::pal::Mutex> guard(state->lock);
1854
1855#ifdef CCF_RAFT_TRACING
1856 nlohmann::json j = {};
1857 j["function"] = "recv_request_vote";
1858 j["packet"] = r;
1859 j["state"] = *state;
1860 COMMITTABLE_INDICES(j["state"], state);
1861 j["from_node_id"] = from;
1863#endif
1864
1865 recv_request_vote_unsafe(from, r, ElectionType::RegularVote);
1866 }
1867
// Entry point for a RequestPreVote from `from`. It takes state->lock, copies
// the pre-vote's term and last-committable fields into a RequestVote, and
// handles that via recv_request_vote_unsafe as ElectionType::PreVote. That
// path never records voted_for.
1868 void recv_request_pre_vote(const ccf::NodeId& from, RequestPreVote r)
1869 {
1870 std::lock_guard<ccf::pal::Mutex> guard(state->lock);
1871
1872#ifdef CCF_RAFT_TRACING
1873 nlohmann::json j = {};
1874 j["function"] = "recv_request_vote";
1875 j["packet"] = r;
1876 j["state"] = *state;
1877 COMMITTABLE_INDICES(j["state"], state);
1878 j["from_node_id"] = from;
1880#endif
1881
1882 // A pre-vote is a speculative request vote, so we translate it back to a
1883 // RequestVote to avoid duplicating the logic.
1884 RequestVote rv{
1885 .term = r.term,
1886 .last_committable_idx = r.last_committable_idx,
1887 .term_of_last_committable_idx = r.term_of_last_committable_idx,
1888 };
// NOTE(review): one line here (Doxygen 1889) is missing from this listing.
// Possibly it adjusts rv before dispatch (e.g. its message type, which the
// handler's logs print); confirm against raft.h.
1890 recv_request_vote_unsafe(from, rv, ElectionType::PreVote);
1891 }
1892
// Replies to a vote request from `to` with our current term and whether the
// vote was granted (`answer`). Regular votes are answered with a
// RequestVoteResponse and pre-votes with a RequestPreVoteResponse. Both are
// logged and sent as consensus_msg over the authenticated channel.
1893 void send_request_vote_response(
1894 const ccf::NodeId& to, bool answer, ElectionType election_type)
1895 {
1896 if (election_type == ElectionType::RegularVote)
1897 {
1898 RequestVoteResponse response{
1899 .term = state->current_view, .vote_granted = answer};
1900
1902 "Send {} from {} to {}: {}",
1903 response.msg,
1904 state->node_id,
1905 to,
1906 answer);
1907
1908 channels->send_authenticated(
1909 to, ccf::NodeMsgType::consensus_msg, response);
1910 }
1911 else
1912 {
1913 RequestPreVoteResponse response{
1914 .term = state->current_view, .vote_granted = answer};
1915
1917 "Send {} from {} to {}: {}",
1918 response.msg,
1919 state->node_id,
1920 to,
1921 answer);
1922
1923 channels->send_authenticated(
1924 to, ccf::NodeMsgType::consensus_msg, response);
1925 }
1926 }
1927
// Shared handler for vote and pre-vote responses from `from`. It takes
// state->lock for the whole call.
// The response is ignored when:
// - the sender is unknown;
// - it carries an older term;
// - we are not a (pre-vote) candidate;
// - it does not match our current candidacy type (RegularVote needs
//   Candidate, PreVote needs PreVoteCandidate);
// - the vote was not granted.
// A newer term makes us step down via become_aware_of_new_term. A granted,
// matching vote is tallied with add_vote_for_me(from).
1928 void recv_request_vote_response(
1929 const ccf::NodeId& from,
1930 RequestVoteResponse r,
1931 ElectionType election_type)
1932 {
1933 std::lock_guard<ccf::pal::Mutex> guard(state->lock);
1934
1935#ifdef CCF_RAFT_TRACING
1936 nlohmann::json j = {};
1937 j["function"] = "recv_request_vote_response";
1938 j["packet"] = r;
1939 j["state"] = *state;
1940 COMMITTABLE_INDICES(j["state"], state);
1941 j["from_node_id"] = from;
1943#endif
1944
1945 // Ignore if we don't recognise the node.
1946 auto node = all_other_nodes.find(from);
1947 if (node == all_other_nodes.end())
1948 {
1950 "Recv {} to {} from {}: unknown node", r.msg, state->node_id, from);
1951 return;
1952 }
1953
1954 if (state->current_view < r.term)
1955 {
1957 "Recv {} to {} from {}: their term is more recent "
1958 "({} < {})",
1959 r.msg,
1960 state->node_id,
1961 from,
1962 state->current_view,
1963 r.term);
1964 become_aware_of_new_term(r.term);
1965 return;
1966 }
1967 if (state->current_view != r.term)
1968 {
1969 // Ignore as it is stale.
1971 "Recv request vote response to {} from {}: stale ({} != {})",
1972 state->node_id,
1973 from,
1974 state->current_view,
1975 r.term);
1976 return;
1977 }
1978
// NOTE(review): the log text below reads "from: {}: we aren't a candidate".
// The colon after "from" looks like a typo compared with the sibling messages.
1979 if (
1980 state->leadership_state != ccf::kv::LeadershipState::PreVoteCandidate &&
1981 state->leadership_state != ccf::kv::LeadershipState::Candidate)
1982 {
1984 "Recv {} to {} from: {}: we aren't a candidate",
1985 r.msg,
1986 state->node_id,
1987 from);
1988 return;
1989 }
1990 if (
1991 election_type == ElectionType::RegularVote &&
1992 state->leadership_state != ccf::kv::LeadershipState::Candidate)
1993 {
1994 // Stale message from previous candidacy
1995 // Candidate(T) -> Follower(T) -> PreVoteCandidate(T)
1997 "Recv {} to {} from {}: no longer a candidate in {}",
1998 r.msg,
1999 state->node_id,
2000 from,
2001 r.term);
2002 return;
2003 }
2004 if (
2005 election_type == ElectionType::PreVote &&
2006 state->leadership_state != ccf::kv::LeadershipState::PreVoteCandidate)
2007 {
2008 // To receive a PreVoteResponse, we must have been a PreVoteCandidate in
2009 // that term.
2010 // Since we are a Candidate for term T, we can only have transitioned
2011 // from PreVoteCandidate for term (T-1). Since terms are monotonic this
2012 // is impossible.
2014 "Recv {} to {} from {}: unexpected message in {} when "
2015 "Candidate for {}",
2016 r.msg,
2017 state->node_id,
2018 from,
2019 r.term,
2020 state->current_view);
2021 return;
2022 }
2023
2024 if (!r.vote_granted)
2025 {
2026 // Do nothing.
2028 "Recv request vote response to {} from {}: they voted no",
2029 state->node_id,
2030 from);
2031 return;
2032 }
2033
2035 "Recv request vote response to {} from {}: they voted yes",
2036 state->node_id,
2037 from);
2038
2039 add_vote_for_me(from);
2040 }
2041
2042 void recv_request_vote_response(
2043 const ccf::NodeId& from, RequestVoteResponse r)
2044 {
2045 recv_request_vote_response(from, r, ElectionType::RegularVote);
2046 }
2047
// Handles a RequestPreVoteResponse. It copies its term and vote_granted into
// a RequestVoteResponse and delegates to the shared handler with
// ElectionType::PreVote. That handler takes state->lock.
// NOTE(review): one line between the conversion and the call (Doxygen 2052)
// is missing from this listing. Possibly it adjusts rvr (e.g. its message
// type); confirm against raft.h.
2048 void recv_request_pre_vote_response(
2049 const ccf::NodeId& from, RequestPreVoteResponse r)
2050 {
2051 RequestVoteResponse rvr{.term = r.term, .vote_granted = r.vote_granted};
2053 recv_request_vote_response(from, rvr, ElectionType::PreVote);
2054 }
2055
// Handles a ProposeRequestVote from `from`, the nudge that
// send_propose_request_vote issues when a leader retires. It takes
// state->lock. The node becomes candidate immediately only if it is not
// retired-committed, is ticking, and the proposal is for its current term.
// Otherwise the proposal is logged and ignored.
2056 void recv_propose_request_vote(
2057 const ccf::NodeId& from, ProposeRequestVote r)
2058 {
2059 std::lock_guard<ccf::pal::Mutex> guard(state->lock);
2060
2061#ifdef CCF_RAFT_TRACING
2062 nlohmann::json j = {};
2063 j["function"] = "recv_propose_request_vote";
2064 j["packet"] = r;
2065 j["state"] = *state;
2066 COMMITTABLE_INDICES(j["state"], state);
2067 j["from_node_id"] = from;
2069#endif
2070 if (!is_retired_committed() && ticking && r.term == state->current_view)
2071 {
2073 "Becoming candidate early due to propose request vote from {}", from);
2074 become_candidate();
2075 }
2076 else
2077 {
2078 RAFT_INFO_FMT("Ignoring propose request vote from {}", from);
2079 }
2080 }
2081
2082 void restart_election_timeout()
2083 {
2084 // Randomise timeout_elapsed to get a random election timeout
2085 // between 0.5x and 1x the configured election timeout.
2086 timeout_elapsed = std::chrono::milliseconds(distrib(rand));
2087 }
2088
2089 void reset_votes_for_me()
2090 {
2091 votes_for_me.clear();
2092 for (auto const& conf : configurations)
2093 {
2094 votes_for_me[conf.idx].quorum = get_quorum(conf.nodes.size());
2095 votes_for_me[conf.idx].votes.clear();
2096 }
2097 }
2098
// Starts a pre-vote round without changing state->current_view. This
// contrasts with become_candidate, which increments it.
// It does nothing if no configuration is known. Otherwise it:
// - switches to PreVoteCandidate and forgets the leader;
// - resets vote tallies and restarts the election timeout;
// - counts its own vote;
// - sends a RequestPreVote to every other node in the active configurations.
2099 void become_pre_vote_candidate()
2100 {
2101 if (configurations.empty())
2102 {
2104 "Not becoming pre-vote candidate {} due to lack of a configuration.",
2105 state->node_id);
2106 return;
2107 }
2108
2109 state->leadership_state = ccf::kv::LeadershipState::PreVoteCandidate;
2110 leader_id.reset();
2111
2112 reset_votes_for_me();
2113 restart_election_timeout();
2114
2116 "Becoming pre-vote candidate {}: {}",
2117 state->node_id,
2118 state->current_view);
2119
2120#ifdef CCF_RAFT_TRACING
2121 nlohmann::json j = {};
2122 j["function"] = "become_pre_vote_candidate";
2123 j["state"] = *state;
2124 COMMITTABLE_INDICES(j["state"], state);
2125 j["configurations"] = configurations;
2127#endif
2128
2129 add_vote_for_me(state->node_id);
2130
2131 // Request votes only go to nodes in configurations, since only
2132 // their votes can be tallied towards an election quorum.
2133 for (auto const& node_id : other_nodes_in_active_configs())
2134 {
2135 // ccfraft!RequestVote
2136 send_request_pre_vote(node_id);
2137 }
2138 }
2139
2140 // ccfraft!Timeout
// Starts a real election in a new term. It does nothing if no configuration
// is known. Otherwise it:
// - switches to Candidate, forgets the leader and votes for itself;
// - resets vote tallies and increments state->current_view;
// - restarts the election timeout and resets per-node ack timeouts;
// - tallies its own vote;
// - sends a RequestVote to every other node in the active configurations.
// It is also called directly from recv_propose_request_vote.
2141 void become_candidate()
2142 {
2143 if (configurations.empty())
2144 {
2145 // ccfraft!Timeout:
2146 // /\ \E c \in DOMAIN configurations[i] :
2147 // /\ i \in configurations[i][c]
2149 "Not becoming candidate {} due to lack of a configuration.",
2150 state->node_id);
2151 return;
2152 }
2153
2154 state->leadership_state = ccf::kv::LeadershipState::Candidate;
2155 leader_id.reset();
2156
2157 voted_for = state->node_id;
2158 reset_votes_for_me();
2159 state->current_view++;
2160
2161 restart_election_timeout();
2162 reset_last_ack_timeouts();
2163
2165 "Becoming candidate {}: {}", state->node_id, state->current_view);
2166
2167#ifdef CCF_RAFT_TRACING
2168 nlohmann::json j = {};
2169 j["function"] = "become_candidate";
2170 j["state"] = *state;
2171 COMMITTABLE_INDICES(j["state"], state);
2172 j["configurations"] = configurations;
2174#endif
2175
2176 add_vote_for_me(state->node_id);
2177
2178 // Request votes only go to nodes in configurations, since only
2179 // their votes can be tallied towards an election quorum.
2180 for (auto const& node_id : other_nodes_in_active_configs())
2181 {
2182 // ccfraft!RequestVote
2183 send_request_vote(node_id);
2184 }
2185 }
2186
// Takes leadership in the current term. It returns immediately if this node
// is retired-committed. Otherwise it:
// - rolls back anything after the last committable index (unless
//   commit_idx is 0, where it only tells the store the current term);
// - becomes Leader, records itself as leader and sets should_sign
//   (presumably so a signature is emitted soon; confirm);
// - zeroes timeout_elapsed and resets per-node ack timeouts;
// - calls update_commit() at once if there are no other nodes in the active
//   configurations;
// - resets each known node's match_idx to 0 and sent_idx to last_idx, and
//   sends each an AppendEntries starting at last_idx + 1;
// - finally runs retired_node_cleanup if one is set.
// The boolean parameter is unnamed and unused in this body.
2187 void become_leader(bool /*force_become_leader*/ = false)
2188 {
2189 if (is_retired_committed())
2190 {
2191 return;
2192 }
2193
2194 const auto election_index = last_committable_index();
2195
2197 "Election index is {} in term {}", election_index, state->current_view);
2198 // Discard any un-committable updates we may hold,
2199 // since we have no signature for them. Except at startup,
2200 // where we do not want to roll back the genesis transaction.
2201 if (state->commit_idx > 0)
2202 {
2203 rollback(election_index);
2204 }
2205 else
2206 {
2207 // but we still want the KV to know which term we're in
2208 store->initialise_term(state->current_view);
2209 }
2210
2211 state->leadership_state = ccf::kv::LeadershipState::Leader;
2212 leader_id = state->node_id;
2213 should_sign = true;
2214
2215 using namespace std::chrono_literals;
2216 timeout_elapsed = 0ms;
2217
2218 reset_last_ack_timeouts();
2219
2221 "Becoming leader {}: {}", state->node_id, state->current_view);
2222
2223#ifdef CCF_RAFT_TRACING
2224 nlohmann::json j = {};
2225 j["function"] = "become_leader";
2226 j["state"] = *state;
2227 COMMITTABLE_INDICES(j["state"], state);
2228 j["configurations"] = configurations;
2230#endif
2231
2232 // Try to advance commit at once if there are no other nodes.
2233 if (other_nodes_in_active_configs().size() == 0)
2234 {
2235 update_commit();
2236 }
2237
2238 // Reset next, match, and sent indices for all nodes.
2239 auto next = state->last_idx + 1;
2240
2241 for (auto& node : all_other_nodes)
2242 {
2243 node.second.match_idx = 0;
2244 node.second.sent_idx = next - 1;
2245
2246 // Send an empty append_entries to all nodes.
2247 send_append_entries(node.first, next);
2248 }
2249
2250 if (retired_node_cleanup)
2251 {
2252 retired_node_cleanup->cleanup();
2253 }
2254 }
2255
2256 public:
2257 // Called when a replica becomes follower in the same term, e.g. when the
2258 // primary node has not received a majority of acks (CheckQuorum)
// It forgets the current leader, restarts the election timeout, resets
// per-node ack timeouts and sets leadership_state to Follower. It does not
// touch current_view, voted_for or membership_state. It is also the last step
// of become_aware_of_new_term.
// NOTE(review): the signature line (Doxygen 2259) is missing from this
// listing. Callers invoke it as become_follower(); confirm against raft.h.
2260 {
2261 leader_id.reset();
2262 restart_election_timeout();
2263 reset_last_ack_timeouts();
2264
2265 state->leadership_state = ccf::kv::LeadershipState::Follower;
2267 "Becoming follower {}: {}.{}",
2268 state->node_id,
2269 state->current_view,
2270 state->commit_idx);
2271
2272#ifdef CCF_RAFT_TRACING
2273 nlohmann::json j = {};
2274 j["function"] = "become_follower";
2275 j["state"] = *state;
2276 COMMITTABLE_INDICES(j["state"], state);
2277 j["configurations"] = configurations;
2279#endif
2280 }
2281
2282 // Called when a replica becomes aware of the existence of a new term
2283 // If retired already, state remains unchanged, but the replica otherwise
2284 // becomes a follower in the new term.
// It adopts `term` as current_view, clears voted_for, resets vote tallies,
// calls become_follower() and marks the node as a new follower. That mark
// allows recv_append_entries to roll back one conflicting suffix.
// NOTE(review): the visible body has no retirement check. It always calls
// become_follower(), which sets leadership_state to Follower, and only
// membership_state is left alone. Confirm what "state remains unchanged"
// refers to.
// NOTE(review): the signature line (Doxygen 2285) is missing from this
// listing. Callers pass a single term argument named `term`.
2286 {
2287 RAFT_DEBUG_FMT("Becoming aware of new term {}", term);
2288
2289 state->current_view = term;
2290 voted_for.reset();
2291 reset_votes_for_me();
2292 become_follower();
2293 is_new_follower = true;
2294 }
2295
2296 private:
2297 void send_propose_request_vote()
2298 {
2299 ProposeRequestVote prv{.term = state->current_view};
2300
2301 std::optional<ccf::NodeId> successor = std::nullopt;
2302 Index max_match_idx = 0;
2303 ccf::kv::ReconfigurationId reconf_id_of_max_match = 0;
2304
2305 // Pick the node that has the highest match_idx, and break
2306 // ties by looking at the highest reconfiguration id they are
2307 // part of. This can lead to nudging a node that is
2308 // about to retire too, but that node will then nudge
2309 // a successor, and that seems preferable to nudging a node that
2310 // risks not being eligible if reconfiguration id is prioritised.
2311 // Alternatively, we could pick the node with the highest match idx
2312 // in the latest config, provided that match idx at least as high as a
2313 // majority. That would make them both eligible and unlikely to retire
2314 // soon.
2315 for (auto& [node, node_state] : all_other_nodes)
2316 {
2317 if (node_state.match_idx >= max_match_idx)
2318 {
2319 ccf::kv::ReconfigurationId latest_reconf_id = 0;
2320 auto conf = configurations.rbegin();
2321 while (conf != configurations.rend())
2322 {
2323 if (conf->nodes.find(node) != conf->nodes.end())
2324 {
2325 latest_reconf_id = conf->idx;
2326 break;
2327 }
2328 conf++;
2329 }
2330 if (!(node_state.match_idx == max_match_idx &&
2331 latest_reconf_id < reconf_id_of_max_match))
2332 {
2333 reconf_id_of_max_match = latest_reconf_id;
2334 successor = node;
2335 max_match_idx = node_state.match_idx;
2336 }
2337 }
2338 }
2339 if (successor.has_value())
2340 {
2341 RAFT_INFO_FMT("Proposing that {} becomes candidate", successor.value());
2342 channels->send_authenticated(
2343 successor.value(), ccf::NodeMsgType::consensus_msg, prv);
2344 }
2345 }
2346 void become_retired(Index idx, ccf::kv::RetirementPhase phase)
2347 {
2349 "Becoming retired, phase {} (leadership {}): {}: {} at {}",
2350 phase,
2351 state->leadership_state,
2352 state->node_id,
2353 state->current_view,
2354 idx);
2355
2357 {
2359 !state->retirement_idx.has_value(),
2360 "retirement_idx already set to {}",
2361 // NOLINTNEXTLINE(bugprone-unchecked-optional-access)
2362 state->retirement_idx.value());
2363 state->retirement_idx = idx;
2364 RAFT_INFO_FMT("Node retiring at {}", idx);
2365 }
2366 else if (phase == ccf::kv::RetirementPhase::Signed)
2367 {
2368 assert(state->retirement_idx.has_value());
2370 // NOLINTNEXTLINE(bugprone-unchecked-optional-access)
2371 idx >= state->retirement_idx.value(),
2372 "Index {} unexpectedly lower than retirement_idx {}",
2373 idx,
2374 // NOLINTNEXTLINE(bugprone-unchecked-optional-access)
2375 state->retirement_idx.value());
2376 state->retirement_committable_idx = idx;
2377 RAFT_INFO_FMT("Node retirement committable at {}", idx);
2378 }
2380 {
2381 if (state->leadership_state == ccf::kv::LeadershipState::Leader)
2382 {
2383 send_propose_request_vote();
2384 }
2385
2386 leader_id.reset();
2387 state->leadership_state = ccf::kv::LeadershipState::None;
2388 }
2389
2390 state->membership_state = ccf::kv::MembershipState::Retired;
2391 state->retirement_phase = phase;
2392 }
2393
2394 void add_vote_for_me(const ccf::NodeId& from)
2395 {
2396 if (configurations.empty())
2397 {
2399 "Not voting for myself {} due to lack of a configuration.",
2400 state->node_id);
2401 return;
2402 }
2403
2404 // Add vote for from node in each configuration where it is present
2405 for (auto const& conf : configurations)
2406 {
2407 auto const& nodes = conf.nodes;
2408 if (nodes.find(from) == nodes.end())
2409 {
2410 // from node is no longer in any active configuration.
2411 continue;
2412 }
2413
2414 votes_for_me[conf.idx].votes.insert(from);
2416 "Node {} voted for {} in configuration {} with quorum {}",
2417 from,
2418 state->node_id,
2419 conf.idx,
2420 votes_for_me[conf.idx].quorum);
2421 }
2422
2423 // We need a quorum of votes in _all_ configurations
2424 bool is_elected = true;
2425 for (auto const& v : votes_for_me)
2426 {
2427 auto const& quorum = v.second.quorum;
2428 auto const& votes = v.second.votes;
2429
2430 if (votes.size() < quorum)
2431 {
2432 is_elected = false;
2433 break;
2434 }
2435 }
2436
2437 if (is_elected)
2438 {
2439 switch (state->leadership_state)
2440 {
2442 become_candidate();
2443 break;
2445 become_leader();
2446 break;
2450 throw std::logic_error(
2451 "add_vote_for_me() called while not a pre-vote candidate or "
2452 "candidate");
2453 }
2454 }
2455 }
2456
2457 // If there exists some committable idx in the current term such that idx >
2458 // commit_idx and a majority of nodes have replicated it, commit to that
2459 // idx.
2460 void update_commit()
2461 {
2462 if (state->leadership_state != ccf::kv::LeadershipState::Leader)
2463 {
2464 throw std::logic_error(
2465 "update_commit() must only be called while this node is leader");
2466 }
2467
2468 std::optional<Index> new_agreement_index = std::nullopt;
2469
2470 // Obtain CFT watermarks
2471 for (auto const& c : configurations)
2472 {
2473 // The majority must be checked separately for each active
2474 // configuration.
2475 std::vector<Index> match;
2476 match.reserve(c.nodes.size());
2477
2478 for (const auto& node : c.nodes)
2479 {
2480 if (node.first == state->node_id)
2481 {
2482 match.push_back(state->last_idx);
2483 }
2484 else
2485 {
2486 match.push_back(all_other_nodes.at(node.first).match_idx);
2487 }
2488 }
2489
2490 sort(match.begin(), match.end());
2491 auto confirmed = match.at((match.size() - 1) / 2);
2492
2493 if (
2494 !new_agreement_index.has_value() ||
2495 confirmed < new_agreement_index.value())
2496 {
2497 new_agreement_index = confirmed;
2498 }
2499 }
2500
2501 if (new_agreement_index.has_value())
2502 {
2503 if (new_agreement_index.value() > state->last_idx)
2504 {
2505 throw std::logic_error(
2506 "Followers appear to have later match indices than leader");
2507 }
2508
2509 const auto new_commit_idx =
2510 find_highest_possible_committable_index(new_agreement_index.value());
2511
2512 if (new_commit_idx.has_value())
2513 {
2515 "In update_commit, new_commit_idx: {}, "
2516 "last_idx: {}",
2517 new_commit_idx.value(),
2518 state->last_idx);
2519
2520 const auto term_of_new = get_term_internal(new_commit_idx.value());
2521 if (term_of_new == state->current_view)
2522 {
2523 commit(new_commit_idx.value());
2524 }
2525 else
2526 {
2528 "Ack quorum at {} resulted in proposed commit index {}, which "
2529 "is in term {}. Waiting for agreement on committable entry in "
2530 "current term {} to update commit",
2531 new_agreement_index.value(),
2532 new_commit_idx.value(),
2533 term_of_new,
2534 state->current_view);
2535 }
2536 }
2537 }
2538 }
2539
2540 // Commits at the highest committable index which is not greater than the
2541 // given idx.
2542 void commit_if_possible(Index idx)
2543 {
2545 "Commit if possible {} (ci: {}) (ti {})",
2546 idx,
2547 state->commit_idx,
2548 get_term_internal(idx));
2549 if (
2550 (idx > state->commit_idx) &&
2551 (get_term_internal(idx) <= state->current_view))
2552 {
2553 const auto highest_committable =
2554 find_highest_possible_committable_index(idx);
2555 if (highest_committable.has_value())
2556 {
2557 commit(highest_committable.value());
2558 }
2559 }
2560 }
2561
2562 size_t get_quorum(size_t n) const
2563 {
2564 return (n / 2) + 1;
2565 }
2566
2567 void commit(Index idx)
2568 {
2569 if (idx > state->last_idx)
2570 {
2571 throw std::logic_error(fmt::format(
2572 "Tried to commit {} but last_idx is {}", idx, state->last_idx));
2573 }
2574
2575 RAFT_DEBUG_FMT("Starting commit");
2576
2577 // This could happen if a follower becomes the leader when it
2578 // has committed fewer log entries, although it has them available.
2579 if (idx <= state->commit_idx)
2580 {
2581 return;
2582 }
2583
2584#ifdef CCF_RAFT_TRACING
2585 nlohmann::json j = {};
2586 j["function"] = "commit";
2587 j["args"] = nlohmann::json::object();
2588 j["args"]["idx"] = idx;
2589 j["state"] = *state;
2590 COMMITTABLE_INDICES(j["state"], state);
2591 j["configurations"] = configurations;
2593#endif
2594
2595 compact_committable_indices(idx);
2596
2597 state->commit_idx = idx;
2598 if (
2599 is_retired() &&
2600 state->retirement_phase == ccf::kv::RetirementPhase::Signed &&
2601 state->retirement_committable_idx.has_value())
2602 {
2603 const auto retirement_committable =
2604 state // NOLINT(bugprone-unchecked-optional-access)
2605 ->retirement_committable_idx.value();
2606 if (idx >= retirement_committable)
2607 {
2608 become_retired(idx, ccf::kv::RetirementPhase::Completed);
2609 }
2610 }
2611
2612 RAFT_DEBUG_FMT("Compacting...");
2613 store->compact(idx);
2614 ledger->commit(idx);
2615
2616 if (commit_callbacks != nullptr)
2617 {
2618 const auto term = get_term_internal(idx);
2619 commit_callbacks->trigger_callbacks({term, idx}, state->view_history);
2620 }
2621
2622 RAFT_DEBUG_FMT("Commit on {}: {}", state->node_id, idx);
2623
2624 // Examine each configuration that is followed by a globally committed
2625 // configuration.
2626 bool changed = false;
2627
2628 while (true)
2629 {
2630 auto conf = configurations.begin();
2631 if (conf == configurations.end())
2632 {
2633 break;
2634 }
2635
2636 auto next = std::next(conf);
2637 if (next == configurations.end())
2638 {
2639 break;
2640 }
2641
2642 if (idx < next->idx)
2643 {
2644 break;
2645 }
2646
2648 "Configurations: discard committed configuration at {}", conf->idx);
2649 configurations.pop_front();
2650 changed = true;
2651 }
2652
2653 if (changed)
2654 {
2655 create_and_remove_node_state();
2656 if (retired_node_cleanup && is_primary())
2657 {
2658 retired_node_cleanup->cleanup();
2659 }
2660 }
2661 }
2662
2663 bool is_self_in_latest_config()
2664 {
2665 bool present = false;
2666 if (!configurations.empty())
2667 {
2668 auto current_nodes = configurations.back().nodes;
2669 present = current_nodes.find(state->node_id) != current_nodes.end();
2670 }
2671 return present;
2672 }
2673
2674 void start_ticking_if_necessary()
2675 {
2676 if (!ticking && is_self_in_latest_config())
2677 {
2678 start_ticking();
2679 }
2680 }
2681
2682 public:
2683 void rollback(Index idx)
2684 {
2685 if (idx < state->commit_idx)
2686 {
2688 "Asked to rollback to idx:{} but committed to commit_idx:{} - "
2689 "ignoring rollback request",
2690 idx,
2691 state->commit_idx);
2692 return;
2693 }
2694
2695 store->rollback({get_term_internal(idx), idx}, state->current_view);
2696
2697 RAFT_DEBUG_FMT("Setting term in store to: {}", state->current_view);
2698 ledger->truncate(idx);
2699 state->last_idx = idx;
2700 RAFT_DEBUG_FMT("Rolled back at {}", idx);
2701
2702 state->view_history.rollback(idx);
2703
2704 while (!state->committable_indices.empty() &&
2705 (state->committable_indices.back() > idx))
2706 {
2707 state->committable_indices.pop_back();
2708 }
2709
2710 if (
2711 state->membership_state == ccf::kv::MembershipState::Retired &&
2712 state->retirement_phase == ccf::kv::RetirementPhase::Signed)
2713 {
2714 assert(state->retirement_committable_idx.has_value());
2715 if (state->retirement_committable_idx.has_value())
2716 {
2717 const auto retirement_committable =
2718 state // NOLINT(bugprone-unchecked-optional-access)
2719 ->retirement_committable_idx.value();
2720 if (retirement_committable > idx)
2721 {
2722 state->retirement_committable_idx = std::nullopt;
2723 state->retirement_phase = ccf::kv::RetirementPhase::Ordered;
2724 }
2725 }
2726 }
2727
2728 if (
2729 state->membership_state == ccf::kv::MembershipState::Retired &&
2730 state->retirement_phase == ccf::kv::RetirementPhase::Ordered)
2731 {
2732 assert(state->retirement_idx.has_value());
2733 if (state->retirement_idx.has_value())
2734 {
2735 const auto retirement =
2736 state // NOLINT(bugprone-unchecked-optional-access)
2737 ->retirement_idx.value();
2738 if (retirement > idx)
2739 {
2740 state->retirement_idx = std::nullopt;
2741 state->retirement_phase = std::nullopt;
2742 state->membership_state = ccf::kv::MembershipState::Active;
2743 RAFT_DEBUG_FMT("Becoming Active after rollback");
2744 }
2745 }
2746 }
2747
2748 // Rollback configurations.
2749 bool changed = false;
2750
2751 while (!configurations.empty() && (configurations.back().idx > idx))
2752 {
2754 "Configurations: rollback configuration at {}",
2755 configurations.back().idx);
2756 configurations.pop_back();
2757 changed = true;
2758 }
2759
2760 if (changed)
2761 {
2762 create_and_remove_node_state();
2763 }
2764 }
2765
2766 nlohmann::json get_state_representation() const
2767 {
2768 return *state;
2769 }
2770
2771 void nominate_successor() override
2772 {
2773 if (state->leadership_state != ccf::kv::LeadershipState::Leader)
2774 {
2776 "Not proposing request vote from {} since not leader",
2777 state->node_id);
2778 return;
2779 }
2780
2781 LOG_INFO_FMT("Nominating successor for {}", state->node_id);
2782
2783#ifdef CCF_RAFT_TRACING
2784 nlohmann::json j = {};
2785 j["function"] = "step_down_and_nominate_successor";
2786 j["state"] = *state;
2787 COMMITTABLE_INDICES(j["state"], state);
2788 j["configurations"] = configurations;
2790#endif
2791
2792 send_propose_request_vote();
2793 }
2794
2795 private:
2796 void create_and_remove_node_state()
2797 {
2798 // Find all nodes present in any active configuration.
2799 Configuration::Nodes active_nodes;
2800
2801 for (auto const& conf : configurations)
2802 {
2803 for (auto const& node : conf.nodes)
2804 {
2805 active_nodes.emplace(node.first, node.second);
2806 }
2807 }
2808
2809 // Add all active nodes that are not already present in the node state.
2810 for (auto node_info : active_nodes)
2811 {
2812 if (node_info.first == state->node_id)
2813 {
2814 continue;
2815 }
2816
2817 if (all_other_nodes.find(node_info.first) == all_other_nodes.end())
2818 {
2819 if (!channels->have_channel(node_info.first))
2820 {
2822 "Configurations: create node channel with {}", node_info.first);
2823
2824 channels->associate_node_address(
2825 node_info.first,
2826 node_info.second.hostname,
2827 node_info.second.port);
2828 }
2829
2830 // A new node is sent only future entries initially. If it does not
2831 // have prior data, it will communicate that back to the leader.
2832 auto index = state->last_idx + 1;
2833 all_other_nodes.try_emplace(
2834 node_info.first, node_info.second, index, 0);
2835
2836 if (state->leadership_state == ccf::kv::LeadershipState::Leader)
2837 {
2838 send_append_entries(node_info.first, index);
2839 }
2840
2842 "Added raft node {} ({}:{})",
2843 node_info.first,
2844 node_info.second.hostname,
2845 node_info.second.port);
2846 }
2847 }
2848 }
2849 };
2850}
#define CCF_ASSERT_FMT(expr,...)
Definition ccf_assert.h:10
#define CCF_ASSERT(expr, msg)
Definition ccf_assert.h:14
Definition raft.h:98
Term get_view() override
Definition raft.h:477
ccf::NodeId id() override
Definition raft.h:259
std::shared_ptr< ccf::NodeToNode > channels
Definition raft.h:211
bool is_retired_completed() const
Definition raft.h:331
std::set< ccf::NodeId > other_nodes_in_active_configs() const
Definition raft.h:512
std::vector< Index > get_view_history_since(Index idx) override
Definition raft.h:502
bool can_replicate() override
Definition raft.h:274
bool replicate(const ccf::kv::BatchVector &entries, Term term) override
Definition raft.h:628
void add_configuration(Index idx, const ccf::kv::Configuration::Nodes &conf) override
Definition raft.h:530
bool is_backup() override
Definition raft.h:310
nlohmann::json get_state_representation() const
Definition raft.h:2766
void become_follower()
Definition raft.h:2259
void compact_committable_indices(Index idx)
Definition raft.h:390
bool is_active() const
Definition raft.h:315
void enable_all_domains() override
Definition raft.h:399
std::optional< Index > find_highest_possible_committable_index(Index idx) const
Definition raft.h:374
void start_ticking()
Definition raft.h:569
std::vector< Index > get_view_history(Index idx) override
Definition raft.h:496
void recv_message(const ccf::NodeId &from, const uint8_t *data, size_t size) override
Definition raft.h:742
void nominate_successor() override
Definition raft.h:2771
void init_as_backup(Index index, Term term, const std::vector< Index > &term_history, Index recovery_start_index=0) override
Definition raft.h:446
Index last_committable_index() const
Definition raft.h:365
void rollback(Index idx)
Definition raft.h:2683
bool is_candidate() override
Definition raft.h:269
bool is_primary() override
Definition raft.h:264
~Aft() override=default
std::pair< Term, Index > get_committed_txid() override
Definition raft.h:483
void become_aware_of_new_term(Term term)
Definition raft.h:2285
ccf::kv::ConsensusDetails get_details() override
Definition raft.h:602
Configuration::Nodes get_latest_configuration() override
Definition raft.h:596
bool is_retired_committed() const
Definition raft.h:325
Term get_view(Index idx) override
Definition raft.h:490
void force_become_primary() override
Definition raft.h:407
bool is_at_max_capacity() override
Definition raft.h:285
static constexpr size_t append_entries_size_limit
Definition raft.h:209
Consensus::SignatureDisposition get_signature_disposition() override
Definition raft.h:296
Aft(const ccf::consensus::Configuration &settings_, std::unique_ptr< Store > store_, std::unique_ptr< LedgerProxy > ledger_, std::shared_ptr< ccf::NodeToNode > channels_, std::shared_ptr< aft::State > state_, std::shared_ptr< ccf::NodeClient > rpc_request_context_, std::shared_ptr< ccf::CommitCallbackSubsystem > commit_callbacks_subsystem_=nullptr, bool public_only_=false)
Definition raft.h:213
Index get_committed_seqno() override
Definition raft.h:471
bool is_retired() const
Definition raft.h:320
void force_become_primary(Index index, Term term, const std::vector< Index > &terms, Index commit_idx_) override
Definition raft.h:422
Index get_last_idx()
Definition raft.h:466
void set_retired_committed(ccf::SeqNo seqno, const std::vector< ccf::kv::NodeId > &node_ids) override
Definition raft.h:337
void periodic(std::chrono::milliseconds elapsed) override
Definition raft.h:839
void reset_last_ack_timeouts()
Definition raft.h:577
std::unique_ptr< LedgerProxy > ledger
Definition raft.h:210
std::optional< ccf::NodeId > primary() override
Definition raft.h:254
Configuration::Nodes get_latest_configuration_unsafe() const override
Definition raft.h:586
static constexpr ccf::View InvalidView
Definition state.h:24
Definition node_to_node.h:22
NodeId from
Definition node_to_node.h:24
Definition kv_types.h:367
Definition serialized.h:18
const char * what() const noexcept override
Definition serialized.h:25
#define LOG_INFO_FMT
Definition internal_logger.h:15
#define LOG_DEBUG_FMT
Definition internal_logger.h:14
#define LOG_FAIL_FMT
Definition internal_logger.h:16
Definition state.h:17
ElectionType
Definition raft_types.h:185
@ PreVote
Definition raft_types.h:186
@ RegularVote
Definition raft_types.h:187
AppendEntriesResponseType
Definition raft_types.h:154
uint64_t Term
Definition raft_types.h:20
@ raft_append_entries_response
Definition raft_types.h:93
@ raft_request_pre_vote_response
Definition raft_types.h:99
@ raft_request_vote
Definition raft_types.h:95
@ raft_request_pre_vote
Definition raft_types.h:98
@ raft_request_vote_response
Definition raft_types.h:96
@ raft_append_entries
Definition raft_types.h:92
@ raft_propose_request_vote
Definition raft_types.h:97
@ raft_append_entries_signed_response
Definition raft_types.h:94
uint64_t Index
Definition raft_types.h:19
std::vector< std::tuple< Version, std::shared_ptr< std::vector< uint8_t > >, bool, std::shared_ptr< ConsensusHookPtrs > > > BatchVector
Definition kv_types.h:209
uint64_t Version
Definition version.h:10
RetirementPhase
Definition kv_types.h:136
uint64_t ReconfigurationId
Definition kv_types.h:49
ApplyResult
Definition kv_types.h:305
@ PASS_NONCES
Definition kv_types.h:310
@ PASS_SIGNATURE
Definition kv_types.h:307
@ PASS_BACKUP_SIGNATURE
Definition kv_types.h:308
@ PASS_BACKUP_SIGNATURE_SEND_ACK
Definition kv_types.h:309
@ FAIL
Definition kv_types.h:314
@ PASS
Definition kv_types.h:306
@ PASS_APPLY
Definition kv_types.h:313
@ PASS_ENCRYPTED_PAST_LEDGER_SECRET
Definition kv_types.h:312
@ PASS_NEW_VIEW
Definition kv_types.h:311
Definition app_interface.h:13
@ ONE_TRANSACTION
Definition reconfiguration_type.h:11
constexpr View VIEW_UNKNOWN
Definition tx_id.h:26
@ consensus_msg
Definition node_types.h:23
uint64_t SeqNo
Definition tx_id.h:36
Definition dl_list.h:9
STL namespace.
#define LOG_ROLLBACK_INFO_FMT
Definition raft.h:90
#define RAFT_TRACE_FMT
Definition raft.h:59
#define RAFT_TRACE_JSON_OUT(json_object)
Definition raft.h:65
#define RAFT_FAIL_FMT
Definition raft.h:62
#define RAFT_INFO_FMT
Definition raft.h:61
#define RAFT_DEBUG_FMT
Definition raft.h:60
Definition raft_types.h:166
Definition raft_types.h:128
Definition raft_types.h:244
Term term
Definition raft_types.h:247
Definition raft_types.h:233
Definition raft_types.h:209
Definition raft_types.h:222
Definition raft_types.h:197
Definition tx_id.h:44
SeqNo seqno
Definition tx_id.h:46
View view
Definition tx_id.h:45
Definition consensus_config.h:11
Definition kv_types.h:54
Definition kv_types.h:52
std::map< NodeId, NodeInfo > Nodes
Definition kv_types.h:71
Nodes nodes
Definition kv_types.h:74
Definition kv_types.h:154
bool ticking
Definition kv_types.h:171
std::optional< ccf::ReconfigurationType > reconfiguration_type
Definition kv_types.h:168
std::optional< RetirementPhase > retirement_phase
Definition kv_types.h:165
ccf::View current_view
Definition kv_types.h:170
std::optional< LeadershipState > leadership_state
Definition kv_types.h:164
MembershipState membership_state
Definition kv_types.h:163
std::unordered_map< ccf::NodeId, Ack > acks
Definition kv_types.h:162
std::optional< ccf::NodeId > primary_id
Definition kv_types.h:169
std::vector< Configuration > configs
Definition kv_types.h:161