111 std::chrono::milliseconds last_ack_timeout{0};
113 NodeState() =
default;
118 Index match_idx_ = 0) :
119 node_info(std::move(node_info_)),
121 match_idx(match_idx_),
127 std::unique_ptr<Store> store;
130 std::optional<ccf::NodeId> voted_for = std::nullopt;
131 std::optional<ccf::NodeId> leader_id = std::nullopt;
136 std::unordered_set<ccf::NodeId> votes;
139 std::map<Index, Votes> votes_for_me;
141 std::chrono::milliseconds timeout_elapsed;
152 bool is_new_follower =
false;
159 bool should_sign =
false;
161 std::shared_ptr<aft::State> state;
164 std::chrono::milliseconds request_timeout;
165 std::chrono::milliseconds election_timeout;
166 size_t max_uncommitted_tx_count;
167 bool ticking =
false;
170 std::list<Configuration> configurations;
178 std::unordered_map<ccf::NodeId, NodeState> all_other_nodes;
179 std::unordered_map<ccf::NodeId, ccf::SeqNo> retired_nodes;
182 std::shared_ptr<ccf::NodeClient> node_client;
185 std::unique_ptr<ccf::RetiredNodeCleanup> retired_node_cleanup;
187 std::shared_ptr<ccf::CommitCallbackSubsystem> commit_callbacks;
189 size_t entry_size_not_limited = 0;
190 size_t entry_count = 0;
191 Index entries_batch_size = 20;
192 static constexpr int batch_window_size = 100;
193 int batch_window_sum = 0;
197 bool public_only =
false;
200 std::uniform_int_distribution<int> distrib;
201 std::default_random_engine rand;
206 static constexpr size_t max_terms_per_append_entries = 1;
215 std::unique_ptr<Store> store_,
216 std::unique_ptr<LedgerProxy> ledger_,
217 std::shared_ptr<ccf::NodeToNode> channels_,
218 std::shared_ptr<aft::State> state_,
219 std::shared_ptr<ccf::NodeClient> rpc_request_context_,
220 std::shared_ptr<ccf::CommitCallbackSubsystem>
221 commit_callbacks_subsystem_ =
nullptr,
222 bool public_only_ =
false) :
223 store(
std::move(store_)),
227 state(
std::move(state_)),
229 request_timeout(settings_.message_timeout),
230 election_timeout(settings_.election_timeout),
231 max_uncommitted_tx_count(settings_.max_uncommitted_tx_count),
233 node_client(
std::move(rpc_request_context_)),
234 retired_node_cleanup(
235 std::make_unique<
ccf::RetiredNodeCleanup>(node_client)),
236 commit_callbacks(
std::move(commit_callbacks_subsystem_)),
238 public_only(public_only_),
240 distrib(0, (int)election_timeout.count() / 2),
241 rand((int)(uintptr_t)this),
246 if (commit_callbacks !=
nullptr)
248 commit_callbacks->set_consensus(
this);
261 return state->node_id;
276 std::unique_lock<ccf::pal::Mutex> guard(state->lock);
277 return can_replicate_unsafe();
287 if (max_uncommitted_tx_count == 0)
291 std::unique_lock<ccf::pal::Mutex> guard(state->lock);
293 (state->last_idx - state->commit_idx >= max_uncommitted_tx_count);
298 std::unique_lock<ccf::pal::Mutex> guard(state->lock);
299 if (can_sign_unsafe())
303 return Consensus::SignatureDisposition::SHOULD_SIGN;
305 return Consensus::SignatureDisposition::CAN_SIGN;
307 return Consensus::SignatureDisposition::CANT_REPLICATE;
338 ccf::SeqNo seqno,
const std::vector<ccf::kv::NodeId>& node_ids)
override
340 for (
const auto& node_id : node_ids)
346 "Node is not retired, cannot become retired committed");
349 "Node is not retired, cannot become retired committed");
350 state->retired_committed_idx = seqno;
359 all_other_nodes.erase(node_id);
360 RAFT_INFO_FMT(
"Removed {} from nodes known to consensus", node_id);
367 return state->committable_indices.empty() ?
369 state->committable_indices.back();
377 const auto it = std::upper_bound(
378 state->committable_indices.rbegin(),
379 state->committable_indices.rend(),
381 [](
const auto& l,
const auto& r) { return l >= r; });
382 if (it == state->committable_indices.rend())
392 while (!state->committable_indices.empty() &&
393 (state->committable_indices.front() <= idx))
395 state->committable_indices.pop_front();
403 std::lock_guard<ccf::pal::Mutex> guard(state->lock);
411 if (leader_id.has_value())
413 throw std::logic_error(
414 "Can't force leadership if there is already a leader");
417 std::lock_guard<ccf::pal::Mutex> guard(state->lock);
418 state->current_view += starting_view_change;
425 const std::vector<Index>& terms,
426 Index commit_idx_)
override
430 if (leader_id.has_value())
432 throw std::logic_error(
433 "Can't force leadership if there is already a leader");
436 std::lock_guard<ccf::pal::Mutex> guard(state->lock);
437 state->current_view = term;
438 state->last_idx = index;
439 state->commit_idx = commit_idx_;
440 state->view_history.initialise(terms);
441 state->view_history.update(index, term);
442 state->current_view += starting_view_change;
449 const std::vector<Index>& term_history,
450 Index recovery_start_index = 0)
override
454 std::lock_guard<ccf::pal::Mutex> guard(state->lock);
456 state->last_idx = index;
457 state->commit_idx = index;
459 state->view_history.initialise(term_history);
461 ledger->init(index, recovery_start_index);
468 return state->last_idx;
473 std::lock_guard<ccf::pal::Mutex> guard(state->lock);
474 return get_commit_idx_unsafe();
479 std::lock_guard<ccf::pal::Mutex> guard(state->lock);
480 return state->current_view;
485 std::lock_guard<ccf::pal::Mutex> guard(state->lock);
486 ccf::SeqNo commit_idx = get_commit_idx_unsafe();
487 return {get_term_internal(commit_idx), commit_idx};
492 std::lock_guard<ccf::pal::Mutex> guard(state->lock);
493 return get_term_internal(idx);
499 return state->view_history.get_history_until(idx);
505 return state->view_history.get_history_since(idx);
514 std::set<ccf::NodeId> nodes;
516 for (
auto const& conf : configurations)
518 for (
auto const& [node_id, _] : conf.nodes)
520 if (node_id != state->node_id)
522 nodes.insert(node_id);
531 Index idx,
const ccf::kv::Configuration::Nodes& conf)
override
534 "Configurations: add new configuration at {}: {{{}}}", idx, conf);
536#ifdef CCF_RAFT_TRACING
537 nlohmann::json j = {};
538 j[
"function"] =
"add_configuration";
540 COMMITTABLE_INDICES(j[
"state"], state);
541 j[
"configurations"] = configurations;
542 j[
"args"] = nlohmann::json::object();
552 !configurations.empty() &&
553 configurations.back().nodes.find(state->node_id) !=
554 configurations.back().nodes.end() &&
555 conf.find(state->node_id) == conf.end())
560 if (configurations.empty() || conf != configurations.back().nodes)
563 configurations.push_back(new_config);
565 create_and_remove_node_state();
572 using namespace std::chrono_literals;
573 timeout_elapsed = 0ms;
579 for (
auto& node : all_other_nodes)
581 using namespace std::chrono_literals;
582 node.second.last_ack_timeout = 0ms;
588 if (configurations.empty())
593 return configurations.back().nodes;
598 std::lock_guard<ccf::pal::Mutex> guard(state->lock);
605 std::lock_guard<ccf::pal::Mutex> guard(state->lock);
615 for (
auto const& conf : configurations)
617 details.
configs.push_back(conf);
619 for (
auto& [k, v] : all_other_nodes)
622 v.match_idx,
static_cast<size_t>(v.last_ack_timeout.count())};
630 std::lock_guard<ccf::pal::Mutex> guard(state->lock);
635 "Failed to replicate {} items: not leader", entries.size());
640 if (term != state->current_view)
643 "Failed to replicate {} items at term {}, current term is {}",
646 state->current_view);
653 "Failed to replicate {} items: node retirement is complete",
661 for (
const auto& [index, data, is_globally_committable, hooks] : entries)
663 bool globally_committable = is_globally_committable;
665 if (index != state->last_idx + 1)
671 "Replicated on leader {}: {}{} ({} hooks)",
674 (globally_committable ?
" committable" :
""),
677#ifdef CCF_RAFT_TRACING
678 nlohmann::json j = {};
679 j[
"function"] =
"replicate";
681 COMMITTABLE_INDICES(j[
"state"], state);
684 j[
"globally_committable"] = globally_committable;
688 for (
auto& hook : *hooks)
693 if (globally_committable)
696 "membership: {} leadership: {}",
697 state->membership_state,
698 state->leadership_state);
705 state->committable_indices.push_back(index);
706 start_ticking_if_necessary();
713 state->last_idx = index;
715 *data, globally_committable, state->current_view, index);
716 entry_size_not_limited += data->size();
719 state->view_history.update(index, state->current_view);
724 entry_size_not_limited = 0;
725 for (
const auto& it : all_other_nodes)
728 send_append_entries(it.first, it.second.sent_idx + 1);
743 const ccf::NodeId& from,
const uint8_t* data,
size_t size)
override
745 auto type = serialized::peek<RaftMsgType>(data, size);
754 channels->template recv_authenticated<AppendEntries>(
756 recv_append_entries(from, r, data, size);
763 channels->template recv_authenticated<AppendEntriesResponse>(
765 recv_append_entries_response(from, r);
772 channels->template recv_authenticated<RequestPreVote>(
774 recv_request_pre_vote(from, r);
782 recv_request_vote(from, r);
789 channels->template recv_authenticated<RequestPreVoteResponse>(
791 recv_request_pre_vote_response(from, r);
798 channels->template recv_authenticated<RequestVoteResponse>(
800 recv_request_vote_response(from, r);
807 channels->template recv_authenticated<ProposeRequestVote>(
809 recv_propose_request_vote(from, r);
816 RAFT_FAIL_FMT(
"Received unhandled AFT message type: {}", type);
831 catch (
const std::exception& e)
839 void periodic(std::chrono::milliseconds elapsed)
override
841 std::unique_lock<ccf::pal::Mutex> guard(state->lock);
842 timeout_elapsed += elapsed;
846 if (timeout_elapsed >= request_timeout)
848 using namespace std::chrono_literals;
849 timeout_elapsed = 0ms;
853 for (
const auto& node : all_other_nodes)
855 send_append_entries(node.first, node.second.sent_idx + 1);
859 for (
auto& node : all_other_nodes)
861 node.second.last_ack_timeout += elapsed;
864 bool every_active_config_has_a_quorum = std::all_of(
865 configurations.begin(),
866 configurations.end(),
868 size_t live_nodes_in_config = 0;
869 for (auto const& node : conf.nodes)
871 auto search = all_other_nodes.find(node.first);
876 search == all_other_nodes.end() ||
878 search->second.last_ack_timeout < election_timeout)
880 ++live_nodes_in_config;
885 "No ack received from {} in last {}",
890 return live_nodes_in_config >= get_quorum(conf.
nodes.size());
893 if (!every_active_config_has_a_quorum)
900 "Stepping down as leader {}: No ack received from a majority of "
901 "backups in last {}",
911 timeout_elapsed >= election_timeout)
914 if (state->pre_vote_enabled)
916 become_pre_vote_candidate();
935 Index probe_index = std::min(tx_id.
seqno, state->last_idx);
936 Term term_of_probe = state->view_history.view_at(probe_index);
937 while (term_of_probe > tx_id.
view)
942 probe_index = state->view_history.start_of_view(term_of_probe);
947 term_of_probe = state->view_history.view_at(probe_index);
951 "Looking for match with {}.{}, from {}.{}, best answer is {}",
954 state->view_history.view_at(state->last_idx),
960 void update_batch_size()
962 auto avg_entry_size = (entry_count == 0) ?
963 append_entries_size_limit :
964 entry_size_not_limited / entry_count;
966 auto batch_size = (avg_entry_size == 0) ?
967 append_entries_size_limit / 2 :
968 append_entries_size_limit / avg_entry_size;
970 auto batch_avg = batch_window_sum / batch_window_size;
972 batch_window_sum += (batch_size - batch_avg);
973 entries_batch_size = std::max((batch_window_sum / batch_window_size), 1);
978 if (idx > state->last_idx)
983 return state->view_history.view_at(idx);
986 bool can_replicate_unsafe()
989 !is_retired_committed();
992 bool can_sign_unsafe()
995 !is_retired_committed();
998 Index get_commit_idx_unsafe()
1000 return state->commit_idx;
1006 "Sending append entries to node {} in batches of {}, covering the "
1013 auto calculate_end_index = [
this](
Index start) {
1018 max_terms_per_append_entries == 1,
1019 "AppendEntries construction logic enforces single term");
1020 auto max_idx = state->last_idx;
1021 const auto term_of_ae = state->view_history.view_at(start);
1022 const auto index_at_end_of_term =
1023 state->view_history.end_of_view(term_of_ae);
1024 if (index_at_end_of_term != ccf::kv::NoVersion)
1026 max_idx = index_at_end_of_term;
1028 return std::min(start + entries_batch_size - 1, max_idx);
1037 end_idx = calculate_end_index(start_idx);
1038 RAFT_TRACE_FMT(
"Sending sub range {} -> {}", start_idx, end_idx);
1039 send_append_entries_range(to, start_idx, end_idx);
1040 start_idx = std::min(end_idx + 1, state->last_idx);
1041 }
while (end_idx != state->last_idx);
1044 void send_append_entries_range(
1047 const auto prev_idx = start_idx - 1;
1049 if (is_retired_committed() && start_idx >= end_idx)
1056 const auto prev_term = get_term_internal(prev_idx);
1057 const auto term_of_idx = get_term_internal(end_idx);
1059#pragma clang diagnostic push
1060#pragma clang diagnostic ignored "-Wc99-designator"
1063 {.idx = end_idx, .prev_idx = prev_idx},
1064 .term = state->current_view,
1065 .prev_term = prev_term,
1066 .leader_commit_idx = state->commit_idx,
1067 .term_of_idx = term_of_idx,
1069#pragma clang diagnostic pop
1072 "Send {} from {} to {}: ({}.{}, {}.{}] ({})",
1082 auto& node = all_other_nodes.at(to);
1084#ifdef CCF_RAFT_TRACING
1085 nlohmann::json j = {};
1086 j[
"function"] =
"send_append_entries";
1088 j[
"state"] = *state;
1089 COMMITTABLE_INDICES(j[
"state"], state);
1090 j[
"to_node_id"] = to;
1091 j[
"match_idx"] = node.match_idx;
1092 j[
"sent_idx"] = node.sent_idx;
1098 if (!channels->send_authenticated(
1105 node.sent_idx = end_idx;
1108 void recv_append_entries(
1111 const uint8_t* data,
1114 std::unique_lock<ccf::pal::Mutex> guard(state->lock);
1117 "Recv {} to {} from {}: {}.{} to {}.{} in term {}",
1127#ifdef CCF_RAFT_TRACING
1128 nlohmann::json j = {};
1129 j[
"function"] =
"recv_append_entries";
1131 j[
"state"] = *state;
1132 COMMITTABLE_INDICES(j[
"state"], state);
1133 j[
"from_node_id"] = from;
1144 state->current_view == r.term &&
1148 become_aware_of_new_term(r.term);
1150 else if (state->current_view < r.term)
1152 become_aware_of_new_term(r.term);
1154 else if (state->current_view > r.term)
1158 "Recv {} to {} from {} but our term is later ({} > {})",
1162 state->current_view,
1164 send_append_entries_response_nack(from);
1169 const auto prev_term = get_term_internal(r.prev_idx);
1170 if (prev_term != r.prev_term)
1173 "Previous term for {} should be {}", r.prev_idx, prev_term);
1180 "Recv {} to {} from {} but our log does not yet "
1186 send_append_entries_response_nack(from);
1191 "Recv {} to {} from {} but our log at {} has the wrong "
1192 "previous term (ours: {}, theirs: {})",
1199 const ccf::TxID rejected_tx{r.prev_term, r.prev_idx};
1200 send_append_entries_response_nack(from, rejected_tx);
1207 restart_election_timeout();
1208 if (!leader_id.has_value() || leader_id.value() != from)
1212 "Node {} thinks leader is {}", state->node_id, leader_id.value());
1216 if (r.prev_idx < state->commit_idx)
1219 "Recv {} to {} from {} but prev_idx ({}) < commit_idx "
1232 if (r.prev_idx > state->last_idx)
1235 "Recv {} to {} from {} but prev_idx ({}) > last_idx ({})",
1245 "Recv {} to {} from {} for index {} and previous index {}",
1252 std::vector<std::tuple<
1253 std::unique_ptr<ccf::kv::AbstractExecutionWrapper>,
1257 for (
Index i = r.prev_idx + 1; i <= r.idx; i++)
1259 if (i <= state->last_idx)
1266 max_terms_per_append_entries == 1,
1267 "AppendEntries rollback logic assumes single term");
1268 const auto incoming_term = r.term_of_idx;
1269 const auto local_term = state->view_history.view_at(i);
1270 if (incoming_term != local_term)
1272 if (is_new_follower)
1274 auto rollback_level = i - 1;
1276 "New follower received AppendEntries with conflict. Incoming "
1277 "entry {}.{} conflicts with local {}.{}. Rolling back to {}.",
1284 "Dropping conflicting branch. Rolling back {} entries, "
1285 "beginning with {}.{}.",
1286 state->last_idx - rollback_level,
1289 rollback(rollback_level);
1290 is_new_follower =
false;
1302 "Ignoring conflicting AppendEntries. Retaining {} entries, "
1303 "beginning with {}.{}.",
1304 state->last_idx - (i - 1),
1314 ledger->skip_entry(data, size);
1319 std::vector<uint8_t> entry;
1322 entry = LedgerProxy::get_entry(data, size);
1324 catch (
const std::logic_error& e)
1328 "Recv {} to {} from {} but the data is malformed: {}",
1333 send_append_entries_response_nack(from);
1338 auto ds = store->deserialize(entry, public_only, expected);
1342 "Recv {} to {} from {} but the entry could not be "
1347 send_append_entries_response_nack(from);
1351 append_entries.emplace_back(std::move(
ds), i);
1354 execute_append_entries_sync(std::move(append_entries), from, r);
1357 void execute_append_entries_sync(
1358 std::vector<std::tuple<
1359 std::unique_ptr<ccf::kv::AbstractExecutionWrapper>,
1362 const AppendEntries& r)
1364 for (
auto& ae : append_entries)
1367 RAFT_DEBUG_FMT(
"Replicating on follower {}: {}", state->node_id, i);
1369#ifdef CCF_RAFT_TRACING
1370 nlohmann::json j = {};
1371 j[
"function"] =
"execute_append_entries_sync";
1372 j[
"state"] = *state;
1373 COMMITTABLE_INDICES(j[
"state"], state);
1374 j[
"from_node_id"] = from;
1378 bool track_deletes_on_missing_keys =
false;
1380 ds->apply(track_deletes_on_missing_keys);
1383 ledger->truncate(i - 1);
1384 send_append_entries_response_nack(from);
1387 state->last_idx = i;
1389 for (
auto& hook :
ds->get_hooks())
1394 bool globally_committable =
1396 if (globally_committable)
1398 start_ticking_if_necessary();
1401 const auto& entry =
ds->get_entry();
1404 entry, globally_committable,
ds->get_term(),
ds->get_index());
1406 switch (apply_success)
1412 ledger->truncate(state->last_idx);
1413 send_append_entries_response_nack(from);
1426 state->committable_indices.push_back(i);
1428 if (
ds->get_term() != 0u)
1435 state->view_history.update(1, r.term);
1443 max_terms_per_append_entries == 1,
1444 "AppendEntries processing for term updates assumes single "
1446 state->view_history.update(r.prev_idx + 1,
ds->get_term());
1449 commit_if_possible(r.leader_commit_idx);
1466 throw std::logic_error(
"Unknown ApplyResult value");
1471 execute_append_entries_finish(r, from);
1474 void execute_append_entries_finish(
1479 commit_if_possible(r.leader_commit_idx);
1482 auto lci = last_committable_index();
1490 state->view_history.update(1, r.term);
1499 state->view_history.update(lci + 1, r.term_of_idx);
1503 send_append_entries_response_ack(from, r);
1506 void send_append_entries_response_ack(
1512 const auto response_idx = ae.idx;
1513 send_append_entries_response(
1517 void send_append_entries_response_nack(
1520 const auto response_idx = find_highest_possible_match(rejected);
1521 const auto response_term = get_term_internal(response_idx);
1523 send_append_entries_response(
1527 void send_append_entries_response_nack(
ccf::NodeId to)
1529 send_append_entries_response(
1532 state->current_view,
1536 void send_append_entries_response(
1542 AppendEntriesResponse response{
1543 .term = response_term,
1544 .last_log_idx = response_idx,
1549 "Send {} from {} to {} for index {}: {}",
1556#ifdef CCF_RAFT_TRACING
1557 nlohmann::json j = {};
1558 j[
"function"] =
"send_append_entries_response";
1559 j[
"packet"] = response;
1560 j[
"state"] = *state;
1561 COMMITTABLE_INDICES(j[
"state"], state);
1562 j[
"to_node_id"] = to;
1566 channels->send_authenticated(
1570 void recv_append_entries_response(
1573 std::lock_guard<ccf::pal::Mutex> guard(state->lock);
1575 auto node = all_other_nodes.find(from);
1576 if (node == all_other_nodes.end())
1580 "Recv append entries response to {} from {}: unknown node",
1586#ifdef CCF_RAFT_TRACING
1587 nlohmann::json j = {};
1588 j[
"function"] =
"recv_append_entries_response";
1590 j[
"state"] = *state;
1591 COMMITTABLE_INDICES(j[
"state"], state);
1592 j[
"from_node_id"] = from;
1593 j[
"match_idx"] = node->second.match_idx;
1594 j[
"sent_idx"] = node->second.sent_idx;
1602 "Recv {} to {} from {}: no longer leader",
1609 using namespace std::chrono_literals;
1610 node->second.last_ack_timeout = 0ms;
1612 if (state->current_view < r.term)
1616 "Recv {} to {} from {}: more recent term ({} "
1622 state->current_view);
1623 become_aware_of_new_term(r.term);
1626 if (state->current_view != r.term)
1636 "Recv {} to {} from {}: stale term ({} != {})",
1641 state->current_view);
1645 else if (r.last_log_idx < node->second.match_idx)
1655 "Recv {} to {} from {}: stale idx", r.msg, state->node_id, from);
1665 "Recv {} to {} from {}: failed", r.msg, state->node_id, from);
1666 const auto this_match =
1667 find_highest_possible_match({r.term, r.last_log_idx});
1668 node->second.sent_idx = std::max(
1669 std::min(this_match, node->second.sent_idx), node->second.match_idx);
1675 node->second.match_idx = std::max(node->second.match_idx, r.last_log_idx);
1678 "Recv {} to {} from {} for index {}: success",
1688 auto last_committable_idx = last_committable_index();
1689 CCF_ASSERT(last_committable_idx >= state->commit_idx,
"lci < ci");
1692 .term = state->current_view,
1693 .last_committable_idx = last_committable_idx,
1694 .term_of_last_committable_idx =
1695 get_term_internal(last_committable_idx)};
1697#ifdef CCF_RAFT_TRACING
1698 nlohmann::json j = {};
1699 j[
"function"] =
"send_request_vote";
1701 j[
"state"] = *state;
1702 COMMITTABLE_INDICES(j[
"state"], state);
1703 j[
"to_node_id"] = to;
1712 auto last_committable_idx = last_committable_index();
1713 CCF_ASSERT(last_committable_idx >= state->commit_idx,
"lci < ci");
1716 .term = state->current_view,
1717 .last_committable_idx = last_committable_idx,
1718 .term_of_last_committable_idx =
1719 get_term_internal(last_committable_idx)};
1721#ifdef CCF_RAFT_TRACING
1722 nlohmann::json j = {};
1723 j[
"function"] =
"send_request_vote";
1725 j[
"state"] = *state;
1726 COMMITTABLE_INDICES(j[
"state"], state);
1727 j[
"to_node_id"] = to;
1734 void recv_request_vote_unsafe(
1744 if (state->current_view > r.term)
1748 "Recv {} to {} from {}: our term is later ({} > {})",
1752 state->current_view,
1754 send_request_vote_response(from,
false, election_type);
1757 if (state->current_view < r.term)
1760 "Recv {} to {} from {}: their term is later ({} < {})",
1764 state->current_view,
1771 become_aware_of_new_term(r.term);
1774 bool grant_vote =
true;
1780 "Recv {} to {} from {}: leader {} already known in term {}",
1785 state->current_view);
1789 auto voted_for_other =
1790 (voted_for.has_value()) && (voted_for.value() != from);
1795 "Recv {} to {} from {}: already voted for {}",
1806 const auto last_committable_idx = last_committable_index();
1807 const auto term_of_last_committable_idx =
1808 get_term_internal(last_committable_idx);
1809 const auto log_up_to_date =
1810 (r.term_of_last_committable_idx > term_of_last_committable_idx) ||
1811 ((r.term_of_last_committable_idx == term_of_last_committable_idx) &&
1812 (r.last_committable_idx >= last_committable_idx));
1813 if (!log_up_to_date)
1816 "Recv {} to {} from {}: candidate log {}.{} is not up-to-date "
1821 r.term_of_last_committable_idx,
1822 r.last_committable_idx,
1823 term_of_last_committable_idx,
1824 last_committable_idx);
1831 restart_election_timeout();
1837 "Recv {} to {} from {}: {} vote to candidate at {}.{} with "
1838 "local state at {}.{}",
1842 grant_vote ?
"granted" :
"denied",
1843 r.term_of_last_committable_idx,
1844 r.last_committable_idx,
1845 term_of_last_committable_idx,
1846 last_committable_idx);
1848 send_request_vote_response(from, grant_vote, election_type);
1851 void recv_request_vote(
const ccf::NodeId& from, RequestVote r)
1853 std::lock_guard<ccf::pal::Mutex> guard(state->lock);
1855#ifdef CCF_RAFT_TRACING
1856 nlohmann::json j = {};
1857 j[
"function"] =
"recv_request_vote";
1859 j[
"state"] = *state;
1860 COMMITTABLE_INDICES(j[
"state"], state);
1861 j[
"from_node_id"] = from;
1868 void recv_request_pre_vote(
const ccf::NodeId& from, RequestPreVote r)
1870 std::lock_guard<ccf::pal::Mutex> guard(state->lock);
1872#ifdef CCF_RAFT_TRACING
1873 nlohmann::json j = {};
1874 j[
"function"] =
"recv_request_vote";
1876 j[
"state"] = *state;
1877 COMMITTABLE_INDICES(j[
"state"], state);
1878 j[
"from_node_id"] = from;
1886 .last_committable_idx = r.last_committable_idx,
1887 .term_of_last_committable_idx = r.term_of_last_committable_idx,
1893 void send_request_vote_response(
1898 RequestVoteResponse response{
1899 .term = state->current_view, .vote_granted = answer};
1902 "Send {} from {} to {}: {}",
1908 channels->send_authenticated(
1913 RequestPreVoteResponse response{
1914 .term = state->current_view, .vote_granted = answer};
1917 "Send {} from {} to {}: {}",
1923 channels->send_authenticated(
1928 void recv_request_vote_response(
1930 RequestVoteResponse r,
1933 std::lock_guard<ccf::pal::Mutex> guard(state->lock);
1935#ifdef CCF_RAFT_TRACING
1936 nlohmann::json j = {};
1937 j[
"function"] =
"recv_request_vote_response";
1939 j[
"state"] = *state;
1940 COMMITTABLE_INDICES(j[
"state"], state);
1941 j[
"from_node_id"] = from;
1946 auto node = all_other_nodes.find(from);
1947 if (node == all_other_nodes.end())
1950 "Recv {} to {} from {}: unknown node", r.msg, state->node_id, from);
1954 if (state->current_view < r.term)
1957 "Recv {} to {} from {}: their term is more recent "
1962 state->current_view,
1964 become_aware_of_new_term(r.term);
1967 if (state->current_view != r.term)
1971 "Recv request vote response to {} from {}: stale ({} != {})",
1974 state->current_view,
1984 "Recv {} to {} from: {}: we aren't a candidate",
1997 "Recv {} to {} from {}: no longer a candidate in {}",
2014 "Recv {} to {} from {}: unexpected message in {} when "
2020 state->current_view);
2024 if (!r.vote_granted)
2028 "Recv request vote response to {} from {}: they voted no",
2035 "Recv request vote response to {} from {}: they voted yes",
2039 add_vote_for_me(from);
2042 void recv_request_vote_response(
2048 void recv_request_pre_vote_response(
2049 const ccf::NodeId& from, RequestPreVoteResponse r)
2051 RequestVoteResponse rvr{.term = r.term, .vote_granted = r.vote_granted};
2056 void recv_propose_request_vote(
2059 std::lock_guard<ccf::pal::Mutex> guard(state->lock);
2061#ifdef CCF_RAFT_TRACING
2062 nlohmann::json j = {};
2063 j[
"function"] =
"recv_propose_request_vote";
2065 j[
"state"] = *state;
2066 COMMITTABLE_INDICES(j[
"state"], state);
2067 j[
"from_node_id"] = from;
2070 if (!is_retired_committed() && ticking && r.term == state->current_view)
2073 "Becoming candidate early due to propose request vote from {}", from);
2078 RAFT_INFO_FMT(
"Ignoring propose request vote from {}", from);
2082 void restart_election_timeout()
2086 timeout_elapsed = std::chrono::milliseconds(distrib(rand));
2089 void reset_votes_for_me()
2091 votes_for_me.clear();
2092 for (
auto const& conf : configurations)
2094 votes_for_me[conf.idx].quorum = get_quorum(conf.nodes.size());
2095 votes_for_me[conf.idx].votes.clear();
2099 void become_pre_vote_candidate()
2101 if (configurations.empty())
2104 "Not becoming pre-vote candidate {} due to lack of a configuration.",
2112 reset_votes_for_me();
2113 restart_election_timeout();
2116 "Becoming pre-vote candidate {}: {}",
2118 state->current_view);
2120#ifdef CCF_RAFT_TRACING
2121 nlohmann::json j = {};
2122 j[
"function"] =
"become_pre_vote_candidate";
2123 j[
"state"] = *state;
2124 COMMITTABLE_INDICES(j[
"state"], state);
2125 j[
"configurations"] = configurations;
2129 add_vote_for_me(state->node_id);
2133 for (
auto const& node_id : other_nodes_in_active_configs())
2136 send_request_pre_vote(node_id);
2141 void become_candidate()
2143 if (configurations.empty())
2149 "Not becoming candidate {} due to lack of a configuration.",
2157 voted_for = state->node_id;
2158 reset_votes_for_me();
2159 state->current_view++;
2161 restart_election_timeout();
2162 reset_last_ack_timeouts();
2165 "Becoming candidate {}: {}", state->node_id, state->current_view);
2167#ifdef CCF_RAFT_TRACING
2168 nlohmann::json j = {};
2169 j[
"function"] =
"become_candidate";
2170 j[
"state"] = *state;
2171 COMMITTABLE_INDICES(j[
"state"], state);
2172 j[
"configurations"] = configurations;
2176 add_vote_for_me(state->node_id);
2180 for (
auto const& node_id : other_nodes_in_active_configs())
2183 send_request_vote(node_id);
2187 void become_leader(
bool =
false)
2189 if (is_retired_committed())
2194 const auto election_index = last_committable_index();
2197 "Election index is {} in term {}", election_index, state->current_view);
2201 if (state->commit_idx > 0)
2203 rollback(election_index);
2208 store->initialise_term(state->current_view);
2212 leader_id = state->node_id;
2215 using namespace std::chrono_literals;
2216 timeout_elapsed = 0ms;
2218 reset_last_ack_timeouts();
2221 "Becoming leader {}: {}", state->node_id, state->current_view);
2223#ifdef CCF_RAFT_TRACING
2224 nlohmann::json j = {};
2225 j[
"function"] =
"become_leader";
2226 j[
"state"] = *state;
2227 COMMITTABLE_INDICES(j[
"state"], state);
2228 j[
"configurations"] = configurations;
2233 if (other_nodes_in_active_configs().size() == 0)
2239 auto next = state->last_idx + 1;
2241 for (
auto& node : all_other_nodes)
2243 node.second.match_idx = 0;
2244 node.second.sent_idx = next - 1;
2247 send_append_entries(node.first, next);
2250 if (retired_node_cleanup)
2252 retired_node_cleanup->cleanup();
2262 restart_election_timeout();
2263 reset_last_ack_timeouts();
2267 "Becoming follower {}: {}.{}",
2269 state->current_view,
2272#ifdef CCF_RAFT_TRACING
2273 nlohmann::json j = {};
2274 j[
"function"] =
"become_follower";
2275 j[
"state"] = *state;
2276 COMMITTABLE_INDICES(j[
"state"], state);
2277 j[
"configurations"] = configurations;
2289 state->current_view = term;
2291 reset_votes_for_me();
2293 is_new_follower =
true;
2297 void send_propose_request_vote()
2301 std::optional<ccf::NodeId> successor = std::nullopt;
2302 Index max_match_idx = 0;
2315 for (
auto& [node, node_state] : all_other_nodes)
2317 if (node_state.match_idx >= max_match_idx)
2320 auto conf = configurations.rbegin();
2321 while (conf != configurations.rend())
2323 if (conf->nodes.find(node) != conf->nodes.end())
2325 latest_reconf_id = conf->idx;
2330 if (!(node_state.match_idx == max_match_idx &&
2331 latest_reconf_id < reconf_id_of_max_match))
2333 reconf_id_of_max_match = latest_reconf_id;
2335 max_match_idx = node_state.match_idx;
2339 if (successor.has_value())
2341 RAFT_INFO_FMT(
"Proposing that {} becomes candidate", successor.value());
2342 channels->send_authenticated(
2349 "Becoming retired, phase {} (leadership {}): {}: {} at {}",
2351 state->leadership_state,
2353 state->current_view,
2359 !state->retirement_idx.has_value(),
2360 "retirement_idx already set to {}",
2362 state->retirement_idx.value());
2363 state->retirement_idx = idx;
2368 assert(state->retirement_idx.has_value());
2371 idx >= state->retirement_idx.value(),
2372 "Index {} unexpectedly lower than retirement_idx {}",
2375 state->retirement_idx.value());
2376 state->retirement_committable_idx = idx;
2383 send_propose_request_vote();
2391 state->retirement_phase = phase;
2396 if (configurations.empty())
2399 "Not voting for myself {} due to lack of a configuration.",
2405 for (
auto const& conf : configurations)
2407 auto const&
nodes = conf.nodes;
2414 votes_for_me[conf.idx].votes.insert(from);
2416 "Node {} voted for {} in configuration {} with quorum {}",
2420 votes_for_me[conf.idx].quorum);
2424 bool is_elected =
true;
2425 for (
auto const& v : votes_for_me)
2427 auto const& quorum = v.second.quorum;
2428 auto const& votes = v.second.votes;
2430 if (votes.size() < quorum)
2439 switch (state->leadership_state)
2450 throw std::logic_error(
2451 "add_vote_for_me() called while not a pre-vote candidate or "
2460 void update_commit()
2464 throw std::logic_error(
2465 "update_commit() must only be called while this node is leader");
2468 std::optional<Index> new_agreement_index = std::nullopt;
2471 for (
auto const& c : configurations)
2475 std::vector<Index> match;
2476 match.reserve(c.nodes.size());
2478 for (
const auto& node : c.
nodes)
2480 if (node.first == state->node_id)
2482 match.push_back(state->last_idx);
2486 match.push_back(all_other_nodes.at(node.first).match_idx);
2490 sort(match.begin(), match.end());
2491 auto confirmed = match.at((match.size() - 1) / 2);
2494 !new_agreement_index.has_value() ||
2495 confirmed < new_agreement_index.value())
2497 new_agreement_index = confirmed;
2501 if (new_agreement_index.has_value())
2503 if (new_agreement_index.value() > state->last_idx)
2505 throw std::logic_error(
2506 "Followers appear to have later match indices than leader");
2509 const auto new_commit_idx =
2510 find_highest_possible_committable_index(new_agreement_index.value());
2512 if (new_commit_idx.has_value())
2515 "In update_commit, new_commit_idx: {}, "
2517 new_commit_idx.value(),
2520 const auto term_of_new = get_term_internal(new_commit_idx.value());
2521 if (term_of_new == state->current_view)
2523 commit(new_commit_idx.value());
2528 "Ack quorum at {} resulted in proposed commit index {}, which "
2529 "is in term {}. Waiting for agreement on committable entry in "
2530 "current term {} to update commit",
2531 new_agreement_index.value(),
2532 new_commit_idx.value(),
2534 state->current_view);
2542 void commit_if_possible(
Index idx)
2545 "Commit if possible {} (ci: {}) (ti {})",
2548 get_term_internal(idx));
2550 (idx > state->commit_idx) &&
2551 (get_term_internal(idx) <= state->current_view))
2553 const auto highest_committable =
2554 find_highest_possible_committable_index(idx);
2555 if (highest_committable.has_value())
2557 commit(highest_committable.value());
2562 size_t get_quorum(
size_t n)
const
2567 void commit(
Index idx)
2569 if (idx > state->last_idx)
2571 throw std::logic_error(fmt::format(
2572 "Tried to commit {} but last_idx is {}", idx, state->last_idx));
2579 if (idx <= state->commit_idx)
2584#ifdef CCF_RAFT_TRACING
2585 nlohmann::json j = {};
2586 j[
"function"] =
"commit";
2587 j[
"args"] = nlohmann::json::object();
2588 j[
"args"][
"idx"] = idx;
2589 j[
"state"] = *state;
2590 COMMITTABLE_INDICES(j[
"state"], state);
2591 j[
"configurations"] = configurations;
2595 compact_committable_indices(idx);
2597 state->commit_idx = idx;
2601 state->retirement_committable_idx.has_value())
2603 const auto retirement_committable =
2605 ->retirement_committable_idx.value();
2606 if (idx >= retirement_committable)
2613 store->compact(idx);
2614 ledger->commit(idx);
2616 if (commit_callbacks !=
nullptr)
2618 const auto term = get_term_internal(idx);
2619 commit_callbacks->trigger_callbacks({term, idx}, state->view_history);
2626 bool changed =
false;
2630 auto conf = configurations.begin();
2631 if (conf == configurations.end())
2636 auto next = std::next(conf);
2637 if (next == configurations.end())
2642 if (idx < next->idx)
2648 "Configurations: discard committed configuration at {}", conf->idx);
2649 configurations.pop_front();
2655 create_and_remove_node_state();
2656 if (retired_node_cleanup && is_primary())
2658 retired_node_cleanup->cleanup();
2663 bool is_self_in_latest_config()
2665 bool present =
false;
2666 if (!configurations.empty())
2668 auto current_nodes = configurations.back().nodes;
2669 present = current_nodes.find(state->node_id) != current_nodes.end();
2674 void start_ticking_if_necessary()
2676 if (!ticking && is_self_in_latest_config())
2685 if (idx < state->commit_idx)
2688 "Asked to rollback to idx:{} but committed to commit_idx:{} - "
2689 "ignoring rollback request",
2695 store->rollback({get_term_internal(idx), idx}, state->current_view);
2697 RAFT_DEBUG_FMT(
"Setting term in store to: {}", state->current_view);
2698 ledger->truncate(idx);
2699 state->last_idx = idx;
2702 state->view_history.rollback(idx);
2704 while (!state->committable_indices.empty() &&
2705 (state->committable_indices.back() > idx))
2707 state->committable_indices.pop_back();
2714 assert(state->retirement_committable_idx.has_value());
2715 if (state->retirement_committable_idx.has_value())
2717 const auto retirement_committable =
2719 ->retirement_committable_idx.value();
2720 if (retirement_committable > idx)
2722 state->retirement_committable_idx = std::nullopt;
2732 assert(state->retirement_idx.has_value());
2733 if (state->retirement_idx.has_value())
2735 const auto retirement =
2737 ->retirement_idx.value();
2738 if (retirement > idx)
2740 state->retirement_idx = std::nullopt;
2741 state->retirement_phase = std::nullopt;
2749 bool changed =
false;
2751 while (!configurations.empty() && (configurations.back().idx > idx))
2754 "Configurations: rollback configuration at {}",
2755 configurations.back().idx);
2756 configurations.pop_back();
2762 create_and_remove_node_state();
2776 "Not proposing request vote from {} since not leader",
2781 LOG_INFO_FMT(
"Nominating successor for {}", state->node_id);
2783#ifdef CCF_RAFT_TRACING
2784 nlohmann::json j = {};
2785 j[
"function"] =
"step_down_and_nominate_successor";
2786 j[
"state"] = *state;
2787 COMMITTABLE_INDICES(j[
"state"], state);
2788 j[
"configurations"] = configurations;
2792 send_propose_request_vote();
2796 void create_and_remove_node_state()
2801 for (
auto const& conf : configurations)
2803 for (
auto const& node : conf.nodes)
2805 active_nodes.emplace(node.first, node.second);
2810 for (
auto node_info : active_nodes)
2812 if (node_info.first == state->node_id)
2817 if (all_other_nodes.find(node_info.first) == all_other_nodes.end())
2819 if (!channels->have_channel(node_info.first))
2822 "Configurations: create node channel with {}", node_info.first);
2824 channels->associate_node_address(
2826 node_info.second.hostname,
2827 node_info.second.port);
2832 auto index = state->last_idx + 1;
2833 all_other_nodes.try_emplace(
2834 node_info.first, node_info.second, index, 0);
2838 send_append_entries(node_info.first, index);
2842 "Added raft node {} ({}:{})",
2844 node_info.second.hostname,
2845 node_info.second.port);