// NOTE(review): this listing carries embedded line numbers and omits some
// lines; the comments below describe only what is visible here.
// Starts out false; presumably records whether the frontend has been opened
// -- confirm against the open/is_open members.
44 bool is_open_ =
false;
// Forwarder handle. It is assigned from a setter argument and used for
// forward_command() further down this listing.
47 std::shared_ptr<AbstractForwarder> cmd_forwarder;
// Signature-interval settings with their defaults (5000, and 1000 ms twice).
// All three are overwritten together where sig_tx_interval_ and
// sig_ms_interval_ are applied.
50 size_t sig_tx_interval = 5000;
51 std::chrono::milliseconds sig_ms_interval = std::chrono::milliseconds(1000);
52 std::chrono::milliseconds ms_to_sig = std::chrono::milliseconds(1000);
// Cached subsystem handle. Several members test it for null and assign it
// before use; its initialiser is not visible in this listing.
54 std::shared_ptr<NodeConfigurationSubsystem> node_configuration_subsystem =
57 void update_consensus()
// Handling for a request that did not resolve to an endpoint. The enclosing
// function header is not visible in this listing (presumably the
// find_endpoint helper called from process_command_inner -- confirm).
78 if (endpoint ==
nullptr)
// No verb at all is registered for this path: report "not found".
82 if (allowed_verbs.empty())
85 HTTP_STATUS_NOT_FOUND,
86 ccf::errors::ResourceNotFound,
87 fmt::format(
"Unknown path: {}.", ctx->get_method()));
// Otherwise build the Allow header value: OPTIONS first, then every verb in
// allowed_verbs, joined with ", ".
91 std::vector<char const*> allowed_verb_strs;
92 allowed_verb_strs.push_back(llhttp_method_name(HTTP_OPTIONS));
93 for (
auto verb : allowed_verbs)
// NOTE(review): `verb` is a by-value loop copy, so this relies on c_str()
// returning storage that outlives the copy -- confirm.
95 allowed_verb_strs.push_back(verb.c_str());
97 const std::string allow_header_value =
98 fmt::format(
"{}", fmt::join(allowed_verb_strs,
", "));
// The Allow header is set before the OPTIONS test below, so it is present on
// the 204 response and on whatever follows.
103 ctx->set_response_header(http::headers::ALLOW, allow_header_value);
// An OPTIONS request is answered with 204 No Content.
104 if (ctx->get_request_verb() == HTTP_OPTIONS)
106 ctx->set_response_status(HTTP_STATUS_NO_CONTENT);
// Presumably the non-OPTIONS branch: 405 with the list of allowed methods.
111 HTTP_STATUS_METHOD_NOT_ALLOWED,
112 ccf::errors::UnsupportedHttpVerb,
114 "Allowed methods for '{}' are: {}.",
116 allow_header_value));
// Decides whether the resolved endpoint may be served on the interface the
// request arrived on. Returns bool; the return statements themselves are not
// visible in this listing.
124 bool check_uri_allowed(
125 std::shared_ptr<ccf::RpcContextImpl> ctx,
// Interface the session arrived on; dereferenced with `*interface_id` below.
128 auto interface_id = ctx->get_session_context()->interface_id;
// Fetch the node configuration subsystem on first use and cache it in the
// member.
131 if (!node_configuration_subsystem)
133 node_configuration_subsystem =
// Still unavailable after the fetch attempt: respond 500.
135 if (!node_configuration_subsystem)
137 ctx->set_response_status(HTTP_STATUS_INTERNAL_SERVER_ERROR);
142 auto& ncs = node_configuration_subsystem->get();
// Look up the regexes registered for this interface.
143 auto rit = ncs.rpc_interface_regexes.find(*interface_id);
145 if (rit != ncs.rpc_interface_regexes.end())
// Test the endpoint's full URI path against each regex in turn.
148 for (
const auto& re : rit->second)
// std::regex_match requires the whole path to match, not just a substring.
151 if (std::regex_match(endpoint->full_uri_path, m, re))
// Presumably reached when no regex matched: respond 503.
159 ctx->set_response_status(HTTP_STATUS_SERVICE_UNAVAILABLE);
// No regex entry for this interface: consult its startup configuration.
// NOTE(review): `auto` copies the interface config here -- confirm whether a
// reference would do.
165 auto icfg = ncs.node_config.network.rpc_interfaces.at(*interface_id);
// Rejection for an unsecured interface with no accepted_endpoints configured
// (the guarding condition is not visible here).
170 "Request for {} rejected because the interface is unsecured and "
171 "no accepted_endpoints have been configured.",
172 endpoint->full_uri_path);
173 ctx->set_response_status(HTTP_STATUS_SERVICE_UNAVAILABLE);
// Resolves the address a request should be redirected to, according to
// `resolver`. Returns std::optional<std::string>; the paths that return
// without a value are not visible in this listing.
187 std::optional<std::string> resolve_redirect_location(
188 const RedirectionResolverConfig& resolver,
192 switch (resolver.kind)
// Node-selecting case: the optional "role" key picks which kind of node to
// target. A missing key is treated the same as "primary".
196 const auto role_it = resolver.target.find(
"role");
197 const bool seeking_primary =
198 role_it == resolver.target.end() || role_it.value() ==
"primary";
199 const bool seeking_backup =
200 !seeking_primary && role_it.value() ==
"backup";
// Any other role value leaves both flags false.
201 if (!seeking_primary && !seeking_backup)
// The optional "interface" key names the target interface; the fallback used
// when the key is absent is on a line not visible here.
206 const auto interface_it = resolver.target.find(
"interface");
207 const auto target_interface =
208 (interface_it == resolver.target.end()) ?
210 interface_it.value().get<
std::string>();
// Candidate nodes, held as iterators into `nodes`.
212 std::vector<std::map<NodeId, NodeInfo>::const_iterator>
216 const auto primary_id =
consensus->primary();
217 if (seeking_primary && primary_id.has_value())
// find() may return nodes.end() if the primary is not in `nodes`; that case
// is filtered by the end() check after selection.
219 target_node_its.push_back(
nodes.find(primary_id.value()));
221 else if (seeking_backup)
// Every node whose id differs from the current primary is a candidate.
223 for (
auto it =
nodes.begin(); it !=
nodes.end(); ++it)
225 if (it->first != primary_id)
227 target_node_its.push_back(it);
232 if (target_node_its.empty())
// Pick one candidate at random. The empty() test just above presumably
// returns early, so the modulo never divides by zero -- confirm.
// NOTE(review): rand() is unseeded here and the modulo is slightly biased;
// that is fine for load spreading, but do not rely on it for anything
// security-sensitive.
237 const auto node_it = target_node_its[rand() % target_node_its.size()];
238 if (node_it !=
nodes.end())
240 const auto& interfaces = node_it->second.rpc_interfaces;
// Redirect to the published address of the chosen interface on that node.
242 const auto target_interface_it = interfaces.find(target_interface);
243 if (target_interface_it != interfaces.end())
245 return target_interface_it->second.published_address;
// Presumably a different resolver kind: the address is taken verbatim from
// the resolver's "address" entry.
257 return resolver.target[
"address"].get<std::string>();
// Redirect decision for a request. The function's name line is not visible in
// this listing (the call site uses check_redirect). The two visible halves
// are near-identical: one for redirecting towards the primary, one towards a
// backup.
267 std::shared_ptr<ccf::RpcContextImpl> ctx,
// The endpoint's redirection strategy drives which half applies.
271 auto rs = endpoint->properties.redirection_strategy;
282 const bool is_primary =
// Fall back to PRIMARY_RPC_INTERFACE when the session has no interface id.
289 const auto listen_interface =
290 ctx->get_session_context()->interface_id.value_or(
291 PRIMARY_RPC_INTERFACE);
292 const auto location =
293 resolve_redirect_location(resolver, tx, listen_interface);
// Location resolved: respond 307 with an https URL built from the resolved
// address followed by the original request URL.
294 if (location.has_value())
296 ctx->set_response_header(
297 http::headers::LOCATION,
299 "https://{}{}", location.value(), ctx->get_request_url()));
300 ctx->set_response_status(HTTP_STATUS_TEMPORARY_REDIRECT);
// No location could be resolved: 503 PrimaryNotFound.
306 HTTP_STATUS_SERVICE_UNAVAILABLE,
307 ccf::errors::PrimaryNotFound,
308 "Request should be redirected to primary, but receiving node "
309 "does not know current primary address");
// Mirror of the logic above for redirecting towards a backup.
317 const bool is_backup =
324 const auto listen_interface =
325 ctx->get_session_context()->interface_id.value_or(
326 PRIMARY_RPC_INTERFACE);
327 const auto location =
328 resolve_redirect_location(resolver, tx, listen_interface);
329 if (location.has_value())
331 ctx->set_response_header(
332 http::headers::LOCATION,
334 "https://{}{}", location.value(), ctx->get_request_url()));
335 ctx->set_response_status(HTTP_STATUS_TEMPORARY_REDIRECT);
// No location could be resolved: 503 BackupNotFound.
341 HTTP_STATUS_SERVICE_UNAVAILABLE,
342 ccf::errors::BackupNotFound,
343 "Request should be redirected to backup, but receiving node "
344 "does not know any current backup address");
// Looks up the redirections configured for `incoming_interface` in the node's
// startup configuration. The function's name line is not visible in this
// listing (the call site uses get_redirections_config). Returns an optional
// Redirections value.
358 std::optional<ccf::NodeInfoNetwork_v2::NetInterface::Redirections>
// Fetch the node configuration subsystem on first use and cache it in the
// member.
361 if (!node_configuration_subsystem)
363 node_configuration_subsystem =
// Still unavailable: log the failure (the value returned on this path is not
// visible here).
365 if (!node_configuration_subsystem)
367 LOG_FAIL_FMT(
"Unable to access NodeConfigurationSubsystem");
372 const auto& node_config_state = node_configuration_subsystem->get();
373 const auto& interfaces =
374 node_config_state.node_config.network.rpc_interfaces;
375 const auto interface_it = interfaces.find(incoming_interface);
// An unknown interface is reported with the message below.
376 if (interface_it == interfaces.end())
379 "Could not find startup config for interface {}", incoming_interface);
// Known interface: hand back its `redirections` member.
383 return interface_it->second.redirections;
// Checks that the consensus view has not changed since this session was first
// seen. Returns bool; the return statements themselves are not visible in
// this listing.
386 bool check_session_consistency(std::shared_ptr<ccf::RpcContextImpl> ctx)
390 auto current_view =
consensus->get_view();
391 auto session_ctx = ctx->get_session_context();
// First check on this session: record the current view as the session's
// active view.
392 if (!session_ctx->active_view.has_value())
395 session_ctx->active_view = current_view;
// View changed since it was recorded: build the error message below, report
// 500 SessionConsistencyLost and flag the session for termination.
397 else if (current_view != session_ctx->active_view.value())
399 auto msg = fmt::format(
400 "Potential loss of session consistency on session {}. Started "
401 "in view {}, now in view {}. Closing session.",
402 session_ctx->client_session_id,
403 session_ctx->active_view.value(),
408 HTTP_STATUS_INTERNAL_SERVER_ERROR,
409 ccf::errors::SessionConsistencyLost,
411 ctx->terminate_session =
true;
// Runs the endpoint's authentication policies against the request and returns
// the resulting identity, or leaves it null when none accepts the caller.
419 std::unique_ptr<AuthnIdentity> get_authenticated_identity(
420 std::shared_ptr<ccf::RpcContextImpl> ctx,
// An endpoint with no policies is handled separately (that branch body is not
// visible in this listing).
424 if (endpoint->authn_policies.empty())
429 std::unique_ptr<AuthnIdentity> identity =
nullptr;
// auth_error_reason is handed to each policy's authenticate() and later moved
// into the unauthenticated-error call; error_details gains one entry per
// policy (see emplace_back below).
431 std::string auth_error_reason;
432 std::vector<ODataAuthErrorDetails> error_details;
// Try the policies in their declared order.
433 for (
const auto& policy : endpoint->authn_policies)
435 identity = policy->authenticate(tx, ctx, auth_error_reason);
// A non-null identity means this policy accepted the caller (presumably the
// loop stops here -- the branch body is not visible).
436 if (identity !=
nullptr)
// Otherwise note which security scheme rejected the request.
443 error_details.emplace_back(ODataAuthErrorDetails{
444 policy->get_security_scheme_name(),
445 ccf::errors::InvalidAuthenticationInfo,
// No identity was produced.
450 if (identity ==
nullptr)
// The last policy in the list is the one asked to set the unauthenticated
// error on the response.
453 endpoint->authn_policies.back()->set_unauthenticated_error(
454 ctx, std::move(auth_error_reason));
// Convert the collected details to JSON and attach them to a 401 response.
457 std::vector<nlohmann::json> json_details;
458 for (
auto& details : error_details)
460 json_details.push_back(details);
463 HTTP_STATUS_UNAUTHORIZED,
464 ccf::errors::InvalidAuthenticationInfo,
465 "Invalid authentication credentials.",
466 std::move(json_details));
// Computes the forwarding timeout for the interface this request arrived on:
// 3000 ms by default, replaced by the interface's forwarding_timeout_ms when
// one is configured.
472 std::chrono::milliseconds get_forwarding_timeout(
473 std::shared_ptr<ccf::RpcContextImpl> ctx)
const
// Default when there is no interface id, no matching interface, or no
// configured value.
475 auto r = std::chrono::milliseconds(3'000);
477 auto interface_id = ctx->get_session_context()->interface_id;
478 if (interface_id.has_value())
// NOTE(review): node_configuration_subsystem is dereferenced here with no
// visible null check; presumably an earlier step on the request path has
// already populated it -- confirm.
480 auto& ncs = node_configuration_subsystem->get();
481 auto rit = ncs.node_config.network.rpc_interfaces.find(*interface_id);
482 if (rit != ncs.node_config.network.rpc_interfaces.end())
484 if (rit->second.forwarding_timeout_ms.has_value())
// Per-interface override.
486 r = std::chrono::milliseconds(*rit->second.forwarding_timeout_ms);
// Forwards a request to the primary. The function's name line is not visible
// in this listing (the call sites use forward). Each error block below is a
// failure exit; most of the guarding conditions are on lines not shown here.
495 std::shared_ptr<ccf::RpcContextImpl> ctx,
// Forwarding is not implemented for HTTP/2 interfaces: 501.
503 HTTP_STATUS_NOT_IMPLEMENTED,
504 ccf::errors::NotImplemented,
505 "Request cannot be forwarded to primary on HTTP/2 interface.");
// Consensus or forwarder missing: 500.
513 HTTP_STATUS_INTERNAL_SERVER_ERROR,
514 ccf::errors::InternalError,
515 "No consensus or forwarder to forward request.");
// A request that was itself forwarded must not be forwarded again: 503.
520 if (ctx->get_session_context()->is_forwarded)
525 HTTP_STATUS_SERVICE_UNAVAILABLE,
526 ccf::errors::RequestAlreadyForwarded,
527 "RPC was already forwarded.");
// Do not forward on a session whose view has changed.
534 if (!check_session_consistency(ctx))
// Primary unknown: 503.
540 if (!primary_id.has_value())
543 HTTP_STATUS_SERVICE_UNAVAILABLE,
544 ccf::errors::InternalError,
545 "RPC could not be forwarded to unknown primary.");
// Hand the request to the forwarder, passing the caller certificate and the
// per-interface timeout; a false return is reported as 503.
550 if (!cmd_forwarder->forward_command(
553 ctx->get_session_context()->caller_cert,
554 get_forwarding_timeout(ctx)))
557 HTTP_STATUS_SERVICE_UNAVAILABLE,
558 ccf::errors::InternalError,
559 "Unable to establish channel to forward to primary.");
564 LOG_TRACE_FMT(
"RPC forwarded to primary {}", primary_id.value());
// After a successful hand-off, mark the response as pending.
567 ctx->response_is_pending =
true;
// Mark the session as forwarding; process_command_inner reads this flag when
// deciding whether to forward.
571 ctx->get_session_context()->is_forwarding =
true;
// Runs process_command_inner for the request, timing it, then fills in a
// completion or dispatch-failure event from the outcome.
576 void process_command(std::shared_ptr<ccf::RpcContextImpl> ctx)
// Timestamps taken either side of the inner call give the execution time.
581 const auto start_time = ccf::get_enclave_time();
// `endpoint` and `attempts` are declared on lines not visible here and are
// read after the call, so they are presumably filled in by the inner
// function.
583 process_command_inner(ctx, endpoint, attempts);
585 const auto end_time = ccf::get_enclave_time();
// An endpoint was resolved: describe the completed request.
587 if (endpoint !=
nullptr)
589 endpoints::RequestCompletedEvent rce;
590 rce.method = endpoint->dispatch.verb.c_str();
591 rce.dispatch_path = endpoint->dispatch.uri_path;
592 rce.status = ctx->get_response_status();
597 rce.exec_time = std::chrono::duration_cast<std::chrono::milliseconds>(
598 end_time - start_time);
599 rce.attempts = attempts;
// Presumably the else branch (no endpoint resolved): describe the failed
// dispatch.
605 endpoints::DispatchFailedEvent dfe;
606 dfe.method = ctx->get_method();
607 dfe.status = ctx->get_response_status();
// Core request pipeline, retried up to max_attempts times. Visible stages, in
// order: pending-transaction limit, frontend-open check, endpoint lookup, URI
// allow-list, redirection, forwarding, authentication, session consistency,
// execution and commit. Many guards and the endpoint invocation itself are on
// lines not visible in this listing.
613 void process_command_inner(
614 std::shared_ptr<ccf::RpcContextImpl> ctx,
// Upper bound on execution attempts for one request.
618 constexpr auto max_attempts = 30;
619 while (attempts < max_attempts)
// Too many transactions awaiting commit: 503.
628 HTTP_STATUS_SERVICE_UNAVAILABLE,
629 ccf::errors::TooManyPendingTransactions,
630 "Too many transactions pending commit on the service.");
// Reset the response; presumably this discards state left by an earlier
// attempt -- confirm.
642 ctx->reset_response();
// Frontend not open: reported as 404.
648 HTTP_STATUS_NOT_FOUND,
649 ccf::errors::FrontendNotOpen,
650 "Frontend is not open.");
// Resolve the endpoint for this request.
657 endpoint = find_endpoint(ctx, *tx_p);
658 if (endpoint ==
nullptr)
// Enforce the per-interface endpoint allow-list.
665 if (!check_uri_allowed(ctx, endpoint))
// Redirection: consult the configuration of the interface the request
// arrived on, falling back to PRIMARY_RPC_INTERFACE.
670 const auto listen_interface =
671 ctx->get_session_context()->interface_id.value_or(
672 PRIMARY_RPC_INTERFACE);
673 const auto redirections = get_redirections_config(listen_interface);
677 if (redirections.has_value())
679 if (check_redirect(*tx_p, ctx, endpoint, redirections.value()))
// Forwarding is only considered when a consensus object exists and this node
// is not the primary.
688 const bool forwardable = (
consensus !=
nullptr);
690 if (!is_primary && forwardable)
// The endpoint's forwarding_required property selects the behaviour.
692 switch (endpoint->properties.forwarding_required)
// One case forwards only when the session is already marked as forwarding.
701 if (ctx->get_session_context()->is_forwarding)
703 forward(ctx, *tx_p, endpoint);
// Another case forwards without testing that flag.
711 forward(ctx, *tx_p, endpoint);
// Authenticate the caller.
718 std::unique_ptr<AuthnIdentity> identity =
719 get_authenticated_identity(ctx, *tx_p, endpoint);
// A null identity is only tested for when the endpoint declares policies.
727 if (!endpoint->authn_policies.empty())
729 if (identity ==
nullptr)
735 args.caller = std::move(identity);
743 if (!check_session_consistency(ctx))
748 if (!ctx->should_apply_writes())
753 if (ctx->response_is_pending)
// NOTE(review): the message below says the endpoint "stole ownership" of the
// Tx, and the visible condition tests owned_tx == nullptr -- confirm this is
// the intended polarity.
757 else if (args.owned_tx ==
nullptr)
760 "Bad endpoint: During execution of {} {}, returned a non-pending "
761 "response but stole ownership of Tx object",
762 ctx->get_request_verb().c_str(),
763 ctx->get_request_path());
765 ctx->clear_response_headers();
767 HTTP_STATUS_INTERNAL_SERVER_ERROR,
768 ccf::errors::InternalError,
769 "Illegal endpoint implementation");
// With a transaction id and a consensus object, a per-endpoint call taking
// (endpoint, args, tx_id) is made; judging by the error strings below this
// is the local commit handler.
782 if (tx_id.has_value() &&
consensus !=
nullptr)
791 endpoint, args, tx_id.value());
// A handler that throws std::exception: clear headers, make a fallback call
// taking (args, tx_id), and report 500 with the exception text.
793 catch (
const std::exception& e)
796 ctx->clear_response_headers();
798 args, tx_id.value());
800 HTTP_STATUS_INTERNAL_SERVER_ERROR,
801 ccf::errors::InternalError,
803 "Failed to execute local commit handler func: {}",
// Same recovery without an exception message (presumably a catch-all).
809 ctx->clear_response_headers();
811 args, tx_id.value());
813 HTTP_STATUS_INTERNAL_SERVER_ERROR,
814 ccf::errors::InternalError,
815 "Failed to execute local commit handler func");
// Replication failure: 503.
836 ctx->clear_response_headers();
838 HTTP_STATUS_SERVICE_UNAVAILABLE,
839 ccf::errors::TransactionReplicationFailed,
840 "Transaction failed to replicate.");
// Conflict with compaction is logged (presumably followed by a retry).
851 "Transaction execution conflicted with compaction: {}", e.
what());
// RpcException carries a ready-made error, which is moved into the response.
854 catch (RpcException& e)
856 ctx->clear_response_headers();
857 ctx->set_error(std::move(e.error));
// An exception type exposing describe() (its catch line is not visible) is
// reported as 400 InvalidInput.
863 ctx->clear_response_headers();
865 HTTP_STATUS_BAD_REQUEST, ccf::errors::InvalidInput, e.
describe());
// JSON exceptions are likewise reported as 400 InvalidInput.
869 catch (
const nlohmann::json::exception& e)
871 ctx->clear_response_headers();
873 HTTP_STATUS_BAD_REQUEST, ccf::errors::InvalidInput, e.what());
// Any other std::exception becomes 500 InternalError.
886 catch (
const std::exception& e)
888 ctx->clear_response_headers();
890 HTTP_STATUS_INTERNAL_SERVER_ERROR,
891 ccf::errors::InternalError,
// Presumably reached once the attempts are exhausted: 503 with a Retry-After
// hint of 3 seconds.
898 ctx->clear_response_headers();
900 HTTP_STATUS_SERVICE_UNAVAILABLE,
901 ccf::errors::TransactionCommitAttemptsExceedLimit,
903 "Transaction continued to conflict after {} attempts. Retry "
907 static constexpr size_t retry_after_seconds = 3;
908 ctx->set_response_header(http::headers::RETRY_AFTER, retry_after_seconds);
// Override that stores new signature intervals. The function's name line is
// not visible in this listing.
926 size_t sig_tx_interval_,
size_t sig_ms_interval_)
override
928 sig_tx_interval = sig_tx_interval_;
// The incoming integer is interpreted as a number of milliseconds.
929 sig_ms_interval = std::chrono::milliseconds(sig_ms_interval_);
// ms_to_sig is set to the full new interval.
930 ms_to_sig = sig_ms_interval;
// Override that stores the forwarder later used for forward_command(). The
// function's name line is not visible in this listing.
934 std::shared_ptr<AbstractForwarder> cmd_forwarder_)
override
// Copies the shared_ptr; the member shares ownership with the caller.
936 cmd_forwarder = cmd_forwarder_;
941 std::lock_guard<ccf::pal::Mutex> mguard(open_lock);
952 std::lock_guard<ccf::pal::Mutex> mguard(open_lock);
968 const auto& [txid, root, term_of_next_version] =
// Override entry point for a request: it calls process_command(ctx). Any
// other statements in the body are not visible in this listing.
984 void process(std::shared_ptr<ccf::RpcContextImpl> ctx)
override
990 process_command(ctx);
// Processing for a request that arrived via forwarding. The enclosing
// function header is not visible in this listing.
// The session must already be marked as forwarded; otherwise this is a
// programming error.
999 if (!ctx->get_session_context()->is_forwarded)
// NOTE(review): "unitialised" in the message below is a typo for
// "uninitialised"; it is a runtime string, so it is left untouched here.
1001 throw std::logic_error(
1002 "Processing forwarded command with unitialised forwarded context");
1006 process_command(ctx);
// A pending response after processing a forwarded request is rejected with a
// logic_error saying a forwarded RPC cannot be forwarded.
1007 if (ctx->response_is_pending)
1011 throw std::logic_error(
"Forwarded RPC cannot be forwarded");
1015 void tick(std::chrono::milliseconds elapsed)
override