84 std::vector<ccf::crypto::SubjectAltName> subject_alt_names = {};
86 std::shared_ptr<ccf::crypto::KeyPair_OpenSSL> node_sign_kp;
88 std::shared_ptr<ccf::crypto::RSAKeyPair> node_encrypt_kp;
90 std::optional<ccf::crypto::Pem> endorsed_node_cert = std::nullopt;
94 std::optional<UVMEndorsements> snp_uvm_endorsements = std::nullopt;
95 std::vector<uint8_t> startup_snapshot;
96 std::shared_ptr<QuoteEndorsementsClient> quote_endorsements_client =
99 std::atomic<bool> stop_noticed =
false;
105 View create_view_ = 0,
106 bool create_consortium_ =
true) :
108 create_view(create_view_),
109 create_consortium(create_consortium_)
113 bool create_consortium;
122 size_t sig_tx_interval;
123 size_t sig_ms_interval;
127 std::shared_ptr<ccf::kv::Consensus>
consensus;
128 std::shared_ptr<RPCMap> rpc_map;
129 std::shared_ptr<indexing::Indexer> indexer;
130 std::shared_ptr<NodeToNode> n2n_channels;
131 std::shared_ptr<Forwarder<NodeToNode>> cmd_forwarder;
132 std::shared_ptr<RPCSessions> rpcsessions;
134 std::shared_ptr<ccf::kv::TxHistory> history;
135 std::shared_ptr<ccf::kv::AbstractTxEncryptor> encryptor;
138 std::shared_ptr<Snapshotter> snapshotter;
143 std::shared_ptr<ccf::kv::Store> recovery_store;
147 std::vector<ccf::kv::Version> view_history;
151 static const size_t recovery_batch_size = 100;
156 std::shared_ptr<JwtKeyAutoRefresh> jwt_key_auto_refresh;
158 std::unique_ptr<StartupSnapshotInfo> startup_snapshot_info =
nullptr;
164 std::map<NodeInfoNetwork::RpcInterfaceID, std::shared_ptr<ACMEClient>>
168 std::shared_ptr<ACMEChallengeHandler>>
169 acme_challenge_handlers;
170 size_t num_acme_interfaces = 0;
172 std::shared_ptr<ccf::kv::AbstractTxEncryptor> make_encryptor()
174#ifdef USE_NULL_ENCRYPTOR
175 return std::make_shared<ccf::kv::NullTxEncryptor>();
182 void initialise_startup_snapshot(
bool recovery =
false)
184 std::shared_ptr<ccf::kv::Store> snapshot_store;
189 auto snapshot_history = std::make_shared<MerkleTxHistory>(
190 *snapshot_store.get(),
197 auto snapshot_encryptor = make_encryptor();
199 snapshot_store->set_history(snapshot_history);
200 snapshot_store->set_encryptor(snapshot_encryptor);
204 snapshot_store = network.
tables;
208 startup_snapshot_info = initialise_from_snapshot(
210 std::move(startup_snapshot),
216 startup_seqno = startup_snapshot_info->seqno;
217 last_recovered_idx = startup_seqno;
218 last_recovered_signed_idx = last_recovered_idx;
225 std::shared_ptr<RPCSessions> rpcsessions,
231 node_encrypt_kp(
ccf::crypto::make_rsa_key_pair()),
232 writer_factory(writer_factory),
233 to_host(writer_factory.create_writer_to_outside()),
235 rpcsessions(rpcsessions),
236 share_manager(network.ledger_secrets)
242 const std::vector<uint8_t>& expected_node_public_key_der,
246 tx, quote_info_, expected_node_public_key_der, measurement);
254 std::shared_ptr<RPCMap> rpc_map_,
255 std::shared_ptr<AbstractRPCResponder> rpc_sessions_,
256 std::shared_ptr<indexing::Indexer> indexer_,
257 size_t sig_tx_interval_,
258 size_t sig_ms_interval_)
260 std::lock_guard<pal::Mutex> guard(lock);
263 consensus_config = consensus_config_;
266 sig_tx_interval = sig_tx_interval_;
267 sig_ms_interval = sig_ms_interval_;
269 n2n_channels = std::make_shared<NodeToNodeChannelManager>(writer_factory);
271 cmd_forwarder = std::make_shared<Forwarder<NodeToNode>>(
272 rpc_sessions_, n2n_channels, rpc_map);
276 for (
auto& [actor, fe] : rpc_map->frontends())
278 fe->set_sig_intervals(sig_tx_interval, sig_ms_interval);
279 fe->set_cmd_forwarder(cmd_forwarder);
289 if (measurement.has_value())
291 node_measurement = measurement.value();
295 throw std::logic_error(
"Failed to extract code id from quote");
304 "Security policy not set, skipping check against attestation host "
310 if (!quoted_digest.has_value())
312 throw std::logic_error(
"Unable to find host data in attestation");
315 auto const& security_policy =
318 auto security_policy_digest =
320 if (security_policy_digest != quoted_digest.value())
322 throw std::logic_error(fmt::format(
323 "Digest of decoded security policy \"{}\" {} does not match "
324 "attestation host data {}",
326 security_policy_digest.hex_str(),
327 quoted_digest.value().hex_str()));
330 "Successfully verified attested security policy {}",
331 security_policy_digest);
337 "UVM endorsements not set, skipping check against attestation "
346 snp_uvm_endorsements =
347 verify_uvm_endorsements(uvm_endorsements_raw, node_measurement);
350 catch (
const std::exception& e)
352 throw std::logic_error(
353 fmt::format(
"Error verifying UVM endorsements: {}", e.what()));
363 create_and_send_boot_request(
364 aft::starting_view_change,
true );
369 if (!startup_snapshot.empty())
371 initialise_startup_snapshot();
380 setup_recovery_hook();
381 if (!startup_snapshot.empty())
383 initialise_startup_snapshot(
true);
384 snapshotter->set_last_snapshot_idx(last_recovered_idx);
393 throw std::logic_error(
394 fmt::format(
"Node was launched in unknown mode {}", start_type));
401 auto fetch_endorsements = [
this](
404 EndorsementEndpointsConfiguration&
413 throw std::runtime_error(
414 "One or more SNP endorsements servers must be specified to fetch "
415 "the collateral for the attestation");
418 quote_endorsements_client = std::make_shared<QuoteEndorsementsClient>(
421 [
this, qi](std::vector<uint8_t>&& endorsements) {
422 std::lock_guard<pal::Mutex> guard(lock);
429 catch (
const std::exception& e)
434 quote_endorsements_client.reset();
437 quote_endorsements_client->fetch_endorsements();
445 throw std::runtime_error(
446 "SGX quote generation should have already fetched endorsements");
465 std::vector<uint8_t>&& startup_snapshot_)
467 std::lock_guard<pal::Mutex> guard(lock);
469 start_type = start_type_;
471 config = std::move(config_);
472 startup_snapshot = std::move(startup_snapshot_);
473 subject_alt_names = get_subject_alternative_names();
476 self_signed_node_cert = create_self_signed_cert(
483 accept_node_tls_connections();
492 setup_acme_clients();
500 network.
identity = std::make_unique<ReplicatedNetworkIdentity>(
518 return {self_signed_node_cert, network.
identity->cert};
523 return {self_signed_node_cert, {}};
529 throw std::logic_error(
530 "Recovery requires the certificate of the previous service "
537 network.
identity = std::make_unique<ReplicatedNetworkIdentity>(
544 return {self_signed_node_cert, network.
identity->cert};
548 throw std::logic_error(
549 fmt::format(
"Node was started in unknown mode {}", start_type));
562 auto network_ca = std::make_shared<::tls::CA>(std::string(
565 auto join_client_cert = std::make_unique<::tls::Cert>(
567 self_signed_node_cert,
568 node_sign_kp->private_key_pem(),
574 auto join_client = rpcsessions->create_client(
575 std::move(join_client_cert),
576 rpcsessions->get_app_protocol_main_interface());
578 auto [target_host, target_port] =
581 join_client->connect(
587 std::vector<uint8_t>&& data) {
588 std::lock_guard<pal::Mutex> guard(lock);
594 if (status != HTTP_STATUS_OK)
596 const auto& location = headers.find(http::headers::LOCATION);
599 (status == HTTP_STATUS_PERMANENT_REDIRECT ||
600 status == HTTP_STATUS_TEMPORARY_REDIRECT) &&
601 location != headers.end())
605 make_net_address(url.host, url.port);
606 LOG_INFO_FMT(
"Target node redirected to {}", location->second);
611 "An error occurred while joining the network: {} {}{}",
613 http_status_str(status),
616 fmt::format(
" '{}'", std::string(data.begin(), data.end())));
621 auto j = nlohmann::json::parse(data);
628 catch (
const std::exception& e)
631 "An error occurred while parsing the join network response");
633 "An error occurred while parsing the join network response: {}",
641 network.
identity = std::make_unique<ReplicatedNetworkIdentity>(
647 if (!resp.
network_info->endorsed_certificate.has_value())
650 throw std::logic_error(
651 "Expected endorsed certificate in join response");
653 n2n_channels_cert = resp.
network_info->endorsed_certificate.value();
665 last_recovered_signed_idx =
667 setup_recovery_hook();
668 snapshotter->set_snapshot_generation(
false);
672 std::vector<ccf::kv::Version> view_history_ = {};
673 if (startup_snapshot_info)
678 deserialise_snapshot(
680 startup_snapshot_info->raw,
686 for (
auto& hook : hooks)
691 auto tx = network.
tables->create_read_only_tx();
693 auto sig = signatures->get();
694 if (!sig.has_value())
696 throw std::logic_error(
697 fmt::format(
"No signatures found after applying snapshot"));
706 startup_snapshot_info.reset();
710 "Joiner successfully resumed from snapshot at seqno {} and "
712 network.
tables->current_version(),
717 network.
tables->current_version(),
720 last_recovered_signed_idx);
722 snapshotter->set_last_snapshot_idx(
723 network.
tables->current_version());
724 history->start_signature_emit_timer();
738 "Node has now joined the network as node {}: {}",
740 (resp.
network_info->public_only ?
"public only" :
"all domains"));
745 "Node {} is waiting for votes of members to be trusted", self);
748 [
this](
const std::string& error_msg) {
749 std::lock_guard<pal::Mutex> guard(lock);
750 auto long_error_msg = fmt::format(
751 "Early error when joining existing network at {}: {}. Shutting "
752 "down node gracefully...",
757 AdminMessage::fatal_error_msg, to_host, long_error_msg);
775 const auto body = nlohmann::json(join_params).dump();
782 http::headers::CONTENT_TYPE, http::headervalues::contenttype::JSON);
785 join_client->send_request(std::move(r));
790 std::lock_guard<pal::Mutex> guard(lock);
798 auto timer_msg = std::make_unique<::threading::Tmsg<NodeStateMsg>>(
799 [](std::unique_ptr<::threading::Tmsg<NodeStateMsg>> msg) {
800 std::lock_guard<pal::Mutex> guard(msg->data.self.lock);
803 msg->data.self.initiate_join_unsafe();
804 auto delay = std::chrono::milliseconds(
805 msg->data.self.config.join.retry_timeout);
808 std::move(msg), delay);
822 "JWT key auto-refresh: consensus not initialized, not starting "
826 jwt_key_auto_refresh = std::make_shared<JwtKeyAutoRefresh>(
833 self_signed_node_cert);
834 jwt_key_auto_refresh->start();
836 network.
tables->set_map_hook(
840 jwt_key_auto_refresh->schedule_once();
841 return ccf::kv::ConsensusHookPtr(nullptr);
847 return jwt_key_auto_refresh->get_attempts();
857 throw std::logic_error(fmt::format(
858 "Node should be in state {} to start reading ledger",
865 last_recovered_idx + 1, last_recovered_idx + recovery_batch_size);
870 std::lock_guard<pal::Mutex> guard(lock);
874 auto data = entries.data();
875 auto size = entries.size();
887 LOG_INFO_FMT(
"Deserialising public ledger entry [{}]", entry.size());
894 auto r = network.
tables->deserialize(entry,
true);
899 "Failed to deserialise public ledger entry: {}", result);
903 ++last_recovered_idx;
906 for (
auto& hook : r->get_hooks())
911 catch (
const std::exception& e)
914 "Failed to deserialise public ledger entry: {}", e.what());
923 network.
tables->compact(last_recovered_idx);
924 auto tx = network.
tables->create_read_only_tx();
925 auto last_sig = tx.ro(network.
signatures)->get();
927 if (!last_sig.has_value())
929 throw std::logic_error(
"Signature missing");
933 "Read signature at {} for view {}",
943 const auto view_start_idx =
944 view_history.empty() ? 1 : last_recovered_signed_idx + 1;
947 "last_sig->view is invalid, {}",
949 for (
auto i = view_history.size();
950 i < static_cast<size_t>(last_sig->view);
953 view_history.push_back(view_start_idx);
955 last_recovered_signed_idx = last_recovered_idx;
960 last_recovered_idx + 1, last_recovered_idx + recovery_batch_size);
965 std::lock_guard<pal::Mutex> guard(lock);
967 history->start_signature_emit_timer();
973 std::lock_guard<pal::Mutex> guard(lock);
975 history->start_signature_emit_timer();
988 const auto last_recovered_term = view_history.size();
989 auto new_term = last_recovered_term + aft::starting_view_change;
990 LOG_INFO_FMT(
"Setting term on public recovery store to {}", new_term);
994 {last_recovered_term, last_recovered_signed_idx}, new_term);
995 ledger_truncate(last_recovered_signed_idx,
true);
996 snapshotter->rollback(last_recovered_signed_idx);
999 "End of public ledger recovery - Truncating ledger to last signed "
1001 last_recovered_term,
1002 last_recovered_signed_idx);
1004 auto tx = network.
tables->create_read_only_tx();
1008 snapshotter->init_after_public_recovery();
1009 snapshotter->set_snapshot_generation(
false);
1017 auto s = ls.value();
1025 h->set_node_id(self);
1028 auto service_config = tx.ro(network.
config)->get();
1036 consensus->force_become_primary(index,
view, view_history, index);
1038 create_and_send_boot_request(
1047 std::lock_guard<pal::Mutex> guard(lock);
1051 "Node in state {} cannot recover private ledger entries", sm.
value());
1055 auto data = entries.data();
1056 auto size = entries.size();
1069 "Deserialising private ledger entry {} [{}]",
1070 last_recovered_idx + 1,
1077 result = recovery_store->deserialize(entry)->apply();
1081 "Failed to deserialise private ledger entry: {}", result);
1084 recovery_store->rollback({0, last_recovered_idx}, 0);
1088 ++last_recovered_idx;
1090 catch (
const std::exception& e)
1093 "Failed to deserialise private ledger entry: {}", e.what());
1100 recovery_store->compact(last_recovered_idx);
1104 if (recovery_store->current_version() == recovery_v)
1106 LOG_INFO_FMT(
"Reached recovery final version at {}", recovery_v);
1111 read_ledger_entries(
1112 last_recovered_idx + 1,
1113 std::min(last_recovered_idx + recovery_batch_size, recovery_v));
1124 if (recovery_v != recovery_store->current_version())
1126 throw std::logic_error(fmt::format(
1127 "Private recovery did not reach public ledger seqno: {}/{}",
1128 recovery_store->current_version(),
1134 if (h->get_replicated_state_root() != recovery_root)
1136 throw std::logic_error(fmt::format(
1137 "Root of public store does not match root of private store at {}",
1141 network.
tables->swap_private_maps(*recovery_store.get());
1142 recovery_store.reset();
1148 snapshotter->set_snapshot_generation(
true);
1153 auto tx = network.
tables->create_tx();
1159 auto active_service = service->get();
1161 if (!active_service.has_value())
1163 throw std::logic_error(fmt::format(
1164 "Error in {}: no value in {}", __func__, Tables::SERVICE));
1168 active_service->status !=
1171 throw std::logic_error(fmt::format(
1172 "Error in {}: current service status is {}",
1174 active_service->status));
1188 throw std::logic_error(
"Service could not be opened");
1198 throw std::logic_error(
1199 "Could not commit transaction when finishing network recovery");
1202 recovered_encrypted_ledger_secrets.clear();
1214 network.
tables->set_map_hook(
1219 const EncryptedLedgerSecretsInfo::Write& w)
1223 throw std::logic_error(fmt::format(
1224 "Unexpected removal from {} table",
1225 network.encrypted_ledger_secrets.get_name()));
1231 network.
tables->unset_map_hook(
1243 std::lock_guard<pal::Mutex> guard(lock);
1245 if (is_reading_public_ledger())
1247 recover_public_ledger_end_unsafe();
1249 else if (is_reading_private_ledger())
1251 recover_private_ledger_end_unsafe();
1256 "Node in state {} cannot finalise ledger recovery", sm.value());
1266 recovery_store = std::make_shared<ccf::kv::Store>(
1269 auto recovery_history = std::make_shared<MerkleTxHistory>(
1270 *recovery_store.get(),
1277 auto recovery_encryptor = make_encryptor();
1279 recovery_store->set_history(recovery_history);
1280 recovery_store->set_encryptor(recovery_encryptor);
1283 recovery_v = network.tables->current_version();
1285 recovery_root = h->get_replicated_state_root();
1287 if (startup_snapshot_info)
1289 std::vector<ccf::kv::Version> view_history_;
1291 deserialise_snapshot(
1293 startup_snapshot_info->raw,
1297 config.recover.previous_service_identity);
1298 startup_snapshot_info.reset();
1302 "Recovery store successfully setup at {}. Target recovery seqno: {}",
1303 recovery_store->current_version(),
1309 share_manager.shuffle_recovery_shares(tx);
1317 throw std::logic_error(
"Could not cast tx to CommittableTx");
1326 if (committable_tx ==
nullptr)
1328 throw std::logic_error(
"Could not cast tx to CommittableTx");
1330 committable_tx->set_flag(
1336 const std::optional<std::vector<std::string>>& interfaces =
1337 std::nullopt)
override
1339 if (!network.identity)
1344 num_acme_interfaces = 0;
1346 for (
const auto& [iname, interface] : config.network.rpc_interfaces)
1349 !interface.endorsement ||
1351 !interface.endorsement->acme_configuration)
1356 num_acme_interfaces++;
1360 std::find(interfaces->begin(), interfaces->end(), iname) !=
1363 auto challenge_frontend = find_acme_challenge_frontend();
1365 const std::string& cfg_name =
1366 *interface.endorsement->acme_configuration;
1367 auto cit = config.network.acme->configurations.find(cfg_name);
1368 if (cit == config.network.acme->configurations.end())
1370 LOG_INFO_FMT(
"Unknown ACME configuration '{}'", cfg_name);
1375 !cit->second.directory_url.empty() &&
1376 acme_clients.find(cfg_name) == acme_clients.end())
1378 const auto& cfg = cit->second;
1380 auto client = std::make_shared<ACMEClient>(
1389 auto chit = acme_challenge_handlers.find(iname);
1390 if (chit != acme_challenge_handlers.end())
1392 client->install_custom_challenge_handler(chit->second);
1395 acme_clients.emplace(cfg_name,
client);
1398 auto client = acme_clients[cfg_name];
1402 make_key_pair(network.identity->priv_key),
true);
1409 const std::vector<std::string>& args,
1410 const std::vector<uint8_t>& input)
override
1413 nlohmann::json j = msg;
1414 auto json = j.dump();
1416 "Triggering host process launch: {} size={}",
json, input.size());
1418 AppMessage::launch_host_process, to_host,
json, input);
1425 std::lock_guard<pal::Mutex> guard(lock);
1427 auto service = tx.
rw<
Service>(Tables::SERVICE);
1428 auto service_info = service->get();
1429 if (!service_info.has_value())
1431 throw std::logic_error(
1432 "Service information cannot be found to transition service to "
1443 "Service in state {} is already open", service_info->status);
1449 const auto prev_ident =
1452 if (!prev_ident.has_value() || !identities.
previous.has_value())
1454 throw std::logic_error(
1455 "Recovery with service certificates requires both, a previous "
1456 "service identity written to the KV during recovery genesis and a "
1457 "transition_service_to_open proposal that contains previous and "
1458 "next service certificates");
1463 if (prev_ident.value() != from_proposal)
1465 throw std::logic_error(fmt::format(
1466 "Previous service identity does not match.\nActual:\n{}\nIn "
1469 from_proposal.
str()));
1473 if (identities.
next != service_info->cert)
1475 throw std::logic_error(fmt::format(
1476 "Service identity mismatch: the next service identity in the "
1477 "transition_service_to_open proposal does not match the current "
1478 "service identity:\nNext:\n{}\nCurrent:\n{}",
1480 service_info->cert.str()));
1483 service_info->previous_service_identity_version =
1484 service->get_version_of_previous_write();
1486 if (is_part_of_public_network())
1492 service->put(service_info.value());
1495 else if (is_part_of_network())
1502 share_manager.issue_recovery_shares(tx);
1504 catch (
const std::logic_error& e)
1506 throw std::logic_error(
1507 fmt::format(
"Failed to issue recovery shares: {}", e.what()));
1511 trigger_snapshot(tx);
1516 throw std::logic_error(
1517 fmt::format(
"Node in state {} cannot open service", sm.value()));
1526 std::lock_guard<pal::Mutex> guard(lock);
1530 share_manager.restore_recovery_shares_info(
1531 tx, recovered_encrypted_ledger_secrets);
1537 tx.
wo(network.secrets),
1538 std::move(recovered_ledger_secrets));
1544 void tick(std::chrono::milliseconds elapsed)
1558 const auto tx_id =
consensus->get_committed_txid();
1559 indexer->update_strategies(elapsed, {tx_id.first, tx_id.second});
1562 n2n_channels->tick(elapsed);
1580 stop_noticed =
true;
1585 return stop_noticed;
1590 auto [msg_type, from, payload] =
1591 ringbuffer::read_message<node_inbound>(data, size);
1593 auto payload_data = payload.data;
1594 auto payload_size = payload.size;
1598 cmd_forwarder->recv_message(from, payload_data, payload_size);
1609 "Ignoring node msg received too early - current state is {}",
1618 n2n_channels->recv_channel_message(
1619 from, payload_data, payload_size);
1625 consensus->recv_message(from, payload_data, payload_size);
1631 LOG_FAIL_FMT(
"Unknown node message type: {}", msg_type);
1686 const auto val = sm.value();
1694 std::lock_guard<pal::Mutex> guard(lock);
1695 auto s = sm.value();
1698 return {s, recovery_v, recovery_store->current_version()};
1702 return {s, std::nullopt, std::nullopt};
1708 std::lock_guard<pal::Mutex> guard(lock);
1720 !service_status.has_value() ||
1723 LOG_FAIL_FMT(
"Cannot rekey ledger while the service is not open");
1731 share_manager.issue_recovery_shares(tx, new_ledger_secret);
1734 tx.
wo(network.secrets),
1735 std::move(new_ledger_secret));
1747 std::lock_guard<pal::Mutex> guard(lock);
1748 return startup_seqno;
1753 return rpcsessions->get_session_metrics();
1758 std::lock_guard<pal::Mutex> guard(lock);
1759 return self_signed_node_cert;
1763 bool is_ip(
const std::string_view& hostname)
1771 const auto final_component =
1772 ccf::nonstd::split(ccf::nonstd::split(hostname,
".").back(),
":")
1774 if (final_component.empty())
1776 throw std::runtime_error(fmt::format(
1777 "{} has a trailing period, is not a valid hostname", hostname));
1779 for (
const auto c : final_component)
1781 if (c <
'0' || c >
'9')
1790 std::vector<ccf::crypto::SubjectAltName> get_subject_alternative_names()
1795 if (!config.node_certificate.subject_alt_names.empty())
1797 return ccf::crypto::sans_from_string_list(
1798 config.node_certificate.subject_alt_names);
1804 std::vector<ccf::crypto::SubjectAltName> sans;
1805 for (
const auto& [_, interface] : config.network.rpc_interfaces)
1807 auto host = split_net_address(interface.published_address).first;
1808 sans.push_back({
host, is_ip(
host)});
1814 void accept_node_tls_connections()
1818 rpcsessions->set_node_cert(
1819 self_signed_node_cert, node_sign_kp->private_key_pem());
1823 void accept_network_tls_connections()
1828 endorsed_node_cert.has_value(),
1829 "Node certificate should be endorsed before accepting endorsed "
1832 rpcsessions->set_network_cert(
1833 endorsed_node_cert.value(), node_sign_kp->private_key_pem());
1839 auto fe = rpc_map->find(actor);
1840 if (!fe.has_value())
1842 throw std::logic_error(
1843 fmt::format(
"Cannot find {} frontend", (
int)actor));
1850 find_frontend(actor)->open();
1853 void open_user_frontend()
1858 bool is_member_frontend_open_unsafe()
1863 bool is_member_frontend_open()
override
1865 std::lock_guard<pal::Mutex> guard(lock);
1866 return is_member_frontend_open_unsafe();
1869 bool is_user_frontend_open()
override
1871 std::lock_guard<pal::Mutex> guard(lock);
1875 std::shared_ptr<ACMERpcFrontend> find_acme_challenge_frontend()
1878 if (!acme_challenge_opt)
1880 throw std::runtime_error(
"Missing ACME challenge frontend");
1882 return std::static_pointer_cast<ACMERpcFrontend>(*acme_challenge_opt);
1885 void open_acme_challenge_frontend()
1887 if (config.network.acme && !config.network.acme->configurations.empty())
1897 std::vector<uint8_t> serialize_create_request(
1898 View create_view,
bool create_consortium =
true)
1900 CreateNetworkNodeToNode::In create_params;
1904 if (create_consortium)
1906 create_params.genesis_info = config.start;
1908 create_params.recovery_constitution = config.recover.constitution;
1909 LOG_INFO_FMT(
"serialise_create_request, set recovery_constitution to:");
1910 if (create_params.recovery_constitution.has_value())
1912 LOG_INFO_FMT(
"{}", create_params.recovery_constitution.value());
1919 create_params.node_id = self;
1920 create_params.certificate_signing_request = node_sign_kp->create_csr(
1921 config.node_certificate.subject_name, subject_alt_names);
1922 create_params.node_endorsed_certificate =
1923 ccf::crypto::create_endorsed_cert(
1924 create_params.certificate_signing_request,
1925 config.startup_host_time,
1926 config.node_certificate.initial_validity_days,
1927 network.identity->priv_key,
1928 network.identity->cert);
1932 history->set_endorsed_certificate(
1933 create_params.node_endorsed_certificate);
1935 create_params.public_key = node_sign_kp->public_key_pem();
1936 create_params.service_cert = network.identity->cert;
1937 create_params.quote_info = quote_info;
1938 create_params.public_encryption_key = node_encrypt_kp->public_key_pem();
1939 create_params.measurement = node_measurement;
1940 create_params.snp_uvm_endorsements = snp_uvm_endorsements;
1941 create_params.snp_security_policy =
1942 config.attestation.environment.security_policy;
1944 create_params.node_info_network = config.network;
1945 create_params.node_data = config.node_data;
1946 create_params.service_data = config.service_data;
1947 create_params.create_txid = {create_view, last_recovered_signed_idx + 1};
1949 const auto body = nlohmann::json(create_params).dump();
1954 ccf::http::headers::CONTENT_TYPE,
1955 ccf::http::headervalues::contenttype::JSON);
1957 request.set_body(body);
1959 return request.build_request();
1962 bool extract_create_result(
const std::shared_ptr<RpcContext>& ctx)
1970 const auto status = ctx->get_response_status();
1971 if (status != HTTP_STATUS_OK)
1974 "Create response is error: {} {}",
1980 const auto body = nlohmann::json::parse(ctx->get_response_body());
1981 if (!body.is_boolean())
1983 LOG_FAIL_FMT(
"Expected boolean body in create response");
1985 "Expected boolean body in create response: {}", body.dump());
1992 bool send_create_request(
const std::vector<uint8_t>& packed)
1994 auto node_session = std::make_shared<SessionContext>(
1995 InvalidSessionId, self_signed_node_cert.raw());
1998 std::shared_ptr<ccf::RpcHandler> search =
1999 ::http::fetch_rpc_handler(ctx, this->rpc_map);
2001 search->process(ctx);
2003 return extract_create_result(ctx);
2006 void create_and_send_boot_request(
2007 View create_view,
bool create_consortium =
true)
2011 auto msg = std::make_unique<::threading::Tmsg<NodeStateMsg>>(
2012 [](std::unique_ptr<::threading::Tmsg<NodeStateMsg>> msg) {
2013 if (!msg->data.self.send_create_request(
2014 msg->data.self.serialize_create_request(
2015 msg->data.create_view, msg->data.create_consortium)))
2017 throw std::runtime_error(
2018 "Service creation request could not be committed");
2020 if (msg->data.create_consortium)
2022 msg->data.self.advance_part_of_network();
2026 msg->data.self.advance_part_of_public_network();
2037 void begin_private_recovery()
2043 setup_private_recovery_store();
2045 reset_recovery_hook();
2046 setup_one_off_secret_hook();
2049 last_recovered_idx = recovery_store->current_version();
2050 read_ledger_entries(
2051 last_recovered_idx + 1, last_recovered_idx + recovery_batch_size);
2056 void setup_basic_hooks()
2058 network.tables->set_map_hook(
2059 network.secrets.get_name(),
2060 network.secrets.wrap_map_hook(
2064 if (!is_part_of_network())
2067 return ccf::kv::ConsensusHookPtr(nullptr);
2070 const auto& ledger_secrets_for_nodes = w;
2071 if (!ledger_secrets_for_nodes.has_value())
2073 throw std::logic_error(fmt::format(
2074 "Unexpected removal from {} table",
2075 network.secrets.get_name()));
2078 for (
const auto& [node_id, encrypted_ledger_secrets] :
2079 ledger_secrets_for_nodes.value())
2081 if (node_id != self)
2087 for (
const auto& encrypted_ledger_secret :
2088 encrypted_ledger_secrets)
2091 node_encrypt_kp, encrypted_ledger_secret.encrypted_secret);
2096 network.ledger_secrets->set_secret(
2098 std::make_shared<LedgerSecret>(
2099 std::move(plain_ledger_secret), hook_version));
2106 network.tables->set_global_hook(
2107 network.secrets.get_name(),
2108 network.secrets.wrap_commit_hook([
this](
2110 const Secrets::Write& w) {
2112 if (!is_part_of_public_network())
2117 const auto& ledger_secrets_for_nodes = w;
2118 if (!ledger_secrets_for_nodes.has_value())
2120 throw std::logic_error(fmt::format(
2121 "Unexpected removal from {} table", network.secrets.get_name()));
2124 for (
const auto& [node_id, encrypted_ledger_secrets] :
2125 ledger_secrets_for_nodes.value())
2127 if (node_id != self)
2134 for (
const auto& encrypted_ledger_secret : encrypted_ledger_secrets)
2139 if (!encrypted_ledger_secret.version.has_value())
2141 throw std::logic_error(fmt::format(
2142 "Commit hook at seqno {} for table {}: no version for "
2143 "encrypted ledger secret",
2145 network.secrets.get_name()));
2149 node_encrypt_kp, encrypted_ledger_secret.encrypted_secret);
2151 restored_ledger_secrets.emplace(
2152 encrypted_ledger_secret.version.value(),
2153 std::make_shared<LedgerSecret>(
2154 std::move(plain_ledger_secret),
2155 encrypted_ledger_secret.previous_secret_stored_version));
2158 if (!restored_ledger_secrets.empty())
2162 network.ledger_secrets->restore_historical(
2163 std::move(restored_ledger_secrets));
2164 begin_private_recovery();
2170 "Found no ledger secrets for this node ({}) in global commit hook "
2173 network.secrets.get_name(),
2177 network.tables->set_global_hook(
2178 network.nodes.get_name(),
2179 network.nodes.wrap_commit_hook(
2181 std::vector<NodeId> retired_committed_nodes;
2182 for (
const auto& [node_id, node_info] : w)
2184 if (node_info.has_value() && node_info->retired_committed)
2186 retired_committed_nodes.push_back(node_id);
2190 hook_version, retired_committed_nodes);
2199 network.tables->set_map_hook(
2200 network.node_endorsed_certificates.get_name(),
2201 network.node_endorsed_certificates.wrap_map_hook(
2204 const NodeEndorsedCertificates::Write& w)
2207 "[local] node_endorsed_certificates local hook at version {}, "
2211 for (
auto const& [node_id, endorsed_certificate] : w)
2213 if (node_id != self)
2216 "[local] Ignoring endorsed certificate for other node {}",
2221 if (!endorsed_certificate.has_value())
2224 "[local] Endorsed cert for self ({}) has been deleted", self);
2225 throw std::logic_error(fmt::format(
2226 "Could not find endorsed node certificate for {}", self));
2229 std::lock_guard<pal::Mutex> guard(lock);
2231 if (endorsed_node_cert.has_value())
2234 "[local] Previous endorsed node cert was:\n{}",
2235 endorsed_node_cert->str());
2238 endorsed_node_cert = endorsed_certificate.value();
2240 "[local] Under lock, setting endorsed node cert to:\n{}",
2241 endorsed_node_cert->str());
2242 history->set_endorsed_certificate(endorsed_node_cert.value());
2243 n2n_channels->set_endorsed_node_cert(endorsed_node_cert.value());
2249 network.tables->set_global_hook(
2250 network.node_endorsed_certificates.get_name(),
2251 network.node_endorsed_certificates.wrap_commit_hook(
2254 const NodeEndorsedCertificates::Write& w) {
2256 "[global] node_endorsed_certificates global hook at version {}, "
2260 for (
auto const& [node_id, endorsed_certificate] : w)
2262 if (node_id != self)
2265 "[global] Ignoring endorsed certificate for other node {}",
2270 if (!endorsed_certificate.has_value())
2273 "[global] Endorsed cert for self ({}) has been deleted",
2275 throw std::logic_error(fmt::format(
2276 "Could not find endorsed node certificate for {}", self));
2279 std::lock_guard<pal::Mutex> guard(lock);
2281 LOG_INFO_FMT(
"[global] Accepting network connections");
2282 accept_network_tls_connections();
2284 if (is_member_frontend_open_unsafe())
2292 auto [valid_from, valid_to] =
2294 ->validity_period();
2296 "[global] Member frontend is open, so refreshing self-signed "
2299 "[global] Previously:\n{}", self_signed_node_cert.str());
2300 self_signed_node_cert = create_self_signed_cert(
2302 config.node_certificate.subject_name,
2306 LOG_INFO_FMT(
"[global] Now:\n{}", self_signed_node_cert.str());
2309 accept_node_tls_connections();
2315 "[global] Self-signed node cert remains:\n{}",
2316 self_signed_node_cert.str());
2324 network.tables->set_global_hook(
2325 network.service.get_name(),
2326 network.service.wrap_commit_hook(
2330 throw std::logic_error(
"Unexpected deletion in service value");
2336 auto current_pubk_pem =
2339 if (hook_pubk_pem != current_pubk_pem)
2342 "Ignoring historical service open at seqno {} for {}",
2349 "Executing global hook for service table at {}, to service "
2350 "status {}. Cert is:\n{}",
2355 network.identity->set_certificate(w->cert);
2358 open_user_frontend();
2361 LOG_INFO_FMT(
"Service open at seqno {}", hook_version);
2365 network.tables->set_global_hook(
2366 network.acme_certificates.get_name(),
2367 network.acme_certificates.wrap_commit_hook(
2370 for (
auto const& [interface_id, interface] :
2371 config.network.rpc_interfaces)
2373 if (interface.endorsement->acme_configuration)
2375 auto cit = w.find(*interface.endorsement->acme_configuration);
2379 "ACME: new certificate for interface '{}' with "
2380 "configuration '{}'",
2382 *interface.endorsement->acme_configuration);
2383 rpcsessions->set_cert(
2386 network.identity->priv_key,
2401 std::lock_guard<pal::Mutex> guard(lock);
2402 return last_recovered_signed_idx;
2405 void setup_recovery_hook()
2407 network.tables->set_map_hook(
2408 network.encrypted_ledger_secrets.get_name(),
2409 network.encrypted_ledger_secrets.wrap_map_hook(
2412 const EncryptedLedgerSecretsInfo::Write& w)
2414 auto encrypted_ledger_secret_info = w;
2415 if (!encrypted_ledger_secret_info.has_value())
2417 throw std::logic_error(fmt::format(
2418 "Unexpected removal from {} table",
2419 network.encrypted_ledger_secrets.get_name()));
2424 if (!encrypted_ledger_secret_info->next_version.has_value())
2426 encrypted_ledger_secret_info->next_version = version + 1;
2429 if (encrypted_ledger_secret_info->previous_ledger_secret
2433 "Recovering encrypted ledger secret valid at seqno {}",
2434 encrypted_ledger_secret_info->previous_ledger_secret->version);
2437 recovered_encrypted_ledger_secrets.emplace_back(
2438 std::move(encrypted_ledger_secret_info.value()));
2444 void reset_recovery_hook()
2446 network.tables->unset_map_hook(
2447 network.encrypted_ledger_secrets.get_name());
2450 void setup_n2n_channels(
2451 const std::optional<ccf::crypto::Pem>& endorsed_node_certificate_ =
2458 n2n_channels->initialize(
2459 self, network.identity->cert, node_sign_kp, endorsed_node_certificate_);
2462 void setup_cmd_forwarder()
2464 cmd_forwarder->initialize(self);
2467 void setup_history()
2471 throw std::logic_error(
"History already initialised");
2474 history = std::make_shared<MerkleTxHistory>(
2475 *network.tables.get(),
2481 network.tables->set_history(history);
2484 void setup_encryptor()
2488 throw std::logic_error(
"Encryptor already initialised");
2491 encryptor = make_encryptor();
2492 network.tables->set_encryptor(encryptor);
2495 void setup_consensus(
2496 ServiceStatus service_status,
2498 bool public_only =
false,
2499 const std::optional<ccf::crypto::Pem>& endorsed_node_certificate_ =
2502 setup_n2n_channels(endorsed_node_certificate_);
2503 setup_cmd_forwarder();
2505 auto shared_state = std::make_shared<aft::State>(self);
2507 auto node_client = std::make_shared<HTTPNodeClient>(
2508 rpc_map, node_sign_kp, self_signed_node_cert, endorsed_node_cert);
2516 std::make_unique<::consensus::LedgerEnclave>(writer_factory),
2522 reconfiguration_type);
2524 network.tables->set_consensus(
consensus);
2525 network.tables->set_snapshotter(snapshotter);
2529 network.tables->set_map_hook(
2530 network.nodes.get_name(),
2531 network.nodes.wrap_map_hook(
2534 return std::make_unique<ConfigurationChangeHook>(version, w);
2543 network.tables->set_map_hook(
2544 network.signatures.get_name(),
2545 network.signatures.wrap_map_hook(
2546 [s = this->snapshotter](
2548 assert(w.has_value());
2549 auto sig = w.value();
2550 s->record_signature(version, sig.sig, sig.node, sig.cert);
2551 return ccf::kv::ConsensusHookPtr(nullptr);
2554 network.tables->set_map_hook(
2555 network.serialise_tree.get_name(),
2556 network.serialise_tree.wrap_map_hook(
2557 [s = this->snapshotter](
2559 assert(w.has_value());
2560 auto tree = w.value();
2561 s->record_serialised_tree(version, tree);
2562 return ccf::kv::ConsensusHookPtr(nullptr);
2565 network.tables->set_map_hook(
2566 network.snapshot_evidence.get_name(),
2567 network.snapshot_evidence.wrap_map_hook(
2568 [s = this->snapshotter](
2570 assert(w.has_value());
2571 auto snapshot_evidence = w.value();
2572 s->record_snapshot_evidence_idx(version, snapshot_evidence);
2573 return ccf::kv::ConsensusHookPtr(nullptr);
2576 setup_basic_hooks();
2579 void setup_snapshotter()
2583 throw std::logic_error(
"Snapshotter already initialised");
2585 snapshotter = std::make_shared<Snapshotter>(
2586 writer_factory, network.tables, config.snapshot_tx_interval);
2592 ::consensus::ledger_get_range,
2602 ::consensus::ledger_truncate, to_host, idx, recovery_mode);
2605 void setup_acme_clients()
2607 if (!config.network.acme || config.network.acme->configurations.empty())
2612 open_acme_challenge_frontend();
2614 const auto& ifaces = config.network.rpc_interfaces;
2615 num_acme_interfaces =
2616 std::count_if(ifaces.begin(), ifaces.end(), [](
const auto& id_iface) {
2617 return id_iface.second.endorsement->authority == Authority::ACME;
2620 if (num_acme_interfaces > 0)
2626 auto msg = std::make_unique<::threading::Tmsg<NodeStateMsg>>(
2627 [](std::unique_ptr<::threading::Tmsg<NodeStateMsg>> msg) {
2628 auto& state = msg->data.self;
2630 if (state.consensus && state.consensus->can_replicate())
2632 if (state.acme_clients.size() != state.num_acme_interfaces)
2634 auto tx = state.network.tables->create_tx();
2635 state.trigger_acme_refresh(tx);
2640 for (
auto& [cfg_name,
client] : state.acme_clients)
2645 state.network.tables, state.network.identity);
2651 auto delay = std::chrono::minutes(1);
2653 std::move(msg), delay);
2658 std::move(msg), std::chrono::seconds(2));
2665 n2n_channels->set_message_limit(message_limit);
2670 n2n_channels->set_idle_timeout(idle_timeout);
2680 return network.identity->cert;
2685 std::shared_ptr<ACMEChallengeHandler> h)
override
2687 acme_challenge_handlers[interface_id] = h;
2692 const ::http::URL& url,
2697 const std::vector<std::string>& ca_certs = {},
2698 const std::string& app_protocol =
"HTTP1",
2699 bool authenticate_as_node_client_certificate =
false)
override
2701 std::optional<ccf::crypto::Pem> client_cert = std::nullopt;
2702 std::optional<ccf::crypto::Pem> client_cert_key = std::nullopt;
2703 if (authenticate_as_node_client_certificate)
2706 endorsed_node_cert ? *endorsed_node_cert : self_signed_node_cert;
2707 client_cert_key = node_sign_kp->private_key_pem();
2710 auto ca = std::make_shared<::tls::CA>(ca_certs,
true);
2711 std::shared_ptr<::tls::Cert> ca_cert =
2712 std::make_shared<::tls::Cert>(ca, client_cert, client_cert_key);
2713 auto client = rpcsessions->create_client(ca_cert, app_protocol);
2719 http::HeaderMap&& headers,
2720 std::vector<uint8_t>&& data) {
2721 return callback(status, std::move(headers), std::move(data));
2723 client->send_request(std::move(req));
2728 snapshotter->write_snapshot(snapshot_buf, request_id);
2731 virtual std::shared_ptr<ccf::kv::Store>
get_store()
override
2733 return network.tables;
2738 return writer_factory;