43 bool is_open_ =
false;
46 std::shared_ptr<AbstractForwarder> cmd_forwarder;
49 size_t sig_tx_interval = 5000;
50 std::chrono::milliseconds sig_ms_interval = std::chrono::milliseconds(1000);
51 std::chrono::milliseconds ms_to_sig = std::chrono::milliseconds(1000);
53 std::shared_ptr<NodeConfigurationSubsystem> node_configuration_subsystem =
56 void update_consensus()
77 if (endpoint ==
nullptr)
81 if (allowed_verbs.empty())
84 HTTP_STATUS_NOT_FOUND,
85 ccf::errors::ResourceNotFound,
86 fmt::format(
"Unknown path: {}.", ctx->get_method()));
90 std::vector<char const*> allowed_verb_strs;
91 allowed_verb_strs.push_back(llhttp_method_name(HTTP_OPTIONS));
92 for (
auto verb : allowed_verbs)
94 allowed_verb_strs.push_back(verb.c_str());
96 const std::string allow_header_value =
97 fmt::format(
"{}", fmt::join(allowed_verb_strs,
", "));
102 ctx->set_response_header(http::headers::ALLOW, allow_header_value);
103 if (ctx->get_request_verb() == HTTP_OPTIONS)
105 ctx->set_response_status(HTTP_STATUS_NO_CONTENT);
110 HTTP_STATUS_METHOD_NOT_ALLOWED,
111 ccf::errors::UnsupportedHttpVerb,
113 "Allowed methods for '{}' are: {}.",
115 allow_header_value));
123 bool check_uri_allowed(
124 std::shared_ptr<ccf::RpcContextImpl> ctx,
127 auto interface_id = ctx->get_session_context()->interface_id;
128 if ((
consensus !=
nullptr) && interface_id)
130 if (!node_configuration_subsystem)
132 node_configuration_subsystem =
134 if (!node_configuration_subsystem)
136 ctx->set_response_status(HTTP_STATUS_INTERNAL_SERVER_ERROR);
141 const auto& ncs = node_configuration_subsystem->get();
143 const auto& required_features = endpoint->required_operator_features;
144 if (!required_features.empty())
148 const auto& interfaces = ncs.node_config.network.rpc_interfaces;
149 auto interface_it = interfaces.find(*interface_id);
150 if (interface_it == interfaces.end())
152 throw std::runtime_error(fmt::format(
153 "Could not find RPC interface named '{}' in startup config",
157 const auto& enabled_features =
158 interface_it->second.enabled_operator_features;
159 for (
const auto& required_feature : required_features)
162 enabled_features.find(required_feature) == enabled_features.end())
165 "Incoming request {} requires opt-in feature {}, which is not "
166 "enabled on interface {} where this request was received - "
168 endpoint->full_uri_path,
171 ctx->set_response_status(HTTP_STATUS_NOT_FOUND);
177 auto rit = ncs.rpc_interface_regexes.find(*interface_id);
179 if (rit != ncs.rpc_interface_regexes.end())
182 for (
const auto& re : rit->second)
185 if (std::regex_match(endpoint->full_uri_path, m, re))
193 ctx->set_response_status(HTTP_STATUS_SERVICE_UNAVAILABLE);
199 auto icfg = ncs.node_config.network.rpc_interfaces.at(*interface_id);
201 icfg.endorsement.has_value() &&
206 "Request for {} rejected because the interface is unsecured and "
207 "no accepted_endpoints have been configured.",
208 endpoint->full_uri_path);
209 ctx->set_response_status(HTTP_STATUS_SERVICE_UNAVAILABLE);
223 std::optional<std::string> resolve_redirect_location(
224 const RedirectionResolverConfig& resolver,
228 switch (resolver.kind)
232 const auto role_it = resolver.target.find(
"role");
233 const bool seeking_primary =
234 role_it == resolver.target.end() || role_it.value() ==
"primary";
235 const bool seeking_backup =
236 !seeking_primary && role_it.value() ==
"backup";
237 if (!seeking_primary && !seeking_backup)
242 const auto interface_it = resolver.target.find(
"interface");
243 const auto target_interface =
244 (interface_it == resolver.target.end()) ?
246 interface_it.value().get<
std::string>();
248 std::vector<std::map<NodeId, NodeInfo>::const_iterator>
252 const auto primary_id =
consensus->primary();
253 if (seeking_primary && primary_id.has_value())
255 target_node_its.push_back(
nodes.find(primary_id.value()));
257 else if (seeking_backup)
259 for (
auto it =
nodes.begin(); it !=
nodes.end(); ++it)
261 if (it->first != primary_id)
263 target_node_its.push_back(it);
268 if (target_node_its.empty())
274 target_node_its[random() % target_node_its.size()];
275 if (node_it !=
nodes.end())
277 const auto& interfaces = node_it->second.rpc_interfaces;
279 const auto target_interface_it = interfaces.find(target_interface);
280 if (target_interface_it != interfaces.end())
282 return target_interface_it->second.published_address;
294 return resolver.target[
"address"].get<std::string>();
304 std::shared_ptr<ccf::RpcContextImpl> ctx,
308 auto rs = endpoint->properties.redirection_strategy;
319 const bool is_primary =
326 const auto listen_interface =
327 ctx->get_session_context()->interface_id.value_or(
328 PRIMARY_RPC_INTERFACE);
329 const auto location =
330 resolve_redirect_location(resolver, tx, listen_interface);
331 if (location.has_value())
333 ctx->set_response_header(
334 http::headers::LOCATION,
336 "https://{}{}", location.value(), ctx->get_request_url()));
337 ctx->set_response_status(HTTP_STATUS_TEMPORARY_REDIRECT);
343 HTTP_STATUS_SERVICE_UNAVAILABLE,
344 ccf::errors::PrimaryNotFound,
345 "Request should be redirected to primary, but receiving node "
346 "does not know current primary address");
354 const bool is_backup =
361 const auto listen_interface =
362 ctx->get_session_context()->interface_id.value_or(
363 PRIMARY_RPC_INTERFACE);
364 const auto location =
365 resolve_redirect_location(resolver, tx, listen_interface);
366 if (location.has_value())
368 ctx->set_response_header(
369 http::headers::LOCATION,
371 "https://{}{}", location.value(), ctx->get_request_url()));
372 ctx->set_response_status(HTTP_STATUS_TEMPORARY_REDIRECT);
378 HTTP_STATUS_SERVICE_UNAVAILABLE,
379 ccf::errors::BackupNotFound,
380 "Request should be redirected to backup, but receiving node "
381 "does not know any current backup address");
395 std::optional<ccf::NodeInfoNetwork_v2::NetInterface::Redirections>
398 if (!node_configuration_subsystem)
400 node_configuration_subsystem =
402 if (!node_configuration_subsystem)
404 LOG_FAIL_FMT(
"Unable to access NodeConfigurationSubsystem");
409 const auto& node_config_state = node_configuration_subsystem->get();
410 const auto& interfaces =
411 node_config_state.node_config.network.rpc_interfaces;
412 const auto interface_it = interfaces.find(incoming_interface);
413 if (interface_it == interfaces.end())
416 "Could not find startup config for interface {}", incoming_interface);
420 return interface_it->second.redirections;
423 bool check_session_consistency(std::shared_ptr<ccf::RpcContextImpl> ctx)
427 auto current_view =
consensus->get_view();
428 auto session_ctx = ctx->get_session_context();
429 if (!session_ctx->active_view.has_value())
432 session_ctx->active_view = current_view;
435 else if (current_view != *session_ctx->active_view)
437 auto msg = fmt::format(
438 "Potential loss of session consistency on session {}. Started "
439 "in view {}, now in view {}. Closing session.",
440 session_ctx->client_session_id,
447 HTTP_STATUS_INTERNAL_SERVER_ERROR,
448 ccf::errors::SessionConsistencyLost,
450 ctx->terminate_session =
true;
458 std::unique_ptr<AuthnIdentity> get_authenticated_identity(
459 std::shared_ptr<ccf::RpcContextImpl> ctx,
463 if (endpoint->authn_policies.empty())
468 std::unique_ptr<AuthnIdentity> identity =
nullptr;
470 std::string auth_error_reason;
471 std::vector<ODataAuthErrorDetails> error_details;
472 for (
const auto& policy : endpoint->authn_policies)
474 identity = policy->authenticate(tx, ctx, auth_error_reason);
475 if (identity !=
nullptr)
480 error_details.emplace_back(ODataAuthErrorDetails{
481 policy->get_security_scheme_name(),
482 ccf::errors::InvalidAuthenticationInfo,
486 if (identity ==
nullptr)
489 endpoint->authn_policies.back()->set_unauthenticated_error(
490 ctx, std::move(auth_error_reason));
493 std::vector<nlohmann::json> json_details;
494 json_details.reserve(error_details.size());
495 for (
auto& details : error_details)
497 json_details.emplace_back(details);
500 HTTP_STATUS_UNAUTHORIZED,
501 ccf::errors::InvalidAuthenticationInfo,
502 "Invalid authentication credentials.",
509 [[nodiscard]] std::chrono::milliseconds get_forwarding_timeout(
510 std::shared_ptr<ccf::RpcContextImpl> ctx)
const
512 auto r = std::chrono::milliseconds(3'000);
514 auto interface_id = ctx->get_session_context()->interface_id;
515 if (interface_id.has_value())
517 const auto& ncs = node_configuration_subsystem->get();
518 auto rit = ncs.node_config.network.rpc_interfaces.find(*interface_id);
519 if (rit != ncs.node_config.network.rpc_interfaces.end())
521 if (rit->second.forwarding_timeout_ms.has_value())
524 r = std::chrono::milliseconds(*rit->second.forwarding_timeout_ms);
533 std::shared_ptr<ccf::RpcContextImpl> ctx,
541 HTTP_STATUS_NOT_IMPLEMENTED,
542 ccf::errors::NotImplemented,
543 "Request cannot be forwarded to primary on HTTP/2 interface.");
548 if (!cmd_forwarder || (
consensus ==
nullptr))
551 HTTP_STATUS_INTERNAL_SERVER_ERROR,
552 ccf::errors::InternalError,
553 "No consensus or forwarder to forward request.");
558 if (ctx->get_session_context()->is_forwarded)
563 HTTP_STATUS_SERVICE_UNAVAILABLE,
564 ccf::errors::RequestAlreadyForwarded,
565 "RPC was already forwarded.");
572 if (!check_session_consistency(ctx))
578 if (!primary_id.has_value())
581 HTTP_STATUS_SERVICE_UNAVAILABLE,
582 ccf::errors::InternalError,
583 "RPC could not be forwarded to unknown primary.");
588 if (!cmd_forwarder->forward_command(
591 ctx->get_session_context()->caller_cert,
592 get_forwarding_timeout(ctx)))
595 HTTP_STATUS_SERVICE_UNAVAILABLE,
596 ccf::errors::InternalError,
597 "Unable to establish channel to forward to primary.");
602 LOG_TRACE_FMT(
"RPC forwarded to primary {}", primary_id.value());
605 ctx->response_is_pending =
true;
609 ctx->get_session_context()->is_forwarding =
true;
612 void process_command(std::shared_ptr<ccf::RpcContextImpl> ctx)
617 const auto start_time = std::chrono::high_resolution_clock::now();
619 process_command_inner(ctx, endpoint, attempts);
621 const auto end_time = std::chrono::high_resolution_clock::now();
623 if (endpoint !=
nullptr)
625 endpoints::RequestCompletedEvent rce;
626 rce.method = endpoint->dispatch.verb.c_str();
627 rce.dispatch_path = endpoint->dispatch.uri_path;
628 rce.status = ctx->get_response_status();
633 rce.exec_time = std::chrono::duration_cast<std::chrono::milliseconds>(
634 end_time - start_time);
635 rce.attempts = attempts;
641 endpoints::DispatchFailedEvent dfe;
642 dfe.method = ctx->get_method();
643 dfe.status = ctx->get_response_status();
649 void process_command_inner(
650 std::shared_ptr<ccf::RpcContextImpl> ctx,
654 constexpr auto max_attempts = 30;
655 while (attempts < max_attempts)
664 HTTP_STATUS_SERVICE_UNAVAILABLE,
665 ccf::errors::TooManyPendingTransactions,
666 "Too many transactions pending commit on the service.");
678 ctx->reset_response();
684 HTTP_STATUS_NOT_FOUND,
685 ccf::errors::FrontendNotOpen,
686 "Frontend is not open.");
693 endpoint = find_endpoint(ctx, *tx_p);
694 if (endpoint ==
nullptr)
701 if (!check_uri_allowed(ctx, endpoint))
706 std::optional<ccf::NodeInfoNetwork_v2::NetInterface::Redirections>
707 redirections = std::nullopt;
711 if (ctx->get_session_context()->interface_id.has_value())
713 redirections = get_redirections_config(
715 *ctx->get_session_context()->interface_id);
720 if (redirections.has_value())
722 if (check_redirect(*tx_p, ctx, endpoint, *redirections))
731 const bool forwardable = (
consensus !=
nullptr);
733 if (!is_primary && forwardable)
735 switch (endpoint->properties.forwarding_required)
744 if (ctx->get_session_context()->is_forwarding)
746 forward(ctx, *tx_p, endpoint);
754 forward(ctx, *tx_p, endpoint);
761 std::unique_ptr<AuthnIdentity> identity =
762 get_authenticated_identity(ctx, *tx_p, endpoint);
770 if (!endpoint->authn_policies.empty())
772 if (identity ==
nullptr)
776 args.caller = std::move(identity);
783 if (!check_session_consistency(ctx))
788 if (!ctx->should_apply_writes())
793 if (ctx->response_is_pending)
798 if (args.owned_tx ==
nullptr)
801 "Bad endpoint: During execution of {} {}, returned a non-pending "
802 "response but stole ownership of Tx object",
803 ctx->get_request_verb().c_str(),
804 ctx->get_request_path());
806 ctx->clear_response_headers();
808 HTTP_STATUS_INTERNAL_SERVER_ERROR,
809 ccf::errors::InternalError,
810 "Illegal endpoint implementation");
822 std::string captured_commit_evidence;
825 ctx->consensus_committed_func;
826 if (committed_func !=
nullptr)
828 ws_observer = [&captured_ws_digest, &captured_commit_evidence](
830 const std::string& ce) {
831 captured_ws_digest = ws_digest;
832 captured_commit_evidence = ce;
837 tx.
commit(ctx->claims,
nullptr, ws_observer);
844 if (tx_id_opt.has_value() &&
consensus !=
nullptr)
855 endpoint, args, tx_id);
857 catch (
const std::exception& e)
860 ctx->clear_response_headers();
863 HTTP_STATUS_INTERNAL_SERVER_ERROR,
864 ccf::errors::InternalError,
866 "Failed to execute local commit handler func: {}",
872 ctx->clear_response_headers();
875 HTTP_STATUS_INTERNAL_SERVER_ERROR,
876 ccf::errors::InternalError,
877 "Failed to execute local commit handler func");
881 if (committed_func !=
nullptr)
883 ctx->respond_on_commit =
887 std::move(captured_ws_digest),
888 std::move(captured_commit_evidence),
911 ctx->clear_response_headers();
913 HTTP_STATUS_SERVICE_UNAVAILABLE,
914 ccf::errors::TransactionReplicationFailed,
915 "Transaction failed to replicate.");
926 "Transaction execution conflicted with compaction: {}", e.
what());
929 catch (RpcException& e)
931 ctx->clear_response_headers();
932 ctx->set_error(std::move(e.error));
938 ctx->clear_response_headers();
940 HTTP_STATUS_BAD_REQUEST, ccf::errors::InvalidInput, e.
describe());
944 catch (
const nlohmann::json::exception& e)
946 ctx->clear_response_headers();
948 HTTP_STATUS_BAD_REQUEST, ccf::errors::InvalidInput, e.what());
961 catch (
const std::exception& e)
963 ctx->clear_response_headers();
965 HTTP_STATUS_INTERNAL_SERVER_ERROR,
966 ccf::errors::InternalError,
973 ctx->clear_response_headers();
975 HTTP_STATUS_SERVICE_UNAVAILABLE,
976 ccf::errors::TransactionCommitAttemptsExceedLimit,
978 "Transaction continued to conflict after {} attempts. Retry "
982 static constexpr size_t retry_after_seconds = 3;
983 ctx->set_response_header(http::headers::RETRY_AFTER, retry_after_seconds);
997 size_t sig_tx_interval_,
size_t sig_ms_interval_)
override
999 sig_tx_interval = sig_tx_interval_;
1000 sig_ms_interval = std::chrono::milliseconds(sig_ms_interval_);
1001 ms_to_sig = sig_ms_interval;
1005 std::shared_ptr<AbstractForwarder> cmd_forwarder_)
override
1007 cmd_forwarder = cmd_forwarder_;
1012 std::lock_guard<ccf::pal::Mutex> mguard(open_lock);
1023 std::lock_guard<ccf::pal::Mutex> mguard(open_lock);
1033 if (history !=
nullptr)
1039 const auto& [txid, root, term_of_next_version] =
1055 void process(std::shared_ptr<ccf::RpcContextImpl> ctx)
override
1061 process_command(ctx);
1070 if (!ctx->get_session_context()->is_forwarded)
1072 throw std::logic_error(
1073 "Processing forwarded command with unitialised forwarded context");
1077 process_command(ctx);
1078 if (ctx->response_is_pending)
1082 throw std::logic_error(
"Forwarded RPC cannot be forwarded");
1086 void tick(std::chrono::milliseconds elapsed)
override