151 join_config(
std::move(join_config_)),
152 snapshot_config(
std::move(snapshot_config_)),
// Task body for the join-time snapshot fetch. From the visible statements it:
//   1. asks a peer for its latest snapshot (snapshots::fetch_from_peer),
//   2. clears owner->snapshot_fetch_task under owner->lock,
//   3. runs separate_segments() on the received data inside a try/catch,
//   4. checks that the peer-supplied name is a bare file name,
//   5. writes the data into snapshot_config.directory, and
//   6. hands the data to owner->set_startup_snapshot() under owner->lock.
// NOTE(review): this listing omits braces and several statements of the
// function, so the comments below describe only the statements shown.
156 void do_task_implementation()
override
160 auto latest_peer_snapshot = snapshots::fetch_from_peer(
// The next three lines belong to the local `clear_on_exit` helper object:
// they reset the owner's task pointer while holding the owner's lock.
// Presumably this is the helper's destructor, so it runs on every exit path
// -- TODO confirm against the full source.
175 std::lock_guard<pal::Mutex> guard(owner->lock);
176 owner->snapshot_fetch_task =
nullptr;
178 } clear_on_exit{owner};
180 if (latest_peer_snapshot.has_value())
183 "Received snapshot {} from peer (size: {})",
184 latest_peer_snapshot->snapshot_name,
185 latest_peer_snapshot->snapshot_data.size());
// Verification step: the data is split into segments; any std::exception is
// caught and reported with the snapshot name.
189 const auto segments =
190 separate_segments(latest_peer_snapshot->snapshot_data);
193 catch (
const std::exception& e)
196 "Error while verifying fetched snapshot {}: {}",
197 latest_peer_snapshot->snapshot_name,
// The snapshot name comes from the peer, so it is validated before being used
// to build a destination path: empty names, absolute paths, and anything with
// a parent-path component are reported as invalid.
203 const auto snapshot_path =
204 std::filesystem::path(latest_peer_snapshot->snapshot_name);
209 snapshot_path.empty() || snapshot_path.is_absolute() ||
210 snapshot_path.has_parent_path() ||
211 snapshot_path.filename() != snapshot_path)
214 "Rejecting snapshot with invalid name '{}' from peer",
215 latest_peer_snapshot->snapshot_name);
// Destination is always <snapshot_config.directory>/<validated file name>.
218 const auto dst_path =
219 std::filesystem::path(snapshot_config.
directory) / snapshot_path;
222 "Snapshot verified - now writing to {}", dst_path.string());
// An existing file at the destination is logged as being overwritten.
224 if (files::exists(dst_path))
227 "Overwriting existing snapshot at {} with data retrieved from "
231 files::dump(latest_peer_snapshot->snapshot_data, dst_path);
// The seqno is derived from the snapshot file name.
233 const auto snapshot_seqno =
234 snapshots::get_snapshot_idx_from_file_name(
235 latest_peer_snapshot->snapshot_name);
// Owner state is updated under the owner's lock; the snapshot bytes are moved
// into the owner, so latest_peer_snapshot->snapshot_data must not be read
// after this call.
237 std::lock_guard<pal::Mutex> guard(owner->lock);
238 owner->set_startup_snapshot(
239 snapshot_seqno, std::move(latest_peer_snapshot->snapshot_data));
// Name accessor for this task. The visible lines define a function-local
// static string "FetchSnapshot"; the return statement is not shown in this
// listing (presumably it returns `name` by const reference -- TODO confirm).
243 [[nodiscard]]
const std::string& get_name()
const override
245 static const std::string name =
"FetchSnapshot";
260 snapshot_config(
std::move(snapshot_config_)),
261 since_seqno(since_seqno_),
// Task body for the backup snapshot fetch. From the visible statements it:
//   1. clears owner->backup_snapshot_fetch_task under owner->lock,
//   2. looks up the current primary and its RPC interface in the nodes table,
//   3. takes the service certificate from owner->network.identity,
//   4. calls snapshots::fetch_from_peer, and
//   5. validates the returned name and writes the snapshot to
//      snapshot_config.directory when no file of that name exists yet.
// NOTE(review): this listing omits braces and several statements of the
// function, so the comments below describe only the statements shown.
265 void do_task_implementation()
override
// Part of the local `clear_on_exit` helper object: resets the owner's task
// pointer while holding the owner's lock (presumably in the helper's
// destructor -- TODO confirm against the full source).
272 std::lock_guard<pal::Mutex> guard(owner->lock);
273 owner->backup_snapshot_fetch_task =
nullptr;
275 } clear_on_exit{owner};
// Filled in below from the primary's node info and the service identity.
278 std::string primary_address;
279 std::vector<uint8_t> service_cert;
// Without a known primary the fetch is skipped (see log message).
281 auto primary_id = owner->consensus->primary();
282 if (!primary_id.has_value())
285 "BackupSnapshotFetch: No known primary, skipping fetch");
// The primary's node info is read through a read-only transaction.
289 auto tx = owner->network.tables->create_read_only_tx();
291 auto node_info =
nodes->get(primary_id.value());
292 if (!node_info.has_value())
295 "BackupSnapshotFetch: Could not find primary node {} in nodes "
// The address to contact is the published address of the selected RPC
// interface; a primary without that interface is reported in the log.
302 const auto& target_interface =
304 auto iface_it = node_info->rpc_interfaces.find(target_interface);
305 if (iface_it == node_info->rpc_interfaces.end())
308 "BackupSnapshotFetch: Primary node {} does not have RPC "
309 "interface '{}' configured",
314 primary_address = iface_it->second.published_address;
// The service certificate is required (per the log message, to construct TLS
// credentials); a null identity is reported.
316 if (owner->network.identity ==
nullptr)
319 "BackupSnapshotFetch: No service identity available, cannot "
320 "construct TLS credentials for fetching snapshot");
324 service_cert = owner->network.identity->cert.raw();
328 "BackupSnapshotFetch: Attempting to fetch snapshot from primary at "
// Retry interval and maximum size passed to the fetch come from `bf`
// (declared on a line not shown in this listing).
334 auto latest_peer_snapshot = snapshots::fetch_from_peer(
338 bf.retry_interval.count_ms(),
339 bf.max_size.count_bytes(),
342 if (latest_peer_snapshot.has_value())
345 "BackupSnapshotFetch: Received snapshot {} from primary (size: "
347 latest_peer_snapshot->snapshot_name,
348 latest_peer_snapshot->snapshot_data.size());
// The snapshot name is supplied by the remote node, so it is validated before
// being used to build a destination path: empty names, absolute paths, and
// anything with a parent-path component are reported as invalid.
350 const auto snapshot_path =
351 std::filesystem::path(latest_peer_snapshot->snapshot_name);
354 snapshot_path.empty() || snapshot_path.is_absolute() ||
355 snapshot_path.has_parent_path() ||
356 snapshot_path.filename() != snapshot_path)
359 "BackupSnapshotFetch: Rejecting snapshot with invalid name "
361 latest_peer_snapshot->snapshot_name);
// Destination is always <snapshot_config.directory>/<validated file name>.
365 const auto dst_path =
366 std::filesystem::path(snapshot_config.
directory) / snapshot_path;
// An already-present file is reported; the remainder of the message and the
// action taken are not visible here (presumably the write is skipped --
// TODO confirm).
368 if (files::exists(dst_path))
371 "BackupSnapshotFetch: Snapshot {} already exists locally, "
377 files::dump(latest_peer_snapshot->snapshot_data, dst_path);
379 "BackupSnapshotFetch: Wrote snapshot {} ({} bytes)",
381 latest_peer_snapshot->snapshot_data.size());
// Reported when fetch_from_peer returned no snapshot.
386 "BackupSnapshotFetch: No snapshot available from primary");
// Name accessor for this task. The visible lines define a function-local
// static string "BackupSnapshotFetch"; the return statement is not shown in
// this listing (presumably it returns `name` by const reference -- TODO
// confirm).
390 [[nodiscard]]
const std::string& get_name()
const override
392 static const std::string name =
"BackupSnapshotFetch";
406 std::vector<ccf::crypto::SubjectAltName> subject_alt_names;
408 std::shared_ptr<ccf::crypto::ECKeyPair_OpenSSL> node_sign_kp;
410 std::shared_ptr<ccf::crypto::RSAKeyPair> node_encrypt_kp;
412 std::optional<ccf::crypto::Pem> endorsed_node_cert = std::nullopt;
413 QuoteInfo quote_info;
414 pal::PlatformAttestationMeasurement node_measurement;
415 std::optional<pal::snp::TcbVersionRaw> snp_tcb_version = std::nullopt;
417 std::optional<pal::UVMEndorsements> snp_uvm_endorsements = std::nullopt;
418 std::shared_ptr<QuoteEndorsementsClient> quote_endorsements_client =
421 std::atomic<bool> stop_noticed =
false;
429 size_t sig_tx_interval = 0;
430 size_t sig_ms_interval = 0;
432 NetworkState& network;
434 std::shared_ptr<ccf::kv::Consensus>
consensus;
435 std::shared_ptr<RPCMap> rpc_map;
436 std::shared_ptr<indexing::Indexer> indexer;
437 std::shared_ptr<NodeToNode> n2n_channels;
438 std::shared_ptr<Forwarder<NodeToNode>> cmd_forwarder;
439 std::shared_ptr<ccf::CommitCallbackSubsystem> commit_callbacks =
nullptr;
440 std::shared_ptr<ccf::SignatureCacheSubsystem> signature_cache =
nullptr;
441 std::shared_ptr<RPCSessions> rpcsessions;
443 std::shared_ptr<ccf::kv::TxHistory> history;
444 std::shared_ptr<ccf::kv::AbstractTxEncryptor> encryptor;
446 ShareManager share_manager;
447 std::shared_ptr<Snapshotter> snapshotter;
452 std::shared_ptr<ccf::kv::Store> recovery_store;
456 std::vector<ccf::kv::Version> view_history;
460 std::tuple<ccf::NodeId, std::vector<uint8_t>, SealedRecoveryKey>>
461 cached_sealed_recovery_data = std::nullopt;
463 static const size_t recovery_batch_size = 100;
468 std::shared_ptr<JwtKeyAutoRefresh> jwt_key_auto_refresh;
470 std::unique_ptr<StartupSnapshotInfo> startup_snapshot_info =
nullptr;
480 size_t join_fetch_count = 0;
// Builds the transaction encryptor as a shared AbstractTxEncryptor.
// When USE_NULL_ENCRYPTOR is defined a NullTxEncryptor is returned; the other
// visible return constructs a NodeEncryptor over network.ledger_secrets.
// NOTE(review): the #else/#endif lines are not visible in this listing;
// the second return is presumably the #else branch -- TODO confirm.
482 std::shared_ptr<ccf::kv::AbstractTxEncryptor> make_encryptor()
484#ifdef USE_NULL_ENCRYPTOR
485 return std::make_shared<ccf::kv::NullTxEncryptor>();
487 return std::make_shared<NodeEncryptor>(network.ledger_secrets);
// Looks through the locally available committed snapshots and registers one
// via set_startup_snapshot(). Snapshots that throw during verification are
// reported and, where possible, marked as ignored before the search continues.
// NOTE(review): this listing omits braces and several statements of the
// function, so the comments below describe only the statements shown.
491 void find_local_startup_snapshot()
// Candidate directories; the read-only directory is added only when it is
// configured.
498 std::vector<std::filesystem::path> directories;
501 if (read_only_dir.has_value())
503 directories.emplace_back(read_only_dir.value());
// Each candidate is a (seqno, path) pair.
506 const auto committed_snapshots =
508 for (
const auto& [snapshot_seqno, snapshot_path] : committed_snapshots)
// The whole snapshot file is read into memory.
510 auto snapshot_data = files::slurp(snapshot_path);
513 "Found latest local snapshot file: {} (size: {})",
515 snapshot_data.size());
// Verification step: any std::exception is caught and logged with the path.
517 const auto segments = separate_segments(snapshot_data);
523 catch (
const std::exception& e)
526 "Error while verifying {}: {}", snapshot_path.string(), e.what());
528 const auto dir = snapshot_path.parent_path();
529 const auto file_name = snapshot_path.filename();
534 "Ignoring corrupt snapshot {} in directory {} and looking for "
537 snapshot_path.string());
// Marking the file as ignored is best-effort: a std::logic_error from
// ignore_snapshot_file is logged rather than propagated.
540 snapshots::ignore_snapshot_file(dir, file_name.string());
542 catch (std::logic_error& e)
544 LOG_FAIL_FMT(
"Unable to mark snapshot as ignored: {}", e.what());
// Per the message, snapshots in a read-only directory are left unmodified and
// the search moves on to the next candidate.
550 "Snapshot {} is in a read-only directory {}, so will not be "
551 "modified. Ignoring and looking for next",
552 snapshot_path.string(),
// The snapshot bytes are moved into the node's startup snapshot state.
559 set_startup_snapshot(snapshot_seqno, std::move(snapshot_data));
// Records a snapshot as the one to start from: stores it in
// startup_snapshot_info and aligns startup_seqno, last_recovered_idx and
// last_recovered_signed_idx with the snapshot's seqno.
// NOTE(review): the parameter list is not visible in this listing; the body
// uses `snapshot_seqno` and `snapshot_data`. The call sites shown elsewhere in
// this file that take a lock do so before calling this -- presumably the
// caller is expected to hold the node lock; TODO confirm.
566 void set_startup_snapshot(
// Takes ownership of the snapshot bytes.
569 startup_snapshot_info = std::make_unique<StartupSnapshotInfo>(
570 snapshot_seqno, std::move(snapshot_data));
// All three counters are set to the snapshot seqno.
572 startup_seqno = startup_snapshot_info->seqno;
573 last_recovered_idx = startup_seqno;
574 last_recovered_signed_idx = last_recovered_idx;
580 join_fetch_count += 1;
585 const auto segments = separate_segments(startup_snapshot_info->raw);
588 deserialise_snapshot(
596 auto tx = network.tables->create_read_only_tx();
599 if (status.has_value())
601 snapshotter->init_from_snapshot_status(status.value());
613 std::shared_ptr<RPCSessions> rpcsessions,
619 node_encrypt_kp(
ccf::crypto::make_rsa_key_pair()),
620 writer_factory(writer_factory),
621 to_host(writer_factory.create_writer_to_outside()),
623 rpcsessions(
std::move(rpcsessions)),
624 share_manager(network.ledger_secrets),
625 recovery_decision_protocol(this)
631 const std::vector<uint8_t>& expected_node_public_key_der,
633 const std::optional<std::vector<uint8_t>>& code_transparent_statement,
634 std::shared_ptr<NetworkIdentitySubsystemInterface>
635 network_identity_subsystem)
override
640 expected_node_public_key_der,
642 code_transparent_statement,
643 network_identity_subsystem);
651 std::shared_ptr<RPCMap> rpc_map_,
652 std::shared_ptr<AbstractRPCResponder> rpc_sessions_,
653 std::shared_ptr<indexing::Indexer> indexer_,
654 std::shared_ptr<ccf::CommitCallbackSubsystem> commit_callbacks_,
655 std::shared_ptr<ccf::SignatureCacheSubsystem> signature_cache_,
656 size_t sig_tx_interval_,
657 size_t sig_ms_interval_)
659 std::lock_guard<pal::Mutex> guard(lock);
662 consensus_config = consensus_config_;
666 commit_callbacks = commit_callbacks_;
667 signature_cache = signature_cache_;
669 sig_tx_interval = sig_tx_interval_;
670 sig_ms_interval = sig_ms_interval_;
672 n2n_channels = std::make_shared<NodeToNodeChannelManager>(writer_factory);
674 cmd_forwarder = std::make_shared<Forwarder<NodeToNode>>(
675 rpc_sessions_, n2n_channels, rpc_map);
679 for (
auto& [actor, fe] : rpc_map->frontends())
681 fe->set_sig_intervals(sig_tx_interval, sig_ms_interval);
682 fe->set_cmd_forwarder(cmd_forwarder);
692 if (measurement.has_value())
694 node_measurement = measurement.value();
698 throw std::logic_error(
"Failed to extract code id from quote");
701 auto snp_attestation =
703 if (snp_attestation.has_value())
705 snp_tcb_version = snp_attestation.value().reported_tcb;
712 "Security policy not set, skipping check against attestation host "
718 if (!quoted_digest.has_value())
720 throw std::logic_error(
"Unable to find host data in attestation");
723 auto const& security_policy =
726 auto security_policy_digest =
730 if (security_policy_digest != quoted_digest.value())
732 throw std::logic_error(fmt::format(
733 "Digest of decoded security policy \"{}\" {} does not match "
734 "attestation host data {}",
736 security_policy_digest.hex_str(),
737 quoted_digest.value().hex_str()));
740 "Successfully verified attested security policy {}",
741 security_policy_digest);
749 "UVM endorsements not set, skipping check against attestation "
765 uvm_endorsements_raw, node_measurement);
766 quote_info.uvm_endorsements = uvm_endorsements_raw;
768 "Successfully verified attested UVM endorsements: {}",
769 snp_uvm_endorsements->to_str());
771 catch (
const std::exception& e)
773 throw std::logic_error(
774 fmt::format(
"Error verifying UVM endorsements: {}", e.what()));
783 create_and_send_boot_request(
784 aft::starting_view_change,
true );
789 find_local_startup_snapshot();
797 setup_recovery_hook();
799 find_local_startup_snapshot();
807 throw std::logic_error(
808 fmt::format(
"Node was launched in unknown mode {}", start_type));
815 auto fetch_endorsements = [
this](
818 EndorsementEndpointsConfiguration&
822 this->quote_info = qi;
826 to_json(jq, quote_info.format);
828 "Initial node attestation ({}): {}", jq.dump(), b64encoded_quote);
840 const auto j = nlohmann::json::parse(raw_data);
841 const auto aci_endorsements =
848 quote_info.quote.data());
849 const auto reported_tcb = quote->reported_tcb;
853 const auto* tcb_begin =
854 reinterpret_cast<const uint8_t*
>(&reported_tcb);
855 const std::span<const uint8_t> tcb_bytes{
856 tcb_begin, tcb_begin +
sizeof(reported_tcb)};
857 auto tcb_as_hex = fmt::format(
858 "{:02x}", fmt::join(tcb_bytes.rbegin(), tcb_bytes.rend(),
""));
859 ccf::nonstd::to_upper(tcb_as_hex);
861 if (tcb_as_hex == aci_endorsements.tcbm)
864 "Using SNP endorsements loaded from file, endorsing TCB {}",
867 auto& endorsements_pem = quote_info.endorsements;
868 endorsements_pem.insert(
869 endorsements_pem.end(),
870 aci_endorsements.vcek_cert.begin(),
871 aci_endorsements.vcek_cert.end());
872 endorsements_pem.insert(
873 endorsements_pem.end(),
874 aci_endorsements.certificate_chain.begin(),
875 aci_endorsements.certificate_chain.end());
882 catch (
const std::exception& e)
891 "SNP endorsements loaded from disk ({}) contained tcbm {}, "
892 "which does not match reported TCB of current attestation "
894 "Falling back to fetching fresh endorsements from server.",
896 aci_endorsements.tcbm,
900 catch (
const std::exception& e)
903 "Error attempting to use SNP endorsements from file: {}",
910 throw std::runtime_error(
911 "One or more SNP endorsements servers must be specified to fetch "
912 "the collateral for the attestation");
915 quote_endorsements_client = std::make_shared<QuoteEndorsementsClient>(
916 endpoint_config, [
this](std::vector<uint8_t>&& endorsements) {
917 std::lock_guard<pal::Mutex> guard(lock);
918 quote_info.endorsements = std::move(endorsements);
923 catch (
const std::exception& e)
928 quote_endorsements_client.reset();
931 quote_endorsements_client->fetch_endorsements();
937 throw std::runtime_error(fmt::format(
938 "Unsupported quote format: {}",
939 static_cast<int>(quote_info.format)));
957 std::lock_guard<pal::Mutex> guard(lock);
959 start_type = start_type_;
962 subject_alt_names = get_subject_alternative_names();
965 self_signed_node_cert = create_self_signed_cert(
972 accept_node_tls_connections();
987 network.identity = std::make_unique<ccf::NetworkIdentity>(
993 network.ledger_secrets->init();
995 history->set_service_signing_identity(
998 setup_consensus(
false, endorsed_node_cert);
1004 return {self_signed_node_cert, network.identity->cert};
1009 return {self_signed_node_cert, {}};
1015 throw std::logic_error(
1016 "Recovery requires the certificate of the previous service "
1023 network.identity = std::make_unique<ccf::NetworkIdentity>(
1030 return {self_signed_node_cert, network.identity->cert};
1034 throw std::logic_error(
1035 fmt::format(
"Node was started in unknown mode {}", start_type));
1048 auto network_ca = std::make_shared<::tls::CA>(std::string(
1051 auto [target_host, target_port] =
1054 auto join_client_cert = std::make_unique<::tls::Cert>(
1056 self_signed_node_cert,
1057 node_sign_kp->private_key_pem(),
1063 auto join_client = rpcsessions->create_client(
1064 std::move(join_client_cert),
1065 rpcsessions->get_app_protocol_main_interface());
1067 join_client->connect(
1076 std::vector<uint8_t>&& data) {
1077 std::lock_guard<pal::Mutex> guard(lock);
1078 if (!sm.check(NodeStartupState::pending))
1083 if (is_http_status_client_error(status))
1085 std::optional<ccf::ODataErrorResponse> error_response =
1090 auto j = nlohmann::json::parse(data);
1093 catch (
const nlohmann::json::exception& e)
1097 "Join request returned {}, body is not ODataErrorResponse: {}",
1099 std::string(data.begin(), data.end()));
1103 error_response.has_value() &&
1104 error_response->error.code == ccf::errors::StartupSeqnoIsOld &&
1108 "Join request to {} returned {} error. Attempting to fetch "
1111 ccf::errors::StartupSeqnoIsOld);
1119 snapshot_fetch_task !=
nullptr &&
1120 !snapshot_fetch_task->is_cancelled())
1122 LOG_INFO_FMT(
"Snapshot fetch already in progress, skipping");
1126 snapshot_fetch_task = std::make_shared<FetchSnapshot>(
1133 auto error_msg = fmt::format(
1134 "Join request to {} returned {} Bad Request: {}. Shutting "
1135 "down node gracefully.",
1138 std::string(data.begin(), data.end()));
1141 AdminMessage::fatal_error_msg, to_host, error_msg);
1145 if (status != HTTP_STATUS_OK)
1147 const auto& location = headers.find(http::headers::LOCATION);
1150 (status == HTTP_STATUS_PERMANENT_REDIRECT ||
1151 status == HTTP_STATUS_TEMPORARY_REDIRECT) &&
1152 location != headers.end())
1156 make_net_address(url.host, url.port);
1157 LOG_INFO_FMT(
"Target node redirected to {}", location->second);
1162 "An error occurred while joining the network: {} {}{}",
1164 ccf::http_status_str(status),
1167 fmt::format(
" '{}'", std::string(data.begin(), data.end())));
1175 auto j = nlohmann::json::parse(data);
1178 catch (
const std::exception& e)
1181 "An error occurred while parsing the join network response");
1185 "Join network response body: {}",
1186 std::string(data.begin(), data.end()));
1196 throw std::logic_error(
"Expected network info in join response");
1199 network.identity = std::make_unique<ccf::NetworkIdentity>(
1201 network.ledger_secrets->init_from_map(
1204 history->set_service_signing_identity(
1205 network.identity->get_key_pair(),
1210 if (!resp.
network_info->endorsed_certificate.has_value())
1213 throw std::logic_error(
1214 "Expected endorsed certificate in join response");
1216 n2n_channels_cert = resp.
network_info->endorsed_certificate.value();
1218 setup_consensus(resp.
network_info->public_only, n2n_channels_cert);
1223 last_recovered_signed_idx =
1225 setup_recovery_hook();
1226 snapshotter->set_snapshot_generation(
false);
1230 std::vector<ccf::kv::Version> view_history_ = {};
1231 if (startup_snapshot_info)
1236 deserialise_snapshot(
1238 startup_snapshot_info->raw,
1243 for (
auto& hook : hooks)
1248 auto tx = network.tables->create_read_only_tx();
1256 startup_snapshot_info.reset();
1260 "Joiner successfully resumed from snapshot at seqno {} and "
1262 network.tables->current_version(),
1267 network.tables->current_version(),
1270 last_recovered_signed_idx);
1273 auto snap_tx = network.tables->create_read_only_tx();
1274 auto snapshot_status =
1276 if (snapshot_status.has_value())
1278 snapshotter->init_from_snapshot_status(snapshot_status.value());
1281 history->start_signature_emit_timer();
1294 if (join_periodic_task !=
nullptr)
1296 join_periodic_task->cancel_task();
1297 join_periodic_task =
nullptr;
1301 "Node has now joined the network as node {}: {}",
1303 (resp.
network_info->public_only ?
"public only" :
"all domains"));
1308 "Node {} is waiting for votes of members to be trusted", self);
1311 [
this](
const std::string& error_msg) {
1312 std::lock_guard<pal::Mutex> guard(lock);
1313 auto long_error_msg = fmt::format(
1314 "Early error when joining existing network at {}: {}. Shutting "
1315 "down node gracefully...",
1320 AdminMessage::fatal_error_msg, to_host, long_error_msg);
1324 JoinNetworkNodeToNode::In join_params;
1326 join_params.node_info_network = config.
network;
1327 join_params.public_encryption_key = node_encrypt_kp->public_key_pem();
1328 join_params.quote_info = quote_info;
1329 join_params.startup_seqno = startup_seqno;
1332 join_params.join_fetch_count = join_fetch_count;
1336 join_params.join_fetch_count = 1;
1338 join_params.certificate_signing_request = node_sign_kp->create_csr(
1340 join_params.node_data = config.
node_data;
1344 join_params.sealing_recovery_data = std::make_pair(
1352 "Reading code_transparent_statement from file: {}",
1354 auto ts = files::slurp(
1356 join_params.code_transparent_statement = std::move(ts);
1362 const auto body = nlohmann::json(join_params).dump();
1369 http::headers::CONTENT_TYPE, http::headervalues::contenttype::JSON);
1372 join_client->send_request(std::move(r));
1377 std::lock_guard<pal::Mutex> guard(lock);
1378 initiate_join_unsafe();
1383 initiate_join_unsafe();
1386 std::lock_guard<pal::Mutex> guard(this->lock);
1389 this->initiate_join_unsafe();
1404 "JWT key auto-refresh: consensus not initialized, not starting "
1408 jwt_key_auto_refresh = std::make_shared<JwtKeyAutoRefresh>(
1415 self_signed_node_cert);
1416 jwt_key_auto_refresh->start();
1418 network.tables->set_map_hook(
1419 network.jwt_issuers.get_name(),
1422 jwt_key_auto_refresh->schedule_once();
1429 return jwt_key_auto_refresh->get_attempts();
1437 if (!sm.
check(NodeStartupState::readingPublicLedger))
1439 throw std::logic_error(fmt::format(
1440 "Node should be in state {} to start reading ledger",
1441 NodeStartupState::readingPublicLedger));
1446 read_ledger_entries(
1447 last_recovered_idx + 1, last_recovered_idx + recovery_batch_size);
1452 std::lock_guard<pal::Mutex> guard(lock);
1454 sm.
expect(NodeStartupState::readingPublicLedger);
1456 const auto* data = entries.data();
1457 auto size = entries.size();
1461 recover_public_ledger_end_unsafe();
1470 "Deserialising public ledger entry #{} [{} bytes]",
1479 auto r = network.tables->deserialize(entry,
true);
1480 result = r->apply();
1484 "Failed to deserialise public ledger entry: {}", result);
1485 recover_public_ledger_end_unsafe();
1488 ++last_recovered_idx;
1491 for (
auto& hook : r->get_hooks())
1496 catch (
const std::exception& e)
1499 "Failed to deserialise public ledger entry: {}", e.what());
1500 recover_public_ledger_end_unsafe();
1508 network.tables->compact(last_recovered_idx);
1509 auto tx = network.tables->create_read_only_tx();
1514 "Read signature at {} for view {}", last_recovered_idx, sig_view);
1522 const auto view_start_idx =
1523 view_history.empty() ? 1 : last_recovered_signed_idx + 1;
1524 CCF_ASSERT_FMT(sig_view >= 0,
"sig_view is invalid, {}", sig_view);
1525 for (
auto i = view_history.size(); i < static_cast<size_t>(sig_view);
1528 view_history.push_back(view_start_idx);
1530 last_recovered_signed_idx = last_recovered_idx;
1534 read_ledger_entries(
1535 last_recovered_idx + 1, last_recovered_idx + recovery_batch_size);
1540 std::lock_guard<pal::Mutex> guard(lock);
1541 sm.
expect(NodeStartupState::readingPublicLedger);
1542 history->start_signature_emit_timer();
1543 sm.
advance(NodeStartupState::partOfPublicNetwork);
1548 std::lock_guard<pal::Mutex> guard(lock);
1549 sm.
expect(NodeStartupState::initialized);
1550 history->start_signature_emit_timer();
1551 auto_refresh_jwt_keys();
1554 sm.
advance(NodeStartupState::partOfNetwork);
1559 sm.
expect(NodeStartupState::readingPublicLedger);
1563 const auto last_recovered_term = view_history.size();
1564 auto new_term = last_recovered_term + aft::starting_view_change;
1565 LOG_INFO_FMT(
"Setting term on public recovery store to {}", new_term);
1568 network.tables->rollback(
1569 {last_recovered_term, last_recovered_signed_idx}, new_term);
1570 ledger_truncate(last_recovered_signed_idx,
true);
1571 snapshotter->rollback(last_recovered_signed_idx);
1574 "End of public ledger recovery - Truncating ledger to last signed "
1576 last_recovered_term,
1577 last_recovered_signed_idx);
1579 auto tx = network.tables->create_read_only_tx();
1580 network.ledger_secrets->init(last_recovered_signed_idx + 1);
1583 snapshotter->init_after_public_recovery();
1584 snapshotter->set_snapshot_generation(
false);
1591 auto ls = tx.ro(network.signatures)->get();
1594 auto s = ls.value();
1595 sig_seqno = s.seqno;
1601 auto lcs = tx.ro(network.cose_signatures)->get();
1602 if (lcs.has_value())
1608 cose::decode_ccf_receipt(cs,
false);
1613 if (!tx_id_opt.has_value())
1615 throw std::logic_error(fmt::format(
1616 "Failed to parse TxID from COSE signature: {}",
1617 as_receipt.phdr.ccf.txid));
1620 cose_seqno = tx_id_opt->seqno;
1625 if (tx_id_opt->seqno > index)
1627 index = tx_id_opt->seqno;
1628 view = tx_id_opt->view;
1631 auto issuer = as_receipt.phdr.cwt.iss;
1632 auto subject = as_receipt.phdr.cwt.sub;
1634 "COSE signature issuer: {}, subject: {}", issuer, subject);
1640 LOG_FAIL_FMT(
"COSE signature decode error: {}", e.what());
1646 LOG_INFO_FMT(
"No COSE signature found after recovery");
1649 if (!ls.has_value() && !lcs.has_value())
1651 throw std::logic_error(
"No signature found after recovery");
1659 lcs.has_value() && cose_seqno > sig_seqno)
1661 throw std::logic_error(
1662 "Cannot recover a COSE-only ledger with a Dual signing binary. "
1663 "Use a COSE-only binary to recover this ledger.");
1666 history->set_service_signing_identity(
1667 network.identity->get_key_pair(), cs_cfg);
1679 auto* node_id_lookup =
1681 auto local_sealing_node_id_opt = node_id_lookup->get(name);
1682 if (local_sealing_node_id_opt.has_value())
1684 auto& local_sealing_node_id = local_sealing_node_id_opt.value();
1685 auto sealed_recovery_shares_opt =
1687 if (sealed_recovery_shares_opt.has_value())
1689 auto sealed_recovery_shares = sealed_recovery_shares_opt.value();
1690 auto sealed_share_it =
1691 sealed_recovery_shares.encrypted_wrapping_keys.find(
1692 local_sealing_node_id);
1694 auto sealed_recovery_key =
1696 ->get(local_sealing_node_id.value());
1700 sealed_recovery_shares.encrypted_wrapping_keys.end() &&
1701 sealed_recovery_key.has_value())
1703 cached_sealed_recovery_data = std::make_tuple(
1704 local_sealing_node_id.value(),
1705 sealed_share_it->second,
1706 sealed_recovery_key.value());
1711 if (!cached_sealed_recovery_data.has_value())
1713 throw std::logic_error(fmt::format(
1714 "Failed to find sealed recovery data for location ({}) in ledger "
1717 last_recovered_signed_idx));
1721 setup_consensus(
true);
1722 auto_refresh_jwt_keys();
1724 LOG_DEBUG_FMT(
"Restarting consensus at view: {} seqno: {}", view, index);
1726 consensus->force_become_primary(index, view, view_history, index);
1728 create_and_send_boot_request(
1737 std::lock_guard<pal::Mutex> guard(lock);
1738 if (!sm.
check(NodeStartupState::readingPrivateLedger))
1741 "Node in state {} cannot recover private ledger entries", sm.
value());
1745 const auto* data = entries.data();
1746 auto size = entries.size();
1750 recover_private_ledger_end_unsafe();
1759 "Deserialising private ledger entry {} [{}]",
1760 last_recovered_idx + 1,
1767 result = recovery_store->deserialize(entry)->apply();
1771 "Failed to deserialise private ledger entry: {}", result);
1774 recovery_store->rollback({0, last_recovered_idx}, 0);
1775 recover_private_ledger_end_unsafe();
1778 ++last_recovered_idx;
1780 catch (
const std::exception& e)
1783 "Failed to deserialise private ledger entry: {}", e.what());
1784 recover_private_ledger_end_unsafe();
1790 recovery_store->compact(last_recovered_idx);
1794 if (recovery_store->current_version() == recovery_v)
1796 LOG_INFO_FMT(
"Reached recovery final version at {}", recovery_v);
1797 recover_private_ledger_end_unsafe();
1801 read_ledger_entries(
1802 last_recovered_idx + 1,
1803 std::min(last_recovered_idx + recovery_batch_size, recovery_v));
1812 sm.
expect(NodeStartupState::readingPrivateLedger);
1815 "Try end private recovery at {}. Is primary: {}",
1819 if (recovery_v != recovery_store->current_version())
1821 throw std::logic_error(fmt::format(
1822 "Private recovery did not reach public ledger seqno: {}/{}",
1823 recovery_store->current_version(),
1829 if (h->get_replicated_state_root() != recovery_root)
1831 throw std::logic_error(fmt::format(
1832 "Root of public store does not match root of private store at {}",
1836 network.tables->swap_private_maps(*recovery_store);
1837 recovery_store.reset();
1843 snapshotter->set_snapshot_generation(
true);
1849 "Try end private recovery at {}. Trigger service opening",
1852 auto tx = network.tables->create_tx();
1858 auto active_service = service->get();
1860 if (!active_service.has_value())
1862 throw std::logic_error(fmt::format(
1863 "Error in {}: no value in {}", __func__, Tables::SERVICE));
1867 active_service->status !=
1868 ServiceStatus::WAITING_FOR_RECOVERY_SHARES)
1870 throw std::logic_error(fmt::format(
1871 "Error in {}: current service status is {}",
1873 active_service->status));
1879 ShareManager::clear_submitted_recovery_shares(tx);
1883 share_manager.issue_recovery_shares(tx);
1886 !InternalTablesAccess::open_service(tx) ||
1887 !InternalTablesAccess::endorse_previous_identity(
1888 tx, *network.identity->get_key_pair()))
1890 throw std::logic_error(
"Service could not be opened");
1896 trigger_snapshot(tx);
1900 throw std::logic_error(
1901 "Could not commit transaction when finishing network recovery");
1904 recovered_encrypted_ledger_secrets.clear();
1907 sm.
advance(NodeStartupState::partOfNetwork);
1916 network.tables->set_map_hook(
1917 network.encrypted_ledger_secrets.get_name(),
1925 throw std::logic_error(fmt::format(
1926 "Unexpected removal from {} table",
1927 network.encrypted_ledger_secrets.get_name()));
1930 network.ledger_secrets->adjust_previous_secret_stored_version(
1933 network.tables->unset_map_hook(
1934 network.encrypted_ledger_secrets.get_name());
1945 std::lock_guard<pal::Mutex> guard(lock);
1947 if (is_reading_public_ledger())
1949 recover_public_ledger_end_unsafe();
1951 else if (is_reading_private_ledger())
1953 recover_private_ledger_end_unsafe();
1958 "Node in state {} cannot finalise ledger recovery", sm.
value());
1968 recovery_store = std::make_shared<ccf::kv::Store>(
1971 auto recovery_history = std::make_shared<MerkleTxHistory>(
1979 auto recovery_encryptor = make_encryptor();
1981 recovery_store->set_history(recovery_history);
1982 recovery_store->set_encryptor(recovery_encryptor);
1985 recovery_v = network.tables->current_version();
1987 recovery_root = h->get_replicated_state_root();
1989 if (startup_snapshot_info)
1991 std::vector<ccf::kv::Version> view_history_;
1993 deserialise_snapshot(
1995 startup_snapshot_info->raw,
1999 startup_snapshot_info.reset();
2003 "Recovery store successfully setup at {}. Target recovery seqno: {}",
2004 recovery_store->current_version(),
2010 share_manager.shuffle_recovery_shares(tx);
2018 throw std::logic_error(
"Could not cast tx to CommittableTx");
2027 if (committable_tx ==
nullptr)
2029 throw std::logic_error(
"Could not cast tx to CommittableTx");
2031 committable_tx->set_tx_flag(
2039 std::lock_guard<pal::Mutex> guard(lock);
2041 auto* service = tx.
rw<
Service>(Tables::SERVICE);
2042 auto service_info = service->get();
2043 if (!service_info.has_value())
2045 throw std::logic_error(
2046 "Service information cannot be found to transition service to "
2053 service_info->status == ServiceStatus::WAITING_FOR_RECOVERY_SHARES ||
2054 service_info->status == ServiceStatus::OPEN)
2057 "Service in state {} is already open", service_info->status);
2061 if (service_info->status == ServiceStatus::RECOVERING)
2063 const auto prev_ident =
2066 if (!prev_ident.has_value() || !identities.
previous.has_value())
2068 throw std::logic_error(
2069 "Recovery with service certificates requires both, a previous "
2070 "service identity written to the KV during recovery genesis and a "
2071 "transition_service_to_open proposal that contains previous and "
2072 "next service certificates");
2077 if (prev_ident.value() != from_proposal)
2079 throw std::logic_error(fmt::format(
2080 "Previous service identity does not match.\nActual:\n{}\nIn "
2083 from_proposal.
str()));
2087 if (identities.
next != service_info->cert)
2089 throw std::logic_error(fmt::format(
2090 "Service identity mismatch: the next service identity in the "
2091 "transition_service_to_open proposal does not match the current "
2092 "service identity:\nNext:\n{}\nCurrent:\n{}",
2094 service_info->cert.str()));
2097 if (is_part_of_public_network())
2101 ShareManager::clear_submitted_recovery_shares(tx);
2102 service_info->status = ServiceStatus::WAITING_FOR_RECOVERY_SHARES;
2103 service->put(service_info.value());
2108 if (!cached_sealed_recovery_data.has_value())
2110 throw std::logic_error(
2111 "Missing cached sealed recovery key for private recovery");
2114 auto& [last_sealed_node_id, last_sealed_wrapping_key, last_sealed_recovery_key] =
2115 cached_sealed_recovery_data.value();
2116 auto unsealed_ls = sealing::unseal_share(
2117 tx, last_sealed_wrapping_key, last_sealed_recovery_key);
2118 if (unsealed_ls.has_value())
2121 ->put(RecoveryType::LOCAL_UNSEALING);
2122 LOG_INFO_FMT(
"Unsealed ledger secret, initiating private recovery");
2123 initiate_private_recovery_unsealing_unsafe(tx, unsealed_ls.value());
2127 throw std::logic_error(
2128 "Failed to unseal ledger secret for private recovery");
2134 ->put(RecoveryType::RECOVERY_SHARES);
2139 if (is_part_of_network())
2146 share_manager.issue_recovery_shares(tx);
2148 catch (
const std::logic_error& e)
2150 throw std::logic_error(
2151 fmt::format(
"Failed to issue recovery shares: {}", e.what()));
2154 InternalTablesAccess::open_service(tx);
2155 InternalTablesAccess::endorse_previous_identity(
2156 tx, *network.identity->get_key_pair());
2157 trigger_snapshot(tx);
2161 throw std::logic_error(
2162 fmt::format(
"Node in state {} cannot open service", sm.
value()));
2167 std::lock_guard<pal::Mutex> guard(lock);
2168 sm.
expect(NodeStartupState::partOfPublicNetwork);
2170 share_manager.restore_recovery_shares_info(
2171 tx, recovered_encrypted_ledger_secrets);
2172 initiate_private_recovery_unsafe(tx, recovered_ledger_secrets);
2178 sm.
expect(NodeStartupState::partOfPublicNetwork);
2180 share_manager.restore_ledger_secrets_map(
2181 tx, recovered_encrypted_ledger_secrets, unsealed_ledger_secret);
2182 initiate_private_recovery_unsafe(tx, recovered_ledger_secrets);
2193 LedgerSecretsBroadcast::broadcast_some(
2194 InternalTablesAccess::get_trusted_nodes(tx),
2195 tx.
wo(network.secrets),
2196 recovered_ledger_secrets);
// Periodic driver: forwards the time `elapsed` since the previous call to the
// node's time-dependent subsystems.
// NOTE(review): this listing omits several lines of the function (braces and
// the statement guarded by the first condition); only visible steps are
// described here.
2202 void tick(std::chrono::milliseconds elapsed)
// True only when the state machine is in none of partOfNetwork,
// partOfPublicNetwork or readingPrivateLedger. The guarded action is not
// visible - presumably an early return; confirm against the full file.
2205 !sm.
check(NodeStartupState::partOfNetwork) &&
2206 !sm.
check(NodeStartupState::partOfPublicNetwork) &&
2207 !sm.
check(NodeStartupState::readingPrivateLedger))
// Once fully part of the network, hand the indexer the latest committed TxID
// (both members of the pair returned by consensus) along with the elapsed time.
2214 if (sm.
check(NodeStartupState::partOfNetwork))
2216 const auto tx_id =
consensus->get_committed_txid();
2217 indexer->update_strategies(elapsed, {tx_id.first, tx_id.second});
// Node-to-node channels receive the same elapsed time.
2220 n2n_channels->tick(elapsed);
2226 !sm.
check(NodeStartupState::partOfNetwork) &&
2227 !sm.
check(NodeStartupState::partOfPublicNetwork) &&
2228 !sm.
check(NodeStartupState::readingPrivateLedger))
2239 stop_noticed =
true;
2244 return stop_noticed;
2249 auto [msg_type, from, payload] =
2250 ringbuffer::read_message<node_inbound>(data, size);
2252 const auto* payload_data = payload.data;
2253 auto payload_size = payload.size;
2255 if (msg_type == NodeMsgType::forwarded_msg)
2257 cmd_forwarder->recv_message(from, payload_data, payload_size);
2263 !sm.
check(NodeStartupState::partOfNetwork) &&
2264 !sm.
check(NodeStartupState::partOfPublicNetwork) &&
2265 !sm.
check(NodeStartupState::readingPrivateLedger))
2268 "Ignoring node msg received too early - current state is {}",
2277 LOG_FAIL_FMT(
"Unexpected forwarded_msg in recv_node_inbound");
2282 n2n_channels->recv_channel_message(
2283 from, payload_data, payload_size);
2289 consensus->recv_message(from, payload_data, payload_size);
2294 throw std::logic_error(fmt::format(
2295 "Unknown node message type: {}",
2296 static_cast<uint32_t
>(msg_type)));
2308 (sm.
check(NodeStartupState::partOfNetwork) ||
2309 sm.
check(NodeStartupState::partOfPublicNetwork) ||
2310 sm.
check(NodeStartupState::readingPrivateLedger)) &&
2317 (sm.
check(NodeStartupState::partOfNetwork) ||
2318 sm.
check(NodeStartupState::partOfPublicNetwork) ||
2319 sm.
check(NodeStartupState::readingPrivateLedger)) &&
2330 return sm.
check(NodeStartupState::initialized);
2335 return sm.
check(NodeStartupState::partOfNetwork);
2340 return sm.
check(NodeStartupState::readingPublicLedger);
2345 return sm.
check(NodeStartupState::readingPrivateLedger);
2350 return sm.
check(NodeStartupState::partOfPublicNetwork);
2355 const auto val = sm.
value();
2356 return val == NodeStartupState::partOfNetwork ||
2357 val == NodeStartupState::partOfPublicNetwork ||
2358 val == NodeStartupState::readingPrivateLedger;
2363 std::lock_guard<pal::Mutex> guard(lock);
2364 auto s = sm.
value();
2365 if (s == NodeStartupState::readingPrivateLedger)
2367 return {s, recovery_v, recovery_store->current_version()};
2370 return {s, std::nullopt, std::nullopt};
2375 std::lock_guard<pal::Mutex> guard(lock);
2376 sm.
expect(NodeStartupState::partOfNetwork);
2385 const auto service_status = InternalTablesAccess::get_service_status(tx);
2387 !service_status.has_value() ||
2388 service_status.value() != ServiceStatus::OPEN)
2390 LOG_FAIL_FMT(
"Cannot rekey ledger while the service is not open");
2398 share_manager.issue_recovery_shares(tx, new_ledger_secret);
2399 LedgerSecretsBroadcast::broadcast_new(
2400 InternalTablesAccess::get_trusted_nodes(tx),
2401 tx.
wo(network.secrets),
2402 std::move(new_ledger_secret));
2414 std::lock_guard<pal::Mutex> guard(lock);
2415 return startup_seqno;
2420 return rpcsessions->get_session_metrics();
2425 std::lock_guard<pal::Mutex> guard(lock);
2426 return self_signed_node_cert;
2431 if (history ==
nullptr)
2433 throw std::logic_error(
2434 "Attempting to access COSE signatures config before history has been "
2438 return history->get_cose_signatures_config();
// Heuristic classification of `hostname` as an IP address: returns true when
// the component extracted below consists solely of ASCII digits '0'..'9'.
// Throws std::runtime_error when that component is empty.
2442 bool is_ip(
const std::string_view& hostname)
// Take the last '.'-separated piece of the hostname, then split that on ':'.
// NOTE(review): the selector applied to the ':' split sits on a line missing
// from this listing - presumably the part before a port suffix; confirm.
2450 const auto final_component =
2451 ccf::nonstd::split(ccf::nonstd::split(hostname,
".").back(),
":")
// An empty component is rejected explicitly, so the all_of below never runs
// on an empty range (where it would be vacuously true).
2453 if (final_component.empty())
2455 throw std::runtime_error(fmt::format(
2456 "{} has a trailing period, is not a valid hostname", hostname));
2459 return std::ranges::all_of(
2460 final_component, [](
char c) {
return c >=
'0' && c <=
'9'; });
// Produces the Subject Alternative Names for this node's certificate.
// Two paths are visible: one returns the result of sans_from_string_list
// (its condition and arguments are not visible in this listing); the other
// builds one SAN per configured RPC interface.
2463 std::vector<ccf::crypto::SubjectAltName> get_subject_alternative_names()
2470 return ccf::crypto::sans_from_string_list(
2476 std::vector<ccf::crypto::SubjectAltName> sans;
2477 for (
const auto& [_, interface] : config.network.rpc_interfaces)
// Only the first element of split_net_address (named `host` here) is used;
// is_ip() decides the second field of the SAN entry.
2479 auto host = split_net_address(interface.published_address).first;
2480 sans.push_back({
host, is_ip(
host)});
// Installs the self-signed node certificate, together with the node signing
// key's private key PEM, as the node cert on the RPC sessions object.
2485 void accept_node_tls_connections()
2489 rpcsessions->set_node_cert(
2490 self_signed_node_cert, node_sign_kp->private_key_pem());
// Installs the endorsed node certificate (when one is set), together with the
// node signing key's private key PEM, as the network cert on the RPC sessions
// object. Nothing is installed on the visible path when no endorsed cert exists.
2494 void accept_network_tls_connections()
// Arguments of a check whose call name is on a line not visible here
// (looks like an assertion macro - confirm against the full file).
2499 endorsed_node_cert.has_value(),
2500 "Node certificate should be endorsed before accepting endorsed "
// The if-initialiser takes a copy of the optional, so the has_value() test and
// the value() access below operate on the same local object.
2503 if (
auto cert_opt = endorsed_node_cert; cert_opt.has_value())
2505 const auto& endorsed_cert = cert_opt.value();
2506 rpcsessions->set_network_cert(
2507 endorsed_cert, node_sign_kp->private_key_pem());
// Looks up the frontend registered for `actor` in rpc_map.
// Throws std::logic_error when the lookup yields no value.
// NOTE(review): the return statement is on a line missing from this listing;
// callers dereference the result with ->, so it is presumably a pointer-like
// handle - confirm.
2512 auto find_frontend(ActorsType actor)
2514 auto fe = rpc_map->find(actor);
2515 if (!fe.has_value())
// NOTE(review): `(int)actor` is a C-style cast; static_cast<int>(actor) would
// be the greppable equivalent.
2517 throw std::logic_error(
2518 fmt::format(
"Cannot find {} frontend", (
int)actor));
// Opens the frontend registered for `actor`. find_frontend throws
// std::logic_error if none is registered.
2523 void open_frontend(ActorsType actor)
2525 find_frontend(actor)->open();
// Convenience wrapper: opens the frontend for ActorsType::users.
2528 void open_user_frontend()
2530 open_frontend(ActorsType::users);
// Reports whether the members frontend is open. Takes no lock itself; the
// locked variant is_member_frontend_open() acquires `lock` before calling this.
2533 bool is_member_frontend_open_unsafe()
2535 return find_frontend(ActorsType::members)->is_open();
// Locked query: acquires `lock` for the duration of the call and returns the
// result of is_member_frontend_open_unsafe().
2538 bool is_member_frontend_open()
override
2540 std::lock_guard<pal::Mutex> guard(lock);
2541 return is_member_frontend_open_unsafe();
// Locked query: acquires `lock` and reports whether the users frontend is
// open. find_frontend throws std::logic_error if none is registered.
2544 bool is_user_frontend_open()
override
2546 std::lock_guard<pal::Mutex> guard(lock);
2547 return find_frontend(ActorsType::users)->is_open();
// Builds the serialised "create" request for this node.
//   create_view       - view placed in the request's create_txid.
//   create_consortium - when true, the genesis info from config.start is
//                       included in the request.
// Returns the bytes produced by request.build_request().
// Side effect visible here: the freshly endorsed node certificate is also
// handed to `history`.
// NOTE(review): several lines (request construction, some arguments and
// conditions) are missing from this listing.
2550 std::vector<uint8_t> serialize_create_request(
2551 View create_view,
bool create_consortium =
true)
2553 CreateNetworkNodeToNode::In create_params;
2557 if (create_consortium)
2559 create_params.genesis_info = config.
start;
2562 create_params.node_id = self;
// The node creates its own CSR and has it endorsed using the service
// identity's private key and certificate.
2563 create_params.certificate_signing_request = node_sign_kp->create_csr(
2565 create_params.node_endorsed_certificate =
2566 ccf::crypto::create_endorsed_cert(
2567 create_params.certificate_signing_request,
2570 network.identity->priv_key,
2571 network.identity->cert);
2575 history->set_endorsed_certificate(
2576 create_params.node_endorsed_certificate);
// Node identity, attestation and configuration material copied into the request.
2578 create_params.public_key = node_sign_kp->public_key_pem();
2579 create_params.service_cert = network.identity->cert;
2580 create_params.quote_info = quote_info;
2581 create_params.public_encryption_key = node_encrypt_kp->public_key_pem();
2582 create_params.measurement = node_measurement;
2583 create_params.snp_uvm_endorsements = snp_uvm_endorsements;
2584 create_params.snp_security_policy =
2587 create_params.node_info_network = config.
network;
2588 create_params.node_data = config.
node_data;
// The create transaction is placed one seqno after the last recovered signed index.
2590 create_params.create_txid = {create_view, last_recovered_signed_idx + 1};
// NOTE(review): snp_tcb_version.value() throws if the optional is empty; the
// guarding condition, if any, is not visible in this listing.
2594 create_params.sealing_recovery_data = std::make_pair(
2595 sealing::get_snp_sealed_recovery_key(snp_tcb_version.value()),
// The parameters are serialised to JSON text and sent with a JSON content type.
2599 const auto body = nlohmann::json(create_params).dump();
2604 ccf::http::headers::CONTENT_TYPE,
2605 ccf::http::headervalues::contenttype::JSON);
2607 request.set_body(body);
2609 return request.build_request();
// Interprets the response stored on `ctx` after the create request was
// processed. The visible checks are: status must be HTTP_STATUS_OK, and the
// body must parse as a JSON boolean.
// NOTE(review): the return statements are on lines missing from this listing.
2612 bool extract_create_result(
const std::shared_ptr<RpcContext>& ctx)
2620 const auto status = ctx->get_response_status();
2621 const auto& raw_body = ctx->get_response_body();
// Non-OK status: the raw body is rendered as text for the error message.
2622 if (status != HTTP_STATUS_OK)
2625 "Create response is error: {} {}\n{}",
2628 std::string(raw_body.begin(), raw_body.end()));
// NOTE(review): nlohmann::json::parse throws on malformed input by default and
// no try/catch is visible here - confirm the caller tolerates that.
2632 const auto body = nlohmann::json::parse(raw_body);
2633 if (!body.is_boolean())
2635 LOG_FAIL_FMT(
"Expected boolean body in create response");
2637 "Expected boolean body in create response: {}", body.dump());
// Dispatches an already-serialised create request (`packed`) to the local RPC
// handler and returns the outcome reported by extract_create_result().
// NOTE(review): the construction of `ctx` from `packed` and `node_session` is
// on lines missing from this listing.
2644 bool send_create_request(
const std::vector<uint8_t>& packed)
// The session is created with InvalidSessionId and the raw self-signed node cert.
2646 auto node_session = std::make_shared<SessionContext>(
2647 InvalidSessionId, self_signed_node_cert.
raw());
// NOTE(review): `search` is dereferenced without a visible null check - confirm
// fetch_rpc_handler cannot return null for this request.
2650 std::shared_ptr<ccf::RpcHandler> search =
2651 ::http::fetch_rpc_handler(ctx, this->rpc_map);
2653 search->process(ctx);
2655 return extract_create_result(ctx);
// Serialises and sends the create request, then advances the node state.
//   create_view       - forwarded to serialize_create_request.
//   create_consortium - forwarded to serialize_create_request; also selects
//                       which advance_* call follows.
// Throws std::runtime_error when send_create_request() returns false.
2658 void create_and_send_boot_request(
2659 View create_view,
bool create_consortium =
true)
2665 if (!this->send_create_request(
2666 this->serialize_create_request(create_view, create_consortium)))
2668 throw std::runtime_error(
2669 "Service creation request could not be committed");
// With a consortium the node advances to part-of-network; the other visible
// call advances to part-of-public-network (the `else` itself is on a line
// missing from this listing - confirm).
2671 if (create_consortium)
2673 this->advance_part_of_network();
2677 this->advance_part_of_public_network();
// Switches the node from the public-network phase to reading the private
// ledger. Requires state partOfPublicNetwork (enforced via sm.expect).
2682 void begin_private_recovery()
2684 sm.
expect(NodeStartupState::partOfPublicNetwork);
// Prepare the recovery store, drop the recovery map hook and install the
// one-off secret hook before changing state.
2688 setup_private_recovery_store();
2690 reset_recovery_hook();
2691 setup_one_off_secret_hook();
2694 sm.
advance(NodeStartupState::readingPrivateLedger);
// Request the first batch of ledger entries, starting just after the recovery
// store's current version and spanning recovery_batch_size entries.
2695 last_recovered_idx = recovery_store->current_version();
2696 read_ledger_entries(
2697 last_recovered_idx + 1, last_recovered_idx + recovery_batch_size);
// Registers map hooks and global hooks on the secrets, nodes,
// node_endorsed_certificates and service tables.
// NOTE(review): the lambda headers (and therefore the exact types of
// `hook_version` and `w`), most braces, and several log-macro names are on
// lines missing from this listing; comments describe visible statements only.
2700 void setup_basic_hooks()
// --- secrets table, map hook ---
// Acts only for entries addressed to this node: each encrypted secret is
// decrypted with the node's encryption key and registered with ledger_secrets.
2702 network.tables->set_map_hook(
2703 network.secrets.get_name(),
2708 if (!is_part_of_network())
2714 const auto& ledger_secrets_for_nodes = w;
2715 if (!ledger_secrets_for_nodes.has_value())
2717 throw std::logic_error(fmt::format(
2718 "Unexpected removal from {} table",
2719 network.secrets.get_name()));
2722 for (
const auto& [node_id, encrypted_ledger_secrets] :
2723 ledger_secrets_for_nodes.value())
2725 if (node_id != self)
2731 for (
const auto& encrypted_ledger_secret :
2732 encrypted_ledger_secrets)
2734 auto plain_ledger_secret = LedgerSecretsBroadcast::decrypt(
2735 node_encrypt_kp, encrypted_ledger_secret.encrypted_secret);
// The LedgerSecret is built with hook_version but stored under
// hook_version + 1 (presumably it applies from the following entry - confirm).
2740 auto ledger_secret = std::make_shared<LedgerSecret>(
2741 std::move(plain_ledger_secret), hook_version);
2742 network.ledger_secrets->set_secret(
2743 hook_version + 1, std::move(ledger_secret));
// --- secrets table, global hook ---
// Collects this node's versioned secrets into restored_ledger_secrets; when
// any were found they are restored and private recovery begins.
2750 network.tables->set_global_hook(
2751 network.secrets.get_name(),
2756 if (!is_part_of_public_network())
2761 const auto& ledger_secrets_for_nodes = w;
2762 if (!ledger_secrets_for_nodes.has_value())
2764 throw std::logic_error(fmt::format(
2765 "Unexpected removal from {} table", network.secrets.get_name()));
2768 for (
const auto& [node_id, encrypted_ledger_secrets] :
2769 ledger_secrets_for_nodes.value())
2771 if (node_id != self)
2778 for (
const auto& encrypted_ledger_secret : encrypted_ledger_secrets)
// A secret without a version cannot be placed in the restored map.
2783 if (!encrypted_ledger_secret.version.has_value())
2785 throw std::logic_error(fmt::format(
2786 "Commit hook at seqno {} for table {}: no version for "
2787 "encrypted ledger secret",
2789 network.secrets.get_name()));
2792 auto plain_ledger_secret = LedgerSecretsBroadcast::decrypt(
2793 node_encrypt_kp, encrypted_ledger_secret.encrypted_secret);
2795 restored_ledger_secrets.emplace(
2796 encrypted_ledger_secret.version.value(),
2797 std::make_shared<LedgerSecret>(
2798 std::move(plain_ledger_secret),
2799 encrypted_ledger_secret.previous_secret_stored_version));
2802 if (!restored_ledger_secrets.empty())
2806 network.ledger_secrets->restore_historical(
2807 std::move(restored_ledger_secrets));
2808 begin_private_recovery();
2814 "Found no ledger secrets for this node ({}) in global commit hook "
2817 network.secrets.get_name(),
// --- nodes table, global hook ---
// Gathers the ids of nodes whose entry has retired_committed set; the list is
// passed on with hook_version (the callee is on a line not visible here).
2821 network.tables->set_global_hook(
2822 network.nodes.get_name(),
2825 std::vector<NodeId> retired_committed_nodes;
2826 for (
const auto& [node_id, node_info] : w)
2828 if (node_info.has_value() && node_info->retired_committed)
2830 retired_committed_nodes.push_back(node_id);
2834 hook_version, retired_committed_nodes);
// --- node_endorsed_certificates table, map hook ---
// For this node's own entry: under `lock`, records the new endorsed cert and
// propagates it to history and the node-to-node channels. Deletion of the
// node's own entry raises std::logic_error.
2843 network.tables->set_map_hook(
2844 network.node_endorsed_certificates.get_name(),
2851 "[local] node_endorsed_certificates local hook at version {}, "
2855 for (
auto const& [node_id, endorsed_certificate] : w)
2857 if (node_id != self)
2860 "[local] Ignoring endorsed certificate for other node {}",
2865 if (!endorsed_certificate.has_value())
2868 "[local] Endorsed cert for self ({}) has been deleted", self);
2869 throw std::logic_error(fmt::format(
2870 "Could not find endorsed node certificate for {}", self));
2873 std::lock_guard<pal::Mutex> guard(lock);
2875 if (endorsed_node_cert.has_value())
2878 "[local] Previous endorsed node cert was:\n{}",
2879 endorsed_node_cert->str());
2882 endorsed_node_cert = endorsed_certificate.value();
2884 "[local] Under lock, setting endorsed node cert to:\n{}",
2885 endorsed_node_cert->str());
2886 history->set_endorsed_certificate(endorsed_node_cert.value());
2887 n2n_channels->set_endorsed_node_cert(endorsed_node_cert.value());
// --- node_endorsed_certificates table, global hook ---
// For this node's own entry: under `lock`, starts accepting network TLS
// connections, handles the self-signed cert depending on whether the member
// frontend is already open, and opens the members frontend.
2893 network.tables->set_global_hook(
2894 network.node_endorsed_certificates.get_name(),
2900 "[global] node_endorsed_certificates global hook at version {}, "
2904 for (
auto const& [node_id, endorsed_certificate] : w)
2906 if (node_id != self)
2909 "[global] Ignoring endorsed certificate for other node {}",
2914 if (!endorsed_certificate.has_value())
2917 "[global] Endorsed cert for self ({}) has been deleted",
2919 throw std::logic_error(fmt::format(
2920 "Could not find endorsed node certificate for {}", self));
2923 std::lock_guard<pal::Mutex> guard(lock);
2925 LOG_INFO_FMT(
"[global] Accepting network connections");
2926 accept_network_tls_connections();
// `lock` is already held, hence the _unsafe (non-locking) variant.
2928 if (is_member_frontend_open_unsafe())
2936 auto [valid_from, valid_to] =
2938 ->validity_period();
2940 "[global] Member frontend is open, so refreshing self-signed "
2943 "[global] Previously:\n{}", self_signed_node_cert.
str());
2944 self_signed_node_cert = create_self_signed_cert(
2953 accept_node_tls_connections();
2959 "[global] Self-signed node cert remains:\n{}",
2960 self_signed_node_cert.
str());
2964 open_frontend(ActorsType::members);
// --- service table, global hook ---
// Rejects deletions, skips entries whose public key differs from the current
// one, otherwise adopts the service certificate and opens the users frontend
// once the status is OPEN.
2968 network.tables->set_global_hook(
2969 network.service.get_name(),
2974 throw std::logic_error(
"Unexpected deletion in service value");
2980 auto current_pubk_pem =
2983 if (hook_pubk_pem != current_pubk_pem)
2986 "Ignoring historical service open at seqno {} for {}",
2993 "Executing global hook for service table at {}, to service "
2994 "status {}. Cert is:\n{}",
2999 network.identity->set_certificate(w->cert);
3000 if (w->status == ServiceStatus::OPEN)
3002 open_user_frontend();
3005 LOG_INFO_FMT(
"Service open at seqno {}", hook_version);
3017 std::lock_guard<pal::Mutex> guard(lock);
3018 return last_recovered_signed_idx;
// Installs a map hook on the encrypted_ledger_secrets table that appends each
// written entry to recovered_encrypted_ledger_secrets.
// NOTE(review): the lambda header (defining `version` and `w`) is on lines
// missing from this listing.
3021 void setup_recovery_hook()
3023 network.tables->set_map_hook(
3024 network.encrypted_ledger_secrets.get_name(),
// Works on a local copy of the written value so it can be amended and moved.
3030 auto encrypted_ledger_secret_info = w;
3031 if (!encrypted_ledger_secret_info.has_value())
3033 throw std::logic_error(fmt::format(
3034 "Unexpected removal from {} table",
3035 network.encrypted_ledger_secrets.get_name()));
// Entries without an explicit next_version default to the version after the
// one at which the hook fired.
3040 if (!encrypted_ledger_secret_info->next_version.has_value())
3042 encrypted_ledger_secret_info->next_version = version + 1;
3045 if (encrypted_ledger_secret_info->previous_ledger_secret
3049 "Recovering encrypted ledger secret valid at seqno {}",
3050 encrypted_ledger_secret_info->previous_ledger_secret->version);
3053 recovered_encrypted_ledger_secrets.emplace_back(
3054 std::move(encrypted_ledger_secret_info.value()));
// Removes the map hook on the encrypted_ledger_secrets table (the table that
// setup_recovery_hook() registers a map hook on).
3060 void reset_recovery_hook()
3062 network.tables->unset_map_hook(
3063 network.encrypted_ledger_secrets.get_name());
// Initialises the node-to-node channels with this node's id, the service
// certificate, the node signing key pair and an optional endorsed node cert.
// NOTE(review): the parameter's default value is on a line missing from this
// listing.
3066 void setup_n2n_channels(
3067 const std::optional<ccf::crypto::Pem>& endorsed_node_certificate_ =
3074 n2n_channels->initialize(
3075 self, network.identity->cert, node_sign_kp, endorsed_node_certificate_);
// Initialises the command forwarder with this node's id.
3078 void setup_cmd_forwarder()
3080 cmd_forwarder->initialize(self);
// Creates the MerkleTxHistory instance and attaches it to the tables.
// Throws std::logic_error("History already initialised"); the guarding
// condition is on a line missing from this listing (presumably a non-null
// check on `history` - confirm), as are the constructor arguments.
3083 void setup_history()
3087 throw std::logic_error(
"History already initialised");
3090 history = std::make_shared<MerkleTxHistory>(
3097 network.tables->set_history(history);
// Creates the encryptor via make_encryptor() and attaches it to the tables.
// Throws std::logic_error("Encryptor already initialised"); the guarding
// condition is on a line missing from this listing (presumably a non-null
// check on `encryptor` - confirm).
3100 void setup_encryptor()
3104 throw std::logic_error(
"Encryptor already initialised");
3107 encryptor = make_encryptor();
3108 network.tables->set_encryptor(encryptor);
// Wires up consensus and its collaborators: initialises node-to-node channels
// and the command forwarder, creates consensus state and an HTTP node client,
// attaches consensus and the snapshotter to the tables, registers the
// snapshot-related hooks, and finally calls setup_basic_hooks().
//   public_only                - defaults to false; its use is on lines not
//                                visible in this listing.
//   endorsed_node_certificate_ - forwarded to setup_n2n_channels().
// NOTE(review): the consensus construction, lambda headers and several log
// macro names are on lines missing from this listing.
3111 void setup_consensus(
3112 bool public_only =
false,
3113 const std::optional<ccf::crypto::Pem>& endorsed_node_certificate_ =
3116 setup_n2n_channels(endorsed_node_certificate_);
3117 setup_cmd_forwarder();
3119 auto shared_state = std::make_shared<aft::State>(self);
3121 auto node_client = std::make_shared<HTTPNodeClient>(
3122 rpc_map, node_sign_kp, self_signed_node_cert, endorsed_node_cert);
3127 std::make_unique<::consensus::LedgerEnclave>(writer_factory),
3134 network.tables->set_consensus(
consensus);
3135 network.tables->set_snapshotter(snapshotter);
// nodes table, map hook: each write yields a ConfigurationChangeHook built
// from the version and the write set.
3139 network.tables->set_map_hook(
3140 network.nodes.get_name(),
3144 return std::make_unique<ConfigurationChangeHook>(version, w);
// The snapshotter hooks below init-capture a copy of this->snapshotter as `s`
// rather than capturing `this`.
// NOTE(review): assert() is compiled out under NDEBUG, so in release builds
// the w.value() calls are the only guard against an absent value.
3153 network.tables->set_map_hook(
3154 network.cose_signatures.get_name(),
3156 [s = this->snapshotter](
3159 assert(w.has_value());
3160 s->record_cose_signature(version, w.value());
3164 network.tables->set_map_hook(
3165 network.serialise_tree.get_name(),
3167 [s = this->snapshotter](
3170 assert(w.has_value());
3171 const auto& tree = w.value();
3172 s->record_serialised_tree(version, tree);
3176 network.tables->set_map_hook(
3177 network.snapshot_evidence.get_name(),
3179 [s = this->snapshotter](
3182 assert(w.has_value());
3183 auto snapshot_evidence = w.value();
3184 s->record_snapshot_evidence_idx(version, snapshot_evidence);
// snapshot_evidence table, global hook: under `lock`, skips when a
// non-cancelled backup fetch task already exists, otherwise creates a new
// BackupSnapshotFetch for the evidence's version.
3188 network.tables->set_global_hook(
3189 network.snapshot_evidence.get_name(),
3199 auto snapshot_evidence = w.value();
3207 std::lock_guard<pal::Mutex> guard(lock);
3209 backup_snapshot_fetch_task !=
nullptr &&
3210 !backup_snapshot_fetch_task->is_cancelled())
3213 "Backup snapshot fetch already in progress, skipping");
3218 "Snapshot evidence detected on backup - scheduling "
3219 "snapshot fetch from primary (since seqno: {})",
3220 snapshot_evidence.version);
// The task receives a raw `this` pointer as its owner.
3221 backup_snapshot_fetch_task =
3222 std::make_shared<BackupSnapshotFetch>(
3223 config.
snapshots, snapshot_evidence.version,
this);
// SNAPSHOT_STATUS table, global hook: forwards the written value to the snapshotter.
3231 network.tables->set_global_hook(
3232 Tables::SNAPSHOT_STATUS,
3234 [s = this->snapshotter](
3236 assert(w.has_value());
3237 s->record_snapshot_status(w.value());
// The signature cache is optional; its hooks are registered only when present.
3240 if (signature_cache !=
nullptr)
3242 signature_cache->register_hooks(*network.tables);
3245 setup_basic_hooks();
// Creates the Snapshotter instance.
// Throws std::logic_error("Snapshotter already initialised"); the guarding
// condition is on a line missing from this listing (presumably a non-null
// check on `snapshotter` - confirm).
// NOTE(review): the condition and log macro for the min_tx_count message, and
// the Snapshotter constructor arguments, are also missing from this listing.
3248 void setup_snapshotter()
3252 throw std::logic_error(
"Snapshotter already initialised");
3260 "snapshots.min_tx_count is lower than 2 while "
3261 "snapshots.time_interval is set to {}. Time-based snapshots may "
3262 "continue to be generated without application writes due to the "
3263 "writes to snapshot evidence and its signature.",
3267 snapshotter = std::make_shared<Snapshotter>(
3278 ::consensus::ledger_get_range,
3288 ::consensus::ledger_truncate, to_host, idx, recovery_mode);
3294 n2n_channels->set_message_limit(message_limit);
3299 n2n_channels->set_idle_timeout(idle_timeout);
3309 return network.identity->cert;
3314 const ::http::URL& url,
3319 const std::vector<std::string>& ca_certs = {},
3320 const std::string& app_protocol =
"HTTP1",
3321 bool authenticate_as_node_client_certificate =
false)
override
3323 std::optional<ccf::crypto::Pem> client_cert = std::nullopt;
3324 std::optional<ccf::crypto::Pem> client_cert_key = std::nullopt;
3325 if (authenticate_as_node_client_certificate)
3328 endorsed_node_cert ? *endorsed_node_cert : self_signed_node_cert;
3329 client_cert_key = node_sign_kp->private_key_pem();
3332 auto ca = std::make_shared<::tls::CA>(ca_certs,
true);
3333 std::shared_ptr<::tls::Cert> ca_cert =
3334 std::make_shared<::tls::Cert>(ca, client_cert, client_cert_key);
3335 auto client = rpcsessions->create_client(ca_cert, app_protocol);
3341 http::HeaderMap&& headers,
3342 std::vector<uint8_t>&& data) {
3343 return callback(status, std::move(headers), std::move(data));
3345 client->send_request(std::move(req));
3350 snapshotter->write_snapshot(snapshot_buf, request_id);
3355 return network.tables;
3360 return writer_factory;
3365 return recovery_decision_protocol;
3370 if (!is_part_of_network())
3373 "Skipping shuffling of sealed shares during recovery as ledger "
3374 "secrets are not yet available");
3377 auto latest_ledger_secret = network.ledger_secrets->get_latest(tx);
3378 sealing::shuffle_sealed_shares(tx, latest_ledger_secret.second);