// Time elapsed since the last append entries response processed from this
// peer: advanced by `elapsed` in periodic(), reset to 0ms when a response from
// the peer is handled, and compared against election_timeout to detect peers
// that have stopped acknowledging.
109 std::chrono::milliseconds last_ack_timeout;
// Default-constructible; the non-default constructor takes the peer's node
// info and its starting indices.
111 NodeState() =
default;
116 Index match_idx_ = 0) :
117 node_info(node_info_),
119 match_idx(match_idx_),
// Store that received entries are deserialized into (with public_only), and
// that is rolled back, compacted and given a new term as consensus advances.
125 std::unique_ptr<Store> store;
// Candidate this node has voted for. Set to this node's own id on becoming a
// candidate; while it holds another node's id, vote requests from other nodes
// are refused.
128 std::optional<ccf::NodeId> voted_for = std::nullopt;
// Leader currently known to this node, if any. Set to this node's own id on
// becoming leader. Forcing leadership throws if it is already set.
129 std::optional<ccf::NodeId> leader_id = std::nullopt;
134 std::unordered_set<ccf::NodeId> votes;
// Votes received while a candidate, keyed by configuration index. Each entry
// records the quorum of that configuration, and an election requires a quorum
// in every entry.
137 std::map<Index, Votes> votes_for_me;
// Time accumulated since the timer was last reset, advanced in periodic().
// It is reset to 0ms when leadership or configuration changes, and set to a
// random offset by restart_election_timeout(). Compared against
// request_timeout and election_timeout.
139 std::chrono::milliseconds timeout_elapsed;
// Set on becoming a follower. While set, an AppendEntries that conflicts with
// the local log causes a rollback, after which the flag is cleared. Otherwise
// conflicting entries are ignored.
150 bool is_new_follower =
false;
157 bool should_sign =
false;
// Consensus state shared with the rest of the node: node id, current view,
// last/commit indices, view history and committable indices. Its mutex
// (state->lock) is taken by the public entry points before touching it.
159 std::shared_ptr<aft::State> state;
// Taken from settings.message_timeout. Once timeout_elapsed reaches it,
// periodic() resends append entries to every other node.
162 std::chrono::milliseconds request_timeout;
// Taken from settings.election_timeout. It sets the upper bound of the
// randomised election offset (distrib), and it is the per-peer ack timeout
// used to decide whether a quorum of backups is still responding.
163 std::chrono::milliseconds election_timeout;
// Limit on (last_idx - commit_idx) before replication is blocked. The value 0
// is special-cased by the check (presumably meaning "no limit"; confirm
// against the full can_replicate implementation).
164 size_t max_uncommitted_tx_count;
// Whether ticking has already been started; checked (together with
// membership of the latest configuration) by start_ticking_if_necessary().
165 bool ticking =
false;
// Configurations not yet superseded, oldest first.
// - Appended by add_configuration().
// - Popped from the front once a later configuration's index is committed.
// - Popped from the back on rollback.
168 std::list<Configuration> configurations;
// Replication state (match/sent index, ack timer) for every node in any
// active configuration except this one. Maintained by
// create_and_remove_node_state().
176 std::unordered_map<ccf::NodeId, NodeState> all_other_nodes;
177 std::unordered_map<ccf::NodeId, ccf::SeqNo> retired_nodes;
// Client passed in as rpc_request_context_; also handed to
// retired_node_cleanup.
181 std::shared_ptr<ccf::NodeClient> node_client;
// cleanup() is invoked on becoming leader and from commit().
184 std::unique_ptr<ccf::RetiredNodeCleanup> retired_node_cleanup;
// Running totals used by update_batch_size() to estimate the average entry
// size. entry_size_not_limited accumulates the sizes of entries replicated by
// the leader and is reset in replicate().
186 size_t entry_size_not_limited = 0;
187 size_t entry_count = 0;
// Maximum number of entries sent per AppendEntries message. It starts at 20
// and is adapted by update_batch_size() as a moving average, never below 1.
188 Index entries_batch_size = 20;
// Window length and running sum of the moving average over batch sizes
// computed in update_batch_size().
189 static constexpr int batch_window_size = 100;
190 int batch_window_sum = 0;
// Forwarded to store->deserialize() for every received entry.
194 bool public_only =
false;
// Random source for restart_election_timeout(). distrib covers
// [0, election_timeout / 2] (in ms), and rand is seeded from this object's
// address, so different instances draw different election offsets.
197 std::uniform_int_distribution<int> distrib;
198 std::default_random_engine rand;
// Each AppendEntries covers entries from a single term. Both the send path and
// the follower rollback logic assert this value is 1.
203 static constexpr size_t max_terms_per_append_entries = 1;
213 std::unique_ptr<Store> store_,
214 std::unique_ptr<LedgerProxy> ledger_,
215 std::shared_ptr<ccf::NodeToNode> channels_,
216 std::shared_ptr<aft::State> state_,
217 std::shared_ptr<ccf::NodeClient> rpc_request_context_,
218 bool public_only_ =
false,
223 store(
std::move(store_)),
229 request_timeout(settings_.message_timeout),
230 election_timeout(settings_.election_timeout),
231 max_uncommitted_tx_count(settings_.max_uncommitted_tx_count),
233 reconfiguration_type(reconfiguration_type_),
234 node_client(rpc_request_context_),
235 retired_node_cleanup(
236 std::make_unique<
ccf::RetiredNodeCleanup>(node_client)),
238 public_only(public_only_),
240 distrib(0, (int)election_timeout.count() / 2),
241 rand((int)(uintptr_t)this),
256 return state->node_id;
271 std::unique_lock<ccf::pal::Mutex> guard(state->lock);
272 return can_replicate_unsafe();
282 if (max_uncommitted_tx_count == 0)
286 std::unique_lock<ccf::pal::Mutex> guard(state->lock);
288 (state->last_idx - state->commit_idx >= max_uncommitted_tx_count);
293 std::unique_lock<ccf::pal::Mutex> guard(state->lock);
294 if (can_sign_unsafe())
298 return Consensus::SignatureDisposition::SHOULD_SIGN;
302 return Consensus::SignatureDisposition::CAN_SIGN;
307 return Consensus::SignatureDisposition::CANT_REPLICATE;
339 ccf::SeqNo seqno,
const std::vector<ccf::kv::NodeId>& node_ids)
override
341 for (
auto& node_id : node_ids)
347 "Node is not retired, cannot become retired committed");
350 "Node is not retired, cannot become retired committed");
351 state->retired_committed_idx = seqno;
360 all_other_nodes.erase(node_id);
361 RAFT_INFO_FMT(
"Removed {} from nodes known to consensus", node_id);
368 return state->committable_indices.empty() ?
370 state->committable_indices.back();
378 const auto it = std::upper_bound(
379 state->committable_indices.rbegin(),
380 state->committable_indices.rend(),
382 [](
const auto& l,
const auto& r) { return l >= r; });
383 if (it == state->committable_indices.rend())
393 while (!state->committable_indices.empty() &&
394 (state->committable_indices.front() <= idx))
396 state->committable_indices.pop_front();
404 std::lock_guard<ccf::pal::Mutex> guard(state->lock);
412 if (leader_id.has_value())
414 throw std::logic_error(
415 "Can't force leadership if there is already a leader");
418 std::lock_guard<ccf::pal::Mutex> guard(state->lock);
419 state->current_view += starting_view_change;
426 const std::vector<Index>& terms,
427 Index commit_idx_)
override
431 if (leader_id.has_value())
433 throw std::logic_error(
434 "Can't force leadership if there is already a leader");
437 std::lock_guard<ccf::pal::Mutex> guard(state->lock);
438 state->current_view = term;
439 state->last_idx = index;
440 state->commit_idx = commit_idx_;
441 state->view_history.initialise(terms);
442 state->view_history.update(index, term);
443 state->current_view += starting_view_change;
450 const std::vector<Index>& term_history,
451 Index recovery_start_index = 0)
override
455 std::lock_guard<ccf::pal::Mutex> guard(state->lock);
457 state->last_idx = index;
458 state->commit_idx = index;
460 state->view_history.initialise(term_history);
462 ledger->init(index, recovery_start_index);
469 return state->last_idx;
474 std::lock_guard<ccf::pal::Mutex> guard(state->lock);
475 return get_commit_idx_unsafe();
480 std::lock_guard<ccf::pal::Mutex> guard(state->lock);
481 return state->current_view;
486 std::lock_guard<ccf::pal::Mutex> guard(state->lock);
487 ccf::SeqNo commit_idx = get_commit_idx_unsafe();
488 return {get_term_internal(commit_idx), commit_idx};
493 std::lock_guard<ccf::pal::Mutex> guard(state->lock);
494 return get_term_internal(idx);
500 return state->view_history.get_history_until(idx);
506 return state->view_history.get_history_since(idx);
515 std::set<ccf::NodeId> nodes;
517 for (
auto const& conf : configurations)
519 for (
auto const& [node_id, _] : conf.nodes)
521 if (node_id != state->node_id)
523 nodes.insert(node_id);
535 const std::unordered_set<ccf::NodeId>& new_learner_nodes = {},
536 const std::unordered_set<ccf::NodeId>& new_retired_nodes = {})
override
539 "Configurations: add new configuration at {}: {{{}}}", idx, conf);
541 assert(new_learner_nodes.empty());
543#ifdef CCF_RAFT_TRACING
544 nlohmann::json j = {};
545 j[
"function"] =
"add_configuration";
547 COMMITTABLE_INDICES(j[
"state"], state);
548 j[
"configurations"] = configurations;
549 j[
"args"] = nlohmann::json::object();
559 !configurations.empty() &&
560 configurations.back().nodes.find(state->node_id) !=
561 configurations.back().nodes.end() &&
562 conf.find(state->node_id) == conf.end())
567 if (conf != configurations.back().nodes)
570 configurations.push_back(new_config);
572 create_and_remove_node_state();
579 using namespace std::chrono_literals;
580 timeout_elapsed = 0ms;
586 for (
auto& node : all_other_nodes)
588 using namespace std::chrono_literals;
589 node.second.last_ack_timeout = 0ms;
595 if (configurations.empty())
600 return configurations.back().nodes;
605 std::lock_guard<ccf::pal::Mutex> guard(state->lock);
612 std::lock_guard<ccf::pal::Mutex> guard(state->lock);
622 for (
auto const& conf : configurations)
624 details.
configs.push_back(conf);
626 for (
auto& [k, v] : all_other_nodes)
629 v.match_idx,
static_cast<size_t>(v.last_ack_timeout.count())};
637 std::lock_guard<ccf::pal::Mutex> guard(state->lock);
642 "Failed to replicate {} items: not leader", entries.size());
647 if (term != state->current_view)
650 "Failed to replicate {} items at term {}, current term is {}",
653 state->current_view);
660 "Failed to replicate {} items: node retirement is complete",
668 for (
auto& [index, data, is_globally_committable, hooks] : entries)
670 bool globally_committable = is_globally_committable;
672 if (index != state->last_idx + 1)
676 "Replicated on leader {}: {}{} ({} hooks)",
679 (globally_committable ?
" committable" :
""),
682#ifdef CCF_RAFT_TRACING
683 nlohmann::json j = {};
684 j[
"function"] =
"replicate";
686 COMMITTABLE_INDICES(j[
"state"], state);
689 j[
"globally_committable"] = globally_committable;
693 for (
auto& hook : *hooks)
698 if (globally_committable)
701 "membership: {} leadership: {}",
702 state->membership_state,
703 state->leadership_state);
710 state->committable_indices.push_back(index);
711 start_ticking_if_necessary();
718 state->last_idx = index;
720 *data, globally_committable, state->current_view, index);
721 entry_size_not_limited += data->size();
724 state->view_history.update(index, state->current_view);
729 entry_size_not_limited = 0;
730 for (
const auto& it : all_other_nodes)
733 send_append_entries(it.first, it.second.sent_idx + 1);
748 const ccf::NodeId& from,
const uint8_t* data,
size_t size)
override
750 RaftMsgType type = serialized::peek<RaftMsgType>(data, size);
759 channels->template recv_authenticated<AppendEntries>(
761 recv_append_entries(from, r, data, size);
768 channels->template recv_authenticated<AppendEntriesResponse>(
770 recv_append_entries_response(from, r);
778 recv_request_vote(from, r);
785 channels->template recv_authenticated<RequestVoteResponse>(
787 recv_request_vote_response(from, r);
794 channels->template recv_authenticated<ProposeRequestVote>(
796 recv_propose_request_vote(from, r);
816 catch (
const std::exception& e)
823 void periodic(std::chrono::milliseconds elapsed)
override
825 std::unique_lock<ccf::pal::Mutex> guard(state->lock);
826 timeout_elapsed += elapsed;
830 if (timeout_elapsed >= request_timeout)
832 using namespace std::chrono_literals;
833 timeout_elapsed = 0ms;
837 for (
const auto& node : all_other_nodes)
839 send_append_entries(node.first, node.second.sent_idx + 1);
843 for (
auto& node : all_other_nodes)
845 node.second.last_ack_timeout += elapsed;
848 bool has_quorum_of_backups =
false;
849 for (
auto const& conf : configurations)
851 size_t backup_ack_timeout_count = 0;
852 for (
auto const& node : conf.nodes)
854 auto search = all_other_nodes.find(node.first);
855 if (search == all_other_nodes.end())
860 if (search->second.last_ack_timeout >= election_timeout)
863 "No ack received from {} in last {}",
866 backup_ack_timeout_count++;
870 if (backup_ack_timeout_count < get_quorum(conf.nodes.size() - 1))
874 has_quorum_of_backups =
true;
879 if (!has_quorum_of_backups)
886 "Stepping down as leader {}: No ack received from a majority of "
887 "backups in last {}",
897 timeout_elapsed >= election_timeout)
914 Index probe_index = std::min(tx_id.
seqno, state->last_idx);
915 Term term_of_probe = state->view_history.view_at(probe_index);
916 while (term_of_probe > tx_id.
view)
921 probe_index = state->view_history.start_of_view(term_of_probe);
926 term_of_probe = state->view_history.view_at(probe_index);
930 "Looking for match with {}.{}, from {}.{}, best answer is {}",
933 state->view_history.view_at(state->last_idx),
939 inline void update_batch_size()
941 auto avg_entry_size = (entry_count == 0) ?
943 entry_size_not_limited / entry_count;
945 auto batch_size = (avg_entry_size == 0) ?
949 auto batch_avg = batch_window_sum / batch_window_size;
951 batch_window_sum += (batch_size - batch_avg);
952 entries_batch_size = std::max((batch_window_sum / batch_window_size), 1);
957 if (idx > state->last_idx)
960 return state->view_history.view_at(idx);
963 bool can_replicate_unsafe()
969 bool can_sign_unsafe()
975 Index get_commit_idx_unsafe()
977 return state->commit_idx;
983 "Sending append entries to node {} in batches of {}, covering the "
990 auto calculate_end_index = [
this](
Index start) {
995 max_terms_per_append_entries == 1,
996 "AppendEntries construction logic enforces single term");
997 auto max_idx = state->last_idx;
998 const auto term_of_ae = state->view_history.view_at(start);
999 const auto index_at_end_of_term =
1000 state->view_history.end_of_view(term_of_ae);
1001 if (index_at_end_of_term != ccf::kv::NoVersion)
1003 max_idx = index_at_end_of_term;
1005 return std::min(start + entries_batch_size - 1, max_idx);
1014 end_idx = calculate_end_index(start_idx);
1015 RAFT_TRACE_FMT(
"Sending sub range {} -> {}", start_idx, end_idx);
1016 send_append_entries_range(to, start_idx, end_idx);
1017 start_idx = std::min(end_idx + 1, state->last_idx);
1018 }
while (end_idx != state->last_idx);
1021 void send_append_entries_range(
1024 const auto prev_idx = start_idx - 1;
1033 const auto prev_term = get_term_internal(prev_idx);
1034 const auto term_of_idx = get_term_internal(end_idx);
1037 "Send append entries from {} to {}: ({}.{}, {}.{}] ({})",
1046#pragma clang diagnostic push
1047#pragma clang diagnostic ignored "-Wc99-designator"
1050 {.idx = end_idx, .prev_idx = prev_idx},
1051 .term = state->current_view,
1052 .prev_term = prev_term,
1053 .leader_commit_idx = state->commit_idx,
1054 .term_of_idx = term_of_idx,
1056#pragma clang diagnostic pop
1058 auto& node = all_other_nodes.at(to);
1060#ifdef CCF_RAFT_TRACING
1061 nlohmann::json j = {};
1062 j[
"function"] =
"send_append_entries";
1064 j[
"state"] = *state;
1065 COMMITTABLE_INDICES(j[
"state"], state);
1066 j[
"to_node_id"] = to;
1067 j[
"match_idx"] = node.match_idx;
1068 j[
"sent_idx"] = node.sent_idx;
1081 node.sent_idx = end_idx;
1084 void recv_append_entries(
1087 const uint8_t* data,
1090 std::unique_lock<ccf::pal::Mutex> guard(state->lock);
1093 "Received append entries: {}.{} to {}.{} (from {} in term {})",
1101#ifdef CCF_RAFT_TRACING
1102 nlohmann::json j = {};
1103 j[
"function"] =
"recv_append_entries";
1105 j[
"state"] = *state;
1106 COMMITTABLE_INDICES(j[
"state"], state);
1107 j[
"from_node_id"] = from;
1118 state->current_view == r.term &&
1123 else if (state->current_view < r.term)
1127 else if (state->current_view > r.term)
1131 "Recv append entries to {} from {} but our term is later ({} > {})",
1134 state->current_view,
1136 send_append_entries_response_nack(from);
1141 const auto prev_term = get_term_internal(r.prev_idx);
1142 if (prev_term != r.prev_term)
1145 "Previous term for {} should be {}", r.prev_idx, prev_term);
1152 "Recv append entries to {} from {} but our log does not yet "
1157 send_append_entries_response_nack(from);
1162 "Recv append entries to {} from {} but our log at {} has the wrong "
1163 "previous term (ours: {}, theirs: {})",
1169 const ccf::TxID rejected_tx{r.prev_term, r.prev_idx};
1170 send_append_entries_response_nack(from, rejected_tx);
1177 restart_election_timeout();
1178 if (!leader_id.has_value() || leader_id.value() != from)
1182 "Node {} thinks leader is {}", state->node_id, leader_id.value());
1186 if (r.prev_idx < state->commit_idx)
1189 "Recv append entries to {} from {} but prev_idx ({}) < commit_idx "
1201 else if (r.prev_idx > state->last_idx)
1204 "Recv append entries to {} from {} but prev_idx ({}) > last_idx ({})",
1213 "Recv append entries to {} from {} for index {} and previous index {}",
1219 std::vector<std::tuple<
1220 std::unique_ptr<ccf::kv::AbstractExecutionWrapper>,
1224 for (
Index i = r.prev_idx + 1; i <= r.idx; i++)
1226 if (i <= state->last_idx)
1233 max_terms_per_append_entries == 1,
1234 "AppendEntries rollback logic assumes single term");
1235 const auto incoming_term = r.term_of_idx;
1236 const auto local_term = state->view_history.view_at(i);
1237 if (incoming_term != local_term)
1239 if (is_new_follower)
1241 auto rollback_level = i - 1;
1243 "New follower received AppendEntries with conflict. Incoming "
1244 "entry {}.{} conflicts with local {}.{}. Rolling back to {}.",
1251 "Dropping conflicting branch. Rolling back {} entries, "
1252 "beginning with {}.{}.",
1253 state->last_idx - rollback_level,
1257 is_new_follower =
false;
1269 "Ignoring conflicting AppendEntries. Retaining {} entries, "
1270 "beginning with {}.{}.",
1271 state->last_idx - (i - 1),
1281 ledger->skip_entry(data, size);
1286 std::vector<uint8_t> entry;
1289 entry = LedgerProxy::get_entry(data, size);
1291 catch (
const std::logic_error& e)
1295 "Recv append entries to {} from {} but the data is malformed: {}",
1299 send_append_entries_response_nack(from);
1304 auto ds = store->deserialize(entry, public_only, expected);
1308 "Recv append entries to {} from {} but the entry could not be "
1312 send_append_entries_response_nack(from);
1316 append_entries.push_back(std::make_tuple(std::move(
ds), i));
1319 execute_append_entries_sync(
1320 std::move(append_entries), from, std::move(r));
1323 void execute_append_entries_sync(
1324 std::vector<std::tuple<
1325 std::unique_ptr<ccf::kv::AbstractExecutionWrapper>,
1330 for (
auto& ae : append_entries)
1333 RAFT_DEBUG_FMT(
"Replicating on follower {}: {}", state->node_id, i);
1335#ifdef CCF_RAFT_TRACING
1336 nlohmann::json j = {};
1337 j[
"function"] =
"execute_append_entries_sync";
1338 j[
"state"] = *state;
1339 COMMITTABLE_INDICES(j[
"state"], state);
1340 j[
"from_node_id"] = from;
1344 bool track_deletes_on_missing_keys =
false;
1346 ds->apply(track_deletes_on_missing_keys);
1350 send_append_entries_response_nack(from);
1353 state->last_idx = i;
1355 for (
auto& hook :
ds->get_hooks())
1360 bool globally_committable =
1362 if (globally_committable)
1364 start_ticking_if_necessary();
1367 const auto& entry =
ds->get_entry();
1370 entry, globally_committable,
ds->get_term(),
ds->get_index());
1372 switch (apply_success)
1378 ledger->truncate(state->last_idx);
1379 send_append_entries_response_nack(from);
1392 state->committable_indices.push_back(i);
1401 state->view_history.update(1, r.term);
1409 max_terms_per_append_entries == 1,
1410 "AppendEntries processing for term updates assumes single "
1412 state->view_history.update(r.prev_idx + 1,
ds->get_term());
1415 commit_if_possible(r.leader_commit_idx);
1432 throw std::logic_error(
"Unknown ApplyResult value");
1437 execute_append_entries_finish(r, from);
1440 void execute_append_entries_finish(
1445 commit_if_possible(r.leader_commit_idx);
1456 state->view_history.update(1, r.term);
1465 state->view_history.update(lci + 1, r.term_of_idx);
1469 send_append_entries_response_ack(from, r);
1472 void send_append_entries_response_ack(
1478 const auto response_idx = ae.idx;
1479 send_append_entries_response(
1483 void send_append_entries_response_nack(
1486 const auto response_idx = find_highest_possible_match(rejected);
1487 const auto response_term = get_term_internal(response_idx);
1489 send_append_entries_response(
1493 void send_append_entries_response_nack(
ccf::NodeId to)
1495 send_append_entries_response(
1498 state->current_view,
1502 void send_append_entries_response(
1509 "Send append entries response from {} to {} for index {}: {}",
1515 AppendEntriesResponse response{
1516 .term = response_term,
1517 .last_log_idx = response_idx,
1521#ifdef CCF_RAFT_TRACING
1522 nlohmann::json j = {};
1523 j[
"function"] =
"send_append_entries_response";
1524 j[
"packet"] = response;
1525 j[
"state"] = *state;
1526 COMMITTABLE_INDICES(j[
"state"], state);
1527 j[
"to_node_id"] = to;
1535 void recv_append_entries_response(
1538 std::lock_guard<ccf::pal::Mutex> guard(state->lock);
1540 auto node = all_other_nodes.find(from);
1541 if (node == all_other_nodes.end())
1545 "Recv append entries response to {} from {}: unknown node",
1551#ifdef CCF_RAFT_TRACING
1552 nlohmann::json j = {};
1553 j[
"function"] =
"recv_append_entries_response";
1555 j[
"state"] = *state;
1556 COMMITTABLE_INDICES(j[
"state"], state);
1557 j[
"from_node_id"] = from;
1558 j[
"match_idx"] = node->second.match_idx;
1559 j[
"sent_idx"] = node->second.sent_idx;
1567 "Recv append entries response to {} from {}: no longer leader",
1573 using namespace std::chrono_literals;
1574 node->second.last_ack_timeout = 0ms;
1576 if (state->current_view < r.term)
1580 "Recv append entries response to {} from {}: more recent term ({} "
1585 state->current_view);
1589 else if (state->current_view != r.term)
1599 "Recv append entries response to {} from {}: stale term ({} != {})",
1603 state->current_view);
1607 else if (r.last_log_idx < node->second.match_idx)
1617 "Recv append entries response to {} from {}: stale idx",
1629 "Recv append entries response to {} from {}: failed",
1632 const auto this_match =
1633 find_highest_possible_match({r.term, r.last_log_idx});
1634 node->second.sent_idx = std::max(
1635 std::min(this_match, node->second.sent_idx), node->second.match_idx);
1642 node->second.match_idx =
1643 std::max(node->second.match_idx, r.last_log_idx);
1647 "Recv append entries response to {} from {} for index {}: success",
1658 "Send request vote from {} to {} at {}",
1661 last_committable_idx);
1662 CCF_ASSERT(last_committable_idx >= state->commit_idx,
"lci < ci");
1665 .term = state->current_view,
1666 .last_committable_idx = last_committable_idx,
1667 .term_of_last_committable_idx = get_term_internal(last_committable_idx),
1670#ifdef CCF_RAFT_TRACING
1671 nlohmann::json j = {};
1672 j[
"function"] =
"send_request_vote";
1674 j[
"state"] = *state;
1675 COMMITTABLE_INDICES(j[
"state"], state);
1676 j[
"to_node_id"] = to;
1683 void recv_request_vote(
const ccf::NodeId& from, RequestVote r)
1685 std::lock_guard<ccf::pal::Mutex> guard(state->lock);
1694#ifdef CCF_RAFT_TRACING
1695 nlohmann::json j = {};
1696 j[
"function"] =
"recv_request_vote";
1698 j[
"state"] = *state;
1699 COMMITTABLE_INDICES(j[
"state"], state);
1700 j[
"from_node_id"] = from;
1704 if (state->current_view > r.term)
1708 "Recv request vote to {} from {}: our term is later ({} > {})",
1711 state->current_view,
1713 send_request_vote_response(from,
false);
1716 else if (state->current_view < r.term)
1719 "Recv request vote to {} from {}: their term is later ({} < {})",
1722 state->current_view,
1727 if (leader_id.has_value())
1731 "Recv request vote to {} from {}: leader {} already known in term {}",
1735 state->current_view);
1736 send_request_vote_response(from,
false);
1740 if ((voted_for.has_value()) && (voted_for.value() != from))
1744 "Recv request vote to {} from {}: already voted for {}",
1748 send_request_vote_response(from,
false);
1756 const auto term_of_last_committable_idx =
1757 get_term_internal(last_committable_idx);
1760 (r.term_of_last_committable_idx > term_of_last_committable_idx) ||
1761 ((r.term_of_last_committable_idx == term_of_last_committable_idx) &&
1762 (r.last_committable_idx >= last_committable_idx));
1768 restart_election_timeout();
1775 "Voting against candidate at {}.{} because local state is at {}.{}",
1776 r.term_of_last_committable_idx,
1777 r.last_committable_idx,
1778 term_of_last_committable_idx,
1779 last_committable_idx);
1782 send_request_vote_response(from, answer);
1785 void send_request_vote_response(
const ccf::NodeId& to,
bool answer)
1788 "Send request vote response from {} to {}: {}",
1793 RequestVoteResponse response{
1794 .term = state->current_view, .vote_granted = answer};
1800 void recv_request_vote_response(
1803 std::lock_guard<ccf::pal::Mutex> guard(state->lock);
1805#ifdef CCF_RAFT_TRACING
1806 nlohmann::json j = {};
1807 j[
"function"] =
"recv_request_vote_response";
1809 j[
"state"] = *state;
1810 COMMITTABLE_INDICES(j[
"state"], state);
1811 j[
"from_node_id"] = from;
1818 "Recv request vote response to {} from: {}: we aren't a candidate",
1825 auto node = all_other_nodes.find(from);
1826 if (node == all_other_nodes.end())
1829 "Recv request vote response to {} from {}: unknown node",
1835 if (state->current_view < r.term)
1838 "Recv request vote response to {} from {}: their term is more recent "
1842 state->current_view,
1847 else if (state->current_view != r.term)
1851 "Recv request vote response to {} from {}: stale ({} != {})",
1854 state->current_view,
1858 else if (!r.vote_granted)
1862 "Recv request vote response to {} from {}: they voted no",
1869 "Recv request vote response to {} from {}: they voted yes",
1873 add_vote_for_me(from);
1876 void recv_propose_request_vote(
1879 std::lock_guard<ccf::pal::Mutex> guard(state->lock);
1881#ifdef CCF_RAFT_TRACING
1882 nlohmann::json j = {};
1883 j[
"function"] =
"recv_propose_request_vote";
1885 j[
"state"] = *state;
1886 COMMITTABLE_INDICES(j[
"state"], state);
1887 j[
"from_node_id"] = from;
1893 "Becoming candidate early due to propose request vote from {}", from);
1898 RAFT_INFO_FMT(
"Ignoring propose request vote from {}", from);
1902 void restart_election_timeout()
1906 timeout_elapsed = std::chrono::milliseconds(distrib(rand));
1909 void reset_votes_for_me()
1911 votes_for_me.clear();
1912 for (
auto const& conf : configurations)
1914 votes_for_me[conf.idx].quorum = get_quorum(conf.nodes.size());
1915 votes_for_me[conf.idx].votes.clear();
1920 void become_candidate()
1922 if (configurations.empty())
1928 "Not becoming candidate {} due to lack of a configuration.",
1936 voted_for = state->node_id;
1937 reset_votes_for_me();
1938 state->current_view++;
1940 restart_election_timeout();
1944 "Becoming candidate {}: {}", state->node_id, state->current_view);
1946#ifdef CCF_RAFT_TRACING
1947 nlohmann::json j = {};
1948 j[
"function"] =
"become_candidate";
1949 j[
"state"] = *state;
1950 COMMITTABLE_INDICES(j[
"state"], state);
1951 j[
"configurations"] = configurations;
1955 add_vote_for_me(state->node_id);
1962 send_request_vote(node_id);
1966 void become_leader(
bool force_become_leader =
false)
1976 "Election index is {} in term {}", election_index, state->current_view);
1980 if (state->commit_idx > 0)
1987 store->initialise_term(state->current_view);
1991 leader_id = state->node_id;
1994 using namespace std::chrono_literals;
1995 timeout_elapsed = 0ms;
2000 "Becoming leader {}: {}", state->node_id, state->current_view);
2002#ifdef CCF_RAFT_TRACING
2003 nlohmann::json j = {};
2004 j[
"function"] =
"become_leader";
2005 j[
"state"] = *state;
2006 COMMITTABLE_INDICES(j[
"state"], state);
2007 j[
"configurations"] = configurations;
2018 auto next = state->last_idx + 1;
2020 for (
auto& node : all_other_nodes)
2022 node.second.match_idx = 0;
2023 node.second.sent_idx = next - 1;
2026 send_append_entries(node.first, next);
2029 if (retired_node_cleanup)
2031 retired_node_cleanup->cleanup();
2041 restart_election_timeout();
2051 "Becoming follower {}: {}.{}",
2053 state->current_view,
2056#ifdef CCF_RAFT_TRACING
2057 nlohmann::json j = {};
2058 j[
"function"] =
"become_follower";
2059 j[
"state"] = *state;
2060 COMMITTABLE_INDICES(j[
"state"], state);
2061 j[
"configurations"] = configurations;
2073 state->current_view = term;
2075 reset_votes_for_me();
2077 is_new_follower =
true;
2084 "Becoming retired, phase {} (leadership {}): {}: {} at {}",
2086 state->leadership_state,
2088 state->current_view,
2094 !state->retirement_idx.has_value(),
2095 "retirement_idx already set to {}",
2096 state->retirement_idx.value());
2097 state->retirement_idx = idx;
2102 assert(state->retirement_idx.has_value());
2104 idx >= state->retirement_idx.value(),
2105 "Index {} unexpectedly lower than retirement_idx {}",
2107 state->retirement_idx.value());
2108 state->retirement_committable_idx = idx;
2115 ProposeRequestVote prv{.term = state->current_view};
2117 std::optional<ccf::NodeId> successor = std::nullopt;
2118 Index max_match_idx = 0;
2131 for (
auto& [node, node_state] : all_other_nodes)
2133 if (node_state.match_idx >= max_match_idx)
2136 auto conf = configurations.rbegin();
2137 while (conf != configurations.rend())
2139 if (conf->nodes.find(node) != conf->nodes.end())
2141 latest_reconf_id = conf->idx;
2146 if (!(node_state.match_idx == max_match_idx &&
2147 latest_reconf_id < reconf_id_of_max_match))
2149 reconf_id_of_max_match = latest_reconf_id;
2151 max_match_idx = node_state.match_idx;
2155 if (successor.has_value())
2157 RAFT_INFO_FMT(
"Node retired, nudging {}", successor.value());
2168 state->retirement_phase = phase;
2173 if (configurations.empty())
2176 "Not voting for myself {} due to lack of a configuration.",
2182 for (
auto const& conf : configurations)
2184 auto const&
nodes = conf.nodes;
2191 votes_for_me[conf.idx].votes.insert(from);
2193 "Node {} voted for {} in configuration {} with quorum {}",
2197 votes_for_me[conf.idx].quorum);
2201 bool is_elected =
true;
2202 for (
auto const& v : votes_for_me)
2204 auto const& quorum = v.second.quorum;
2205 auto const& votes = v.second.votes;
2207 if (votes.size() < quorum)
2223 void update_commit()
2227 throw std::logic_error(
2228 "update_commit() must only be called while this node is leader");
2231 std::optional<Index> new_agreement_index = std::nullopt;
2234 for (
auto const& c : configurations)
2238 std::vector<Index> match;
2239 match.reserve(c.nodes.size());
2241 for (
auto node : c.
nodes)
2243 if (node.first == state->node_id)
2245 match.push_back(state->last_idx);
2249 match.push_back(all_other_nodes.at(node.first).match_idx);
2253 sort(match.begin(), match.end());
2254 auto confirmed = match.at((match.size() - 1) / 2);
2257 !new_agreement_index.has_value() ||
2258 confirmed < new_agreement_index.value())
2260 new_agreement_index = confirmed;
2264 if (new_agreement_index.has_value())
2266 if (new_agreement_index.value() > state->last_idx)
2268 throw std::logic_error(
2269 "Followers appear to have later match indices than leader");
2272 const auto new_commit_idx =
2275 if (new_commit_idx.has_value())
2278 "In update_commit, new_commit_idx: {}, "
2280 new_commit_idx.value(),
2283 const auto term_of_new = get_term_internal(new_commit_idx.value());
2284 if (term_of_new == state->current_view)
2286 commit(new_commit_idx.value());
2291 "Ack quorum at {} resulted in proposed commit index {}, which "
2292 "is in term {}. Waiting for agreement on committable entry in "
2293 "current term {} to update commit",
2294 new_agreement_index.value(),
2295 new_commit_idx.value(),
2297 state->current_view);
2305 void commit_if_possible(
Index idx)
2308 "Commit if possible {} (ci: {}) (ti {})",
2311 get_term_internal(idx));
2313 (idx > state->commit_idx) &&
2314 (get_term_internal(idx) <= state->current_view))
2316 const auto highest_committable =
2318 if (highest_committable.has_value())
2320 commit(highest_committable.value());
2325 size_t get_quorum(
size_t n)
const
2330 void commit(
Index idx)
2332 if (idx > state->last_idx)
2334 throw std::logic_error(fmt::format(
2335 "Tried to commit {} but last_idx is {}", idx, state->last_idx));
2342 if (idx <= state->commit_idx)
2345#ifdef CCF_RAFT_TRACING
2346 nlohmann::json j = {};
2347 j[
"function"] =
"commit";
2348 j[
"args"] = nlohmann::json::object();
2349 j[
"args"][
"idx"] = idx;
2350 j[
"state"] = *state;
2351 COMMITTABLE_INDICES(j[
"state"], state);
2352 j[
"configurations"] = configurations;
2358 state->commit_idx = idx;
2362 state->retirement_committable_idx.has_value() &&
2363 idx >= state->retirement_committable_idx.value())
2369 store->compact(idx);
2376 bool changed =
false;
2380 auto conf = configurations.begin();
2381 if (conf == configurations.end())
2386 auto next = std::next(conf);
2387 if (next == configurations.end())
2392 if (idx < next->idx)
2398 "Configurations: discard committed configuration at {}", conf->idx);
2399 configurations.pop_front();
2405 create_and_remove_node_state();
2408 retired_node_cleanup->cleanup();
2413 bool is_self_in_latest_config()
2415 bool present =
false;
2416 if (!configurations.empty())
2418 auto current_nodes = configurations.back().nodes;
2419 present = current_nodes.find(state->node_id) != current_nodes.end();
2424 void start_ticking_if_necessary()
2426 if (!ticking && is_self_in_latest_config())
2435 if (idx < state->commit_idx)
2438 "Asked to rollback to idx:{} but committed to commit_idx:{} - "
2439 "ignoring rollback request",
2445 store->rollback({get_term_internal(idx), idx}, state->current_view);
2447 RAFT_DEBUG_FMT(
"Setting term in store to: {}", state->current_view);
2449 state->last_idx = idx;
2452 state->view_history.rollback(idx);
2454 while (!state->committable_indices.empty() &&
2455 (state->committable_indices.back() > idx))
2457 state->committable_indices.pop_back();
2464 assert(state->retirement_committable_idx.has_value());
2465 if (state->retirement_committable_idx.value() > idx)
2467 state->retirement_committable_idx = std::nullopt;
2476 assert(state->retirement_idx.has_value());
2477 if (state->retirement_idx.value() > idx)
2479 state->retirement_idx = std::nullopt;
2480 state->retirement_phase = std::nullopt;
2487 bool changed =
false;
2489 while (!configurations.empty() && (configurations.back().idx > idx))
2492 "Configurations: rollback configuration at {}",
2493 configurations.back().idx);
2494 configurations.pop_back();
2500 create_and_remove_node_state();
2510 void create_and_remove_node_state()
2515 for (
auto const& conf : configurations)
2517 for (
auto const& node : conf.nodes)
2519 active_nodes.emplace(node.first, node.second);
2525 std::vector<ccf::NodeId> to_remove;
2527 for (
const auto& node : all_other_nodes)
2529 if (active_nodes.find(node.first) == active_nodes.end())
2531 to_remove.push_back(node.first);
2536 for (
auto node_info : active_nodes)
2538 if (node_info.first == state->node_id)
2543 if (all_other_nodes.find(node_info.first) == all_other_nodes.end())
2545 if (!
channels->have_channel(node_info.first))
2548 "Configurations: create node channel with {}", node_info.first);
2552 node_info.second.hostname,
2553 node_info.second.port);
2558 auto index = state->last_idx + 1;
2559 all_other_nodes.try_emplace(
2560 node_info.first, node_info.second, index, 0);
2564 send_append_entries(node_info.first, index);
2568 "Added raft node {} ({}:{})",
2570 node_info.second.hostname,
2571 node_info.second.port);