CCF
Loading...
Searching...
No Matches
frontend.h
Go to the documentation of this file.
1// Copyright (c) Microsoft Corporation. All rights reserved.
2// Licensed under the Apache 2.0 License.
3#pragma once
4
6#include "ccf/http_status.h"
7#include "ccf/node_context.h"
8#include "ccf/pal/locking.h"
9#include "ccf/rpc_exception.h"
16#include "enclave/rpc_handler.h"
17#include "forwarder.h"
18#include "http/http_jwt.h"
21#include "kv/store.h"
25
26#define FMT_HEADER_ONLY
27
28#include <fmt/format.h>
29#include <utility>
30#include <vector>
31
32namespace ccf
33{
35 {
36 protected:
40
41 private:
42 ccf::pal::Mutex open_lock;
43 bool is_open_ = false;
44
46 std::shared_ptr<AbstractForwarder> cmd_forwarder;
47 ccf::kv::TxHistory* history{nullptr};
48
49 size_t sig_tx_interval = 5000;
50 std::chrono::milliseconds sig_ms_interval = std::chrono::milliseconds(1000);
51 std::chrono::milliseconds ms_to_sig = std::chrono::milliseconds(1000);
52
53 std::shared_ptr<NodeConfigurationSubsystem> node_configuration_subsystem =
54 nullptr;
55
// Refreshes the cached raw consensus pointer from the store. The cached
// value is only replaced when the store reports a different instance.
// NOTE(review): the listing numbering jumps from 62 to 64 inside the branch
// below, so one statement appears to be missing from this extract - confirm
// against the original header before relying on this body.
56 void update_consensus()
57 {
// Raw, non-owning pointer taken from the shared_ptr returned by the store
58 auto* c = tables.get_consensus().get();
59
60 if (consensus != c)
61 {
62 consensus = c;
64 }
65 }
66
67 void update_history()
68 {
69 history = tables.get_history().get();
70 endpoints.set_history(history);
71 }
72
74 std::shared_ptr<ccf::RpcContextImpl> ctx, ccf::kv::CommittableTx& tx)
75 {
76 const auto endpoint = endpoints.find_endpoint(tx, *ctx);
77 if (endpoint == nullptr)
78 {
79 // Every path from here should populate an appropriate response error
80 const auto allowed_verbs = endpoints.get_allowed_verbs(tx, *ctx);
81 if (allowed_verbs.empty())
82 {
83 ctx->set_error(
84 HTTP_STATUS_NOT_FOUND,
85 ccf::errors::ResourceNotFound,
86 fmt::format("Unknown path: {}.", ctx->get_method()));
87 }
88 else
89 {
90 std::vector<char const*> allowed_verb_strs;
91 allowed_verb_strs.push_back(llhttp_method_name(HTTP_OPTIONS));
92 for (auto verb : allowed_verbs)
93 {
94 allowed_verb_strs.push_back(verb.c_str());
95 }
96 const std::string allow_header_value =
97 fmt::format("{}", fmt::join(allowed_verb_strs, ", "));
98 // List allowed methods in 2 places:
99 // - ALLOW header for standards compliance + machine parsing
100 // - Body for visibility + human readability (unless this was an
101 // OPTIONS request, which returns a 204 No Content)
102 ctx->set_response_header(http::headers::ALLOW, allow_header_value);
103 if (ctx->get_request_verb() == HTTP_OPTIONS)
104 {
105 ctx->set_response_status(HTTP_STATUS_NO_CONTENT);
106 }
107 else
108 {
109 ctx->set_error(
110 HTTP_STATUS_METHOD_NOT_ALLOWED,
111 ccf::errors::UnsupportedHttpVerb,
112 fmt::format(
113 "Allowed methods for '{}' are: {}.",
114 ctx->get_method(),
115 allow_header_value));
116 }
117 }
118 }
119
120 return endpoint;
121 }
122
// Decides whether the resolved endpoint may be served on the interface the
// request arrived on.
//
// Returns true when the request may proceed. Returns false after setting a
// response status on ctx:
//  - 500 if the NodeConfigurationSubsystem cannot be obtained
//  - 404 if the endpoint requires an operator feature that is not enabled on
//    the receiving interface
//  - 503 if the interface has URI patterns configured and none matches, or
//    if it has none and is an unsecured interface
// Throws std::runtime_error if the endpoint requires operator features and
// the receiving interface is not present in the startup config.
//
// The checks only run when a consensus is set and the session carries an
// interface id; otherwise the request is accepted as-is.
123 bool check_uri_allowed(
124 std::shared_ptr<ccf::RpcContextImpl> ctx,
125 const endpoints::EndpointDefinitionPtr& endpoint)
126 {
127 auto interface_id = ctx->get_session_context()->interface_id;
128 if ((consensus != nullptr) && interface_id)
129 {
// Fetch the configuration subsystem on first use and keep it cached in the
// member for later calls
130 if (!node_configuration_subsystem)
131 {
132 node_configuration_subsystem =
133 node_context.get_subsystem<NodeConfigurationSubsystem>();
134 if (!node_configuration_subsystem)
135 {
136 ctx->set_response_status(HTTP_STATUS_INTERNAL_SERVER_ERROR);
137 return false;
138 }
139 }
140
141 const auto& ncs = node_configuration_subsystem->get();
142
143 const auto& required_features = endpoint->required_operator_features;
144 if (!required_features.empty())
145 {
146 // Check that all required opt-in features are present on this
147 // interface's enabled features
148 const auto& interfaces = ncs.node_config.network.rpc_interfaces;
149 auto interface_it = interfaces.find(*interface_id);
150 if (interface_it == interfaces.end())
151 {
152 throw std::runtime_error(fmt::format(
153 "Could not find RPC interface named '{}' in startup config",
154 *interface_id));
155 }
156
157 const auto& enabled_features =
158 interface_it->second.enabled_operator_features;
159 for (const auto& required_feature : required_features)
160 {
161 if (
162 enabled_features.find(required_feature) == enabled_features.end())
163 {
// NOTE(review): listing line 164 is absent from this extract; it presumably
// opens the logging call that the following format string and arguments
// belong to - confirm against the original header.
165 "Incoming request {} requires opt-in feature {}, which is not "
166 "enabled on interface {} where this request was received - "
167 "returning error",
168 endpoint->full_uri_path,
169 required_feature,
170 *interface_id);
171 ctx->set_response_status(HTTP_STATUS_NOT_FOUND);
172 return false;
173 }
174 }
175 }
176
// Per-interface URI patterns, if any were configured for this interface
177 auto rit = ncs.rpc_interface_regexes.find(*interface_id);
178
179 if (rit != ncs.rpc_interface_regexes.end())
180 {
// The endpoint's full URI path must match at least one pattern in its
// entirety (std::regex_match, not a substring search)
181 bool ok = false;
182 for (const auto& re : rit->second)
183 {
184 std::smatch m;
185 if (std::regex_match(endpoint->full_uri_path, m, re))
186 {
187 ok = true;
188 break;
189 }
190 }
191 if (!ok)
192 {
193 ctx->set_response_status(HTTP_STATUS_SERVICE_UNAVAILABLE);
194 return false;
195 }
196 }
197 else
198 {
// No patterns configured for this interface: only reject when the
// interface's endorsement authority is UNSECURED.
// NOTE(review): at() presumably throws if the id is missing from
// rpc_interfaces - confirm the container type.
199 auto icfg = ncs.node_config.network.rpc_interfaces.at(*interface_id);
200 if (
201 icfg.endorsement.has_value() &&
202 icfg.endorsement->authority == Authority::UNSECURED)
203 {
204 // Unsecured interfaces are opt-in only.
// NOTE(review): listing line 205 is absent from this extract; it presumably
// opens the logging call that the following format string and argument
// belong to - confirm against the original header.
206 "Request for {} rejected because the interface is unsecured and "
207 "no accepted_endpoints have been configured.",
208 endpoint->full_uri_path);
209 ctx->set_response_status(HTTP_STATUS_SERVICE_UNAVAILABLE);
210 return false;
211 }
212 }
213 }
214 else
215 {
216 // internal or forwarded: OK because they have been checked by the
217 // forwarder (forward() happens further down).
218 }
219
220 return true;
221 }
222
223 std::optional<std::string> resolve_redirect_location(
224 const RedirectionResolverConfig& resolver,
226 const ccf::ListenInterfaceID& incoming_interface)
227 {
228 switch (resolver.kind)
229 {
231 {
232 const auto role_it = resolver.target.find("role");
233 const bool seeking_primary =
234 role_it == resolver.target.end() || role_it.value() == "primary";
235 const bool seeking_backup =
236 !seeking_primary && role_it.value() == "backup";
237 if (!seeking_primary && !seeking_backup)
238 {
239 return std::nullopt;
240 }
241
242 const auto interface_it = resolver.target.find("interface");
243 const auto target_interface =
244 (interface_it == resolver.target.end()) ?
245 incoming_interface :
246 interface_it.value().get<std::string>();
247
248 std::vector<std::map<NodeId, NodeInfo>::const_iterator>
249 target_node_its;
251 {
252 const auto primary_id = consensus->primary();
253 if (seeking_primary && primary_id.has_value())
254 {
255 target_node_its.push_back(nodes.find(primary_id.value()));
256 }
257 else if (seeking_backup)
258 {
259 for (auto it = nodes.begin(); it != nodes.end(); ++it)
260 {
261 if (it->first != primary_id)
262 {
263 target_node_its.push_back(it);
264 }
265 }
266 }
267 }
268 if (target_node_its.empty())
269 {
270 return std::nullopt;
271 }
272
273 const auto node_it =
274 target_node_its[random() % target_node_its.size()];
275 if (node_it != nodes.end())
276 {
277 const auto& interfaces = node_it->second.rpc_interfaces;
278
279 const auto target_interface_it = interfaces.find(target_interface);
280 if (target_interface_it != interfaces.end())
281 {
282 return target_interface_it->second.published_address;
283 }
284 }
285 else
286 {
287 return std::nullopt;
288 }
289 break;
290 }
291
293 {
294 return resolver.target["address"].get<std::string>();
295 break;
296 }
297 }
298
299 return std::nullopt;
300 }
301
302 bool check_redirect(
304 std::shared_ptr<ccf::RpcContextImpl> ctx,
305 const endpoints::EndpointDefinitionPtr& endpoint,
307 {
308 auto rs = endpoint->properties.redirection_strategy;
309
310 switch (rs)
311 {
313 {
314 return false;
315 }
316
318 {
319 const bool is_primary =
320 (consensus != nullptr) && consensus->can_replicate();
321
322 if (!is_primary)
323 {
324 auto resolver = redirections.to_primary;
325
326 const auto listen_interface =
327 ctx->get_session_context()->interface_id.value_or(
328 PRIMARY_RPC_INTERFACE);
329 const auto location =
330 resolve_redirect_location(resolver, tx, listen_interface);
331 if (location.has_value())
332 {
333 ctx->set_response_header(
334 http::headers::LOCATION,
335 fmt::format(
336 "https://{}{}", location.value(), ctx->get_request_url()));
337 ctx->set_response_status(HTTP_STATUS_TEMPORARY_REDIRECT);
338 return true;
339 }
340
341 // Should have redirected, but don't know how to. Return an error
342 ctx->set_error(
343 HTTP_STATUS_SERVICE_UNAVAILABLE,
344 ccf::errors::PrimaryNotFound,
345 "Request should be redirected to primary, but receiving node "
346 "does not know current primary address");
347 return true;
348 }
349 return false;
350 }
351
353 {
354 const bool is_backup =
355 (consensus != nullptr) && !consensus->can_replicate();
356
357 if (!is_backup)
358 {
359 auto resolver = redirections.to_backup;
360
361 const auto listen_interface =
362 ctx->get_session_context()->interface_id.value_or(
363 PRIMARY_RPC_INTERFACE);
364 const auto location =
365 resolve_redirect_location(resolver, tx, listen_interface);
366 if (location.has_value())
367 {
368 ctx->set_response_header(
369 http::headers::LOCATION,
370 fmt::format(
371 "https://{}{}", location.value(), ctx->get_request_url()));
372 ctx->set_response_status(HTTP_STATUS_TEMPORARY_REDIRECT);
373 return true;
374 }
375
376 // Should have redirected, but don't know how to. Return an error
377 ctx->set_error(
378 HTTP_STATUS_SERVICE_UNAVAILABLE,
379 ccf::errors::BackupNotFound,
380 "Request should be redirected to backup, but receiving node "
381 "does not know any current backup address");
382 return true;
383 }
384 return false;
385 }
386
387 default:
388 {
389 LOG_FAIL_FMT("Unhandled redirection strategy: {}", rs);
390 return false;
391 }
392 }
393 }
394
// Looks up the redirections setting of the interface a request arrived on.
//
// incoming_interface: id of the listening interface to look up in the
// node's startup RPC interface configuration.
//
// Returns the interface's `redirections` member, or std::nullopt when the
// NodeConfigurationSubsystem cannot be obtained or the interface is not
// present in the startup config.
395 std::optional<ccf::NodeInfoNetwork_v2::NetInterface::Redirections>
396 get_redirections_config(const ccf::ListenInterfaceID& incoming_interface)
397 {
// Fetch the configuration subsystem on first use and keep it cached in the
// member for later calls
398 if (!node_configuration_subsystem)
399 {
400 node_configuration_subsystem =
401 node_context.get_subsystem<NodeConfigurationSubsystem>();
402 if (!node_configuration_subsystem)
403 {
404 LOG_FAIL_FMT("Unable to access NodeConfigurationSubsystem");
405 return std::nullopt;
406 }
407 }
408
409 const auto& node_config_state = node_configuration_subsystem->get();
410 const auto& interfaces =
411 node_config_state.node_config.network.rpc_interfaces;
412 const auto interface_it = interfaces.find(incoming_interface);
413 if (interface_it == interfaces.end())
414 {
// NOTE(review): listing line 415 is absent from this extract; it presumably
// opens the logging call that the following format string and argument
// belong to - confirm against the original header.
416 "Could not find startup config for interface {}", incoming_interface);
417 return std::nullopt;
418 }
419
420 return interface_it->second.redirections;
421 }
422
423 bool check_session_consistency(std::shared_ptr<ccf::RpcContextImpl> ctx)
424 {
425 if (consensus != nullptr)
426 {
427 auto current_view = consensus->get_view();
428 auto session_ctx = ctx->get_session_context();
429 if (!session_ctx->active_view.has_value())
430 {
431 // First request on this session - assign the active term
432 session_ctx->active_view = current_view;
433 }
434 // NOLINTNEXTLINE(bugprone-unchecked-optional-access)
435 else if (current_view != *session_ctx->active_view)
436 {
437 auto msg = fmt::format(
438 "Potential loss of session consistency on session {}. Started "
439 "in view {}, now in view {}. Closing session.",
440 session_ctx->client_session_id,
441 *session_ctx // NOLINT(bugprone-unchecked-optional-access)
442 ->active_view,
443 current_view);
444 LOG_INFO_FMT("{}", msg);
445
446 ctx->set_error(
447 HTTP_STATUS_INTERNAL_SERVER_ERROR,
448 ccf::errors::SessionConsistencyLost,
449 std::move(msg));
450 ctx->terminate_session = true;
451 return false;
452 }
453 }
454
455 return true;
456 }
457
458 std::unique_ptr<AuthnIdentity> get_authenticated_identity(
459 std::shared_ptr<ccf::RpcContextImpl> ctx,
461 const endpoints::EndpointDefinitionPtr& endpoint)
462 {
463 if (endpoint->authn_policies.empty())
464 {
465 return nullptr;
466 }
467
468 std::unique_ptr<AuthnIdentity> identity = nullptr;
469
470 std::string auth_error_reason;
471 std::vector<ODataAuthErrorDetails> error_details;
472 for (const auto& policy : endpoint->authn_policies)
473 {
474 identity = policy->authenticate(tx, ctx, auth_error_reason);
475 if (identity != nullptr)
476 {
477 break;
478 }
479 // Collate error details
480 error_details.emplace_back(ODataAuthErrorDetails{
481 policy->get_security_scheme_name(),
482 ccf::errors::InvalidAuthenticationInfo,
483 auth_error_reason});
484 }
485
486 if (identity == nullptr)
487 {
488 // If none were accepted, let the last set the response header
489 endpoint->authn_policies.back()->set_unauthenticated_error(
490 ctx, std::move(auth_error_reason));
491 // Return collated error details for the auth policies
492 // declared in the request
493 std::vector<nlohmann::json> json_details;
494 json_details.reserve(error_details.size());
495 for (auto& details : error_details)
496 {
497 json_details.emplace_back(details);
498 }
499 ctx->set_error(
500 HTTP_STATUS_UNAUTHORIZED,
501 ccf::errors::InvalidAuthenticationInfo,
502 "Invalid authentication credentials.",
503 json_details);
504 }
505
506 return identity;
507 }
508
509 [[nodiscard]] std::chrono::milliseconds get_forwarding_timeout(
510 std::shared_ptr<ccf::RpcContextImpl> ctx) const
511 {
512 auto r = std::chrono::milliseconds(3'000);
513
514 auto interface_id = ctx->get_session_context()->interface_id;
515 if (interface_id.has_value())
516 {
517 const auto& ncs = node_configuration_subsystem->get();
518 auto rit = ncs.node_config.network.rpc_interfaces.find(*interface_id);
519 if (rit != ncs.node_config.network.rpc_interfaces.end())
520 {
521 if (rit->second.forwarding_timeout_ms.has_value())
522 {
523 // NOLINTNEXTLINE(bugprone-unchecked-optional-access)
524 r = std::chrono::milliseconds(*rit->second.forwarding_timeout_ms);
525 }
526 }
527 }
528
529 return r;
530 }
531
532 void forward(
533 std::shared_ptr<ccf::RpcContextImpl> ctx,
534 ccf::kv::ReadOnlyTx& /*tx*/,
535 const endpoints::EndpointDefinitionPtr& /*endpoint*/)
536 {
537 // HTTP/2 does not support forwarding
538 if (ctx->get_http_version() == HttpVersion::HTTP2)
539 {
540 ctx->set_error(
541 HTTP_STATUS_NOT_IMPLEMENTED,
542 ccf::errors::NotImplemented,
543 "Request cannot be forwarded to primary on HTTP/2 interface.");
544
545 return;
546 }
547
548 if (!cmd_forwarder || (consensus == nullptr))
549 {
550 ctx->set_error(
551 HTTP_STATUS_INTERNAL_SERVER_ERROR,
552 ccf::errors::InternalError,
553 "No consensus or forwarder to forward request.");
554
555 return;
556 }
557
558 if (ctx->get_session_context()->is_forwarded)
559 {
560 // If the request was already forwarded, return an error to prevent
561 // daisy chains.
562 ctx->set_error(
563 HTTP_STATUS_SERVICE_UNAVAILABLE,
564 ccf::errors::RequestAlreadyForwarded,
565 "RPC was already forwarded.");
566
567 return;
568 }
569
570 // Before attempting to forward, make sure we're in the same View as we
571 // previously thought we were.
572 if (!check_session_consistency(ctx))
573 {
574 return;
575 }
576
577 auto primary_id = consensus->primary();
578 if (!primary_id.has_value())
579 {
580 ctx->set_error(
581 HTTP_STATUS_SERVICE_UNAVAILABLE,
582 ccf::errors::InternalError,
583 "RPC could not be forwarded to unknown primary.");
584
585 return;
586 }
587
588 if (!cmd_forwarder->forward_command(
589 ctx,
590 primary_id.value(),
591 ctx->get_session_context()->caller_cert,
592 get_forwarding_timeout(ctx)))
593 {
594 ctx->set_error(
595 HTTP_STATUS_SERVICE_UNAVAILABLE,
596 ccf::errors::InternalError,
597 "Unable to establish channel to forward to primary.");
598
599 return;
600 }
601
602 LOG_TRACE_FMT("RPC forwarded to primary {}", primary_id.value());
603
604 // Indicate that the RPC has been forwarded to primary
605 ctx->response_is_pending = true;
606
607 // Ensure future requests on this session are forwarded for session
608 // consistency
609 ctx->get_session_context()->is_forwarding = true;
610 }
611
// Runs a request through process_command_inner() and then builds an event
// describing the outcome.
//
// `endpoint` and `attempts` are handed to process_command_inner(), which
// assigns the endpoint it resolves and increments the attempt counter. The
// wall-clock time spent in that call is measured with
// std::chrono::high_resolution_clock.
//
// If an endpoint was resolved, a RequestCompletedEvent is filled in;
// otherwise a DispatchFailedEvent is filled in.
// NOTE(review): listing lines 637 and 645 are absent from this extract;
// they presumably pass the populated event on to the endpoint registry -
// confirm against the original header.
612 void process_command(std::shared_ptr<ccf::RpcContextImpl> ctx)
613 {
614 size_t attempts = 0;
615 endpoints::EndpointDefinitionPtr endpoint = nullptr;
616
617 const auto start_time = std::chrono::high_resolution_clock::now();
618
619 process_command_inner(ctx, endpoint, attempts);
620
621 const auto end_time = std::chrono::high_resolution_clock::now();
622
623 if (endpoint != nullptr)
624 {
625 endpoints::RequestCompletedEvent rce;
626 rce.method = endpoint->dispatch.verb.c_str();
627 rce.dispatch_path = endpoint->dispatch.uri_path;
628 rce.status = ctx->get_response_status();
629 // Although enclave time returns a microsecond value, the actual
630 // precision/granularity depends on the host's TimeUpdater. By default
631 // this only advances each millisecond. Avoid implying more precision
632 // than that, by rounding to milliseconds
633 rce.exec_time = std::chrono::duration_cast<std::chrono::milliseconds>(
634 end_time - start_time);
635 rce.attempts = attempts;
636
638 }
639 else
640 {
// No endpoint was resolved: report the raw method and the response status
641 endpoints::DispatchFailedEvent dfe;
642 dfe.method = ctx->get_method();
643 dfe.status = ctx->get_response_status();
644
646 }
647 }
648
649 void process_command_inner(
650 std::shared_ptr<ccf::RpcContextImpl> ctx,
652 size_t& attempts)
653 {
654 constexpr auto max_attempts = 30;
655 while (attempts < max_attempts)
656 {
657 if (consensus != nullptr)
658 {
659 if (
661 consensus->is_at_max_capacity())
662 {
663 ctx->set_error(
664 HTTP_STATUS_SERVICE_UNAVAILABLE,
665 ccf::errors::TooManyPendingTransactions,
666 "Too many transactions pending commit on the service.");
667 return;
668 }
669 }
670
671 std::unique_ptr<ccf::kv::CommittableTx> tx_p = tables.create_tx_ptr();
672 set_root_on_proposals(*ctx, *tx_p);
673
674 if (attempts > 0)
675 {
676 // If the endpoint has already been executed, the effects of its
677 // execution should be dropped
678 ctx->reset_response();
679 }
680
681 if (!is_open())
682 {
683 ctx->set_error(
684 HTTP_STATUS_NOT_FOUND,
685 ccf::errors::FrontendNotOpen,
686 "Frontend is not open.");
687 return;
688 }
689
690 ++attempts;
691 update_history();
692
693 endpoint = find_endpoint(ctx, *tx_p);
694 if (endpoint == nullptr)
695 {
696 return;
697 }
698
699 try
700 {
701 if (!check_uri_allowed(ctx, endpoint))
702 {
703 return;
704 }
705
706 std::optional<ccf::NodeInfoNetwork_v2::NetInterface::Redirections>
707 redirections = std::nullopt;
708
709 // If there's no interface ID, this is already forwarded or otherwise
710 // special - don't try to redirect it
711 if (ctx->get_session_context()->interface_id.has_value())
712 {
713 redirections = get_redirections_config(
714 // NOLINTNEXTLINE(bugprone-unchecked-optional-access)
715 *ctx->get_session_context()->interface_id);
716 }
717
718 // If a redirections config was specified, then redirections are used
719 // and no forwarding is done
720 if (redirections.has_value())
721 {
722 if (check_redirect(*tx_p, ctx, endpoint, *redirections))
723 {
724 return;
725 }
726 }
727 else
728 {
729 bool is_primary =
730 (consensus == nullptr) || consensus->can_replicate();
731 const bool forwardable = (consensus != nullptr);
732
733 if (!is_primary && forwardable)
734 {
735 switch (endpoint->properties.forwarding_required)
736 {
738 {
739 break;
740 }
741
743 {
744 if (ctx->get_session_context()->is_forwarding)
745 {
746 forward(ctx, *tx_p, endpoint);
747 return;
748 }
749 break;
750 }
751
753 {
754 forward(ctx, *tx_p, endpoint);
755 return;
756 }
757 }
758 }
759 }
760
761 std::unique_ptr<AuthnIdentity> identity =
762 get_authenticated_identity(ctx, *tx_p, endpoint);
763
764 auto args = ccf::EndpointContextImpl(ctx, std::move(tx_p));
765 // NB: tx_p is no longer valid, and must be accessed from args, which
766 // may change it!
767
768 // If any auth policy was required, check that at least one is
769 // accepted
770 if (!endpoint->authn_policies.empty())
771 {
772 if (identity == nullptr)
773 {
774 return;
775 }
776 args.caller = std::move(identity);
777 }
778
779 endpoints.execute_endpoint(endpoint, args);
780
781 // If we've seen a View change, abandon this transaction as
782 // inconsistent
783 if (!check_session_consistency(ctx))
784 {
785 return;
786 }
787
788 if (!ctx->should_apply_writes())
789 {
790 return;
791 }
792
793 if (ctx->response_is_pending)
794 {
795 return;
796 }
797
798 if (args.owned_tx == nullptr)
799 {
801 "Bad endpoint: During execution of {} {}, returned a non-pending "
802 "response but stole ownership of Tx object",
803 ctx->get_request_verb().c_str(),
804 ctx->get_request_path());
805
806 ctx->clear_response_headers();
807 ctx->set_error(
808 HTTP_STATUS_INTERNAL_SERVER_ERROR,
809 ccf::errors::InternalError,
810 "Illegal endpoint implementation");
811 return;
812 }
813 // else args owns a valid Tx relating to a non-pending response, which
814 // should be applied
815 ccf::kv::CommittableTx& tx = *args.owned_tx;
816
817 // Only capture write set digest and commit evidence if the
818 // handler has set a consensus committed callback that may need
819 // them for receipt construction. Avoids unnecessary hashing
820 // on the common path.
821 ccf::crypto::Sha256Hash captured_ws_digest;
822 std::string captured_commit_evidence;
823 ccf::kv::CommittableTx::WriteSetObserver ws_observer = nullptr;
825 ctx->consensus_committed_func;
826 if (committed_func != nullptr)
827 {
828 ws_observer = [&captured_ws_digest, &captured_commit_evidence](
829 const ccf::crypto::Sha256Hash& ws_digest,
830 const std::string& ce) {
831 captured_ws_digest = ws_digest;
832 captured_commit_evidence = ce;
833 };
834 }
835
836 ccf::kv::CommitResult result =
837 tx.commit(ctx->claims, nullptr, ws_observer);
838
839 switch (result)
840 {
842 {
843 auto tx_id_opt = tx.get_txid();
844 if (tx_id_opt.has_value() && consensus != nullptr)
845 {
846 ccf::TxID tx_id = tx_id_opt.value();
847
848 try
849 {
850 // Only transactions that acquired one or more map handles
851 // have a TxID, while others (e.g. unauthenticated commands)
852 // don't. Also, only report a TxID if the consensus is set, as
853 // the consensus is required to verify that a TxID is valid.
855 endpoint, args, tx_id);
856 }
857 catch (const std::exception& e)
858 {
859 // run default handler to set transaction id in header
860 ctx->clear_response_headers();
862 ctx->set_error(
863 HTTP_STATUS_INTERNAL_SERVER_ERROR,
864 ccf::errors::InternalError,
865 fmt::format(
866 "Failed to execute local commit handler func: {}",
867 e.what()));
868 }
869 catch (...)
870 {
871 // run default handler to set transaction id in header
872 ctx->clear_response_headers();
874 ctx->set_error(
875 HTTP_STATUS_INTERNAL_SERVER_ERROR,
876 ccf::errors::InternalError,
877 "Failed to execute local commit handler func");
878 }
879
880 {
881 if (committed_func != nullptr)
882 {
883 ctx->respond_on_commit =
885 tx_id,
886 committed_func,
887 std::move(captured_ws_digest),
888 std::move(captured_commit_evidence),
889 ctx->claims};
890 }
891 }
892 }
893
894 if (
895 consensus != nullptr && consensus->can_replicate() &&
896 history != nullptr)
897 {
898 history->try_emit_signature();
899 }
900
901 return;
902 }
903
905 {
906 break;
907 }
908
910 {
911 ctx->clear_response_headers();
912 ctx->set_error(
913 HTTP_STATUS_SERVICE_UNAVAILABLE,
914 ccf::errors::TransactionReplicationFailed,
915 "Transaction failed to replicate.");
916
917 return;
918 }
919 }
920 }
921 catch (const ccf::kv::CompactedVersionConflict& e)
922 {
923 // The executing transaction failed because of a conflicting
924 // compaction. Reset and retry
926 "Transaction execution conflicted with compaction: {}", e.what());
927 continue;
928 }
929 catch (RpcException& e)
930 {
931 ctx->clear_response_headers();
932 ctx->set_error(std::move(e.error));
933
934 return;
935 }
936 catch (const ccf::JsonParseError& e)
937 {
938 ctx->clear_response_headers();
939 ctx->set_error(
940 HTTP_STATUS_BAD_REQUEST, ccf::errors::InvalidInput, e.describe());
941
942 return;
943 }
944 catch (const nlohmann::json::exception& e)
945 {
946 ctx->clear_response_headers();
947 ctx->set_error(
948 HTTP_STATUS_BAD_REQUEST, ccf::errors::InvalidInput, e.what());
949
950 return;
951 }
952 catch (const ccf::kv::KvSerialiserException& e)
953 {
954 // If serialising the committed transaction fails, there is no way
955 // to recover safely (https://github.com/microsoft/CCF/issues/338).
956 // Better to abort.
957 LOG_DEBUG_FMT("Failed to serialise: {}", e.what());
958 LOG_FATAL_FMT("Failed to serialise");
959 abort();
960 }
961 catch (const std::exception& e)
962 {
963 ctx->clear_response_headers();
964 ctx->set_error(
965 HTTP_STATUS_INTERNAL_SERVER_ERROR,
966 ccf::errors::InternalError,
967 e.what());
968
969 return;
970 }
971 } // end of while loop
972
973 ctx->clear_response_headers();
974 ctx->set_error(
975 HTTP_STATUS_SERVICE_UNAVAILABLE,
976 ccf::errors::TransactionCommitAttemptsExceedLimit,
977 fmt::format(
978 "Transaction continued to conflict after {} attempts. Retry "
979 "later.",
980 max_attempts));
981
982 static constexpr size_t retry_after_seconds = 3;
983 ctx->set_response_header(http::headers::RETRY_AFTER, retry_after_seconds);
984 }
985
986 public:
988 ccf::kv::Store& tables_,
990 ccf::AbstractNodeContext& node_context_) :
991 tables(tables_),
992 endpoints(handlers_),
993 node_context(node_context_)
994 {}
995
997 size_t sig_tx_interval_, size_t sig_ms_interval_) override
998 {
999 sig_tx_interval = sig_tx_interval_;
1000 sig_ms_interval = std::chrono::milliseconds(sig_ms_interval_);
1001 ms_to_sig = sig_ms_interval;
1002 }
1003
1005 std::shared_ptr<AbstractForwarder> cmd_forwarder_) override
1006 {
1007 cmd_forwarder = cmd_forwarder_;
1008 }
1009
// Marks the frontend as open. Calling it again once open has no further
// effect, since the body is skipped when is_open_ is already true.
// The flag is read and written while holding open_lock.
// NOTE(review): listing line 1017 is absent from this extract, so one
// statement executed on the first open is not visible here - confirm
// against the original header.
1010 void open() override
1011 {
1012 std::lock_guard<ccf::pal::Mutex> mguard(open_lock);
1013 if (!is_open_)
1014 {
1015 LOG_INFO_FMT("Opening frontend");
1016 is_open_ = true;
1018 }
1019 }
1020
1021 bool is_open() override
1022 {
1023 std::lock_guard<ccf::pal::Mutex> mguard(open_lock);
1024 return is_open_;
1025 }
1026
1029 {
1031 {
1032 update_history();
1033 if (history != nullptr)
1034 {
1035 // Warning: Retrieving the current TxID and root from the history
1036 // should only ever be used for the proposal creation endpoint and
1037 // nothing else. Many bad things could happen otherwise (e.g. breaking
1038 // session consistency).
1039 const auto& [txid, root, term_of_next_version] =
1041 tx.set_read_txid(txid, term_of_next_version);
1042 tx.set_root_at_read_version(root);
1043 }
1044 }
1045 }
1046
1055 void process(std::shared_ptr<ccf::RpcContextImpl> ctx) override
1056 {
1057 update_consensus();
1058
1059 // NB: If we want to re-execute on backups, the original command could
1060 // be propagated from here
1061 process_command(ctx);
1062 }
1063
1068 void process_forwarded(std::shared_ptr<ccf::RpcContextImpl> ctx) override
1069 {
1070 if (!ctx->get_session_context()->is_forwarded)
1071 {
1072 throw std::logic_error(
1073 "Processing forwarded command with unitialised forwarded context");
1074 }
1075
1076 update_consensus();
1077 process_command(ctx);
1078 if (ctx->response_is_pending)
1079 {
1080 // This should never be called when process_command is called with a
1081 // forwarded RPC context
1082 throw std::logic_error("Forwarded RPC cannot be forwarded");
1083 }
1084 }
1085
1086 void tick(std::chrono::milliseconds elapsed) override
1087 {
1088 update_consensus();
1089
1090 endpoints.tick(elapsed);
1091 }
1092 };
1093}
Definition forwarder.h:18
static std::map< NodeId, NodeInfo > get_trusted_nodes(ccf::kv::ReadOnlyTx &tx)
Definition internal_tables_access.h:441
Definition json.h:26
std::string describe() const
Definition json.h:41
Definition rpc_context_impl.h:22
Definition frontend.h:35
ccf::kv::Store & tables
Definition frontend.h:37
RpcFrontend(ccf::kv::Store &tables_, endpoints::EndpointRegistry &handlers_, ccf::AbstractNodeContext &node_context_)
Definition frontend.h:987
void set_root_on_proposals(const ccf::RpcContextImpl &ctx, ccf::kv::CommittableTx &tx)
Definition frontend.h:1027
void set_cmd_forwarder(std::shared_ptr< AbstractForwarder > cmd_forwarder_) override
Definition frontend.h:1004
bool is_open() override
Definition frontend.h:1021
void process(std::shared_ptr< ccf::RpcContextImpl > ctx) override
Definition frontend.h:1055
ccf::AbstractNodeContext & node_context
Definition frontend.h:39
void set_sig_intervals(size_t sig_tx_interval_, size_t sig_ms_interval_) override
Definition frontend.h:996
void process_forwarded(std::shared_ptr< ccf::RpcContextImpl > ctx) override
Definition frontend.h:1068
void open() override
Definition frontend.h:1010
endpoints::EndpointRegistry & endpoints
Definition frontend.h:38
void tick(std::chrono::milliseconds elapsed) override
Definition frontend.h:1086
Definition rpc_handler.h:24
Definition sha256_hash.h:16
Definition endpoint_registry.h:128
virtual void execute_endpoint_locally_committed(EndpointDefinitionPtr e, CommandEndpointContext &ctx, const TxID &tx_id)
Definition endpoint_registry.cpp:567
virtual void handle_event_request_completed(const ccf::endpoints::RequestCompletedEvent &event)
Definition endpoint_registry.h:277
virtual void tick(std::chrono::milliseconds duration)
Definition endpoint_registry.cpp:641
virtual void init_handlers()
Definition endpoint_registry.cpp:471
void set_consensus(ccf::kv::Consensus *c)
Definition endpoint_registry.cpp:643
virtual void execute_endpoint(EndpointDefinitionPtr e, EndpointContext &ctx)
Definition endpoint_registry.cpp:553
virtual EndpointDefinitionPtr find_endpoint(ccf::kv::Tx &tx, ccf::RpcContext &rpc_ctx)
Definition endpoint_registry.cpp:473
virtual bool request_needs_root(const ccf::RpcContext &rpc_ctx)
Definition endpoint_registry.cpp:614
virtual void handle_event_dispatch_failed(const ccf::endpoints::DispatchFailedEvent &event)
Definition endpoint_registry.h:281
void set_history(ccf::kv::TxHistory *h)
Definition endpoint_registry.cpp:648
virtual bool apply_uncommitted_tx_backpressure() const
Definition endpoint_registry.h:285
virtual std::set< RESTVerb > get_allowed_verbs(ccf::kv::Tx &tx, const ccf::RpcContext &rpc_ctx)
Definition endpoint_registry.cpp:582
Definition committable_tx.h:19
void set_read_txid(const TxID &tx_id, Term commit_view_)
Definition committable_tx.h:331
CommitResult commit(const ccf::ClaimsDigest &claims=ccf::empty_claims(), std::function< std::tuple< Version, Version >(bool has_new_map)> version_resolver=nullptr, WriteSetObserver write_set_observer=nullptr)
Definition committable_tx.h:135
std::function< void(const ccf::crypto::Sha256Hash &write_set_digest, const std::string &commit_evidence)> WriteSetObserver
Definition committable_tx.h:120
void set_root_at_read_version(const ccf::crypto::Sha256Hash &r)
Definition committable_tx.h:341
std::optional< TxID > get_txid() const
Definition committable_tx.h:306
Definition compacted_version_conflict.h:10
char const * what() const
Definition compacted_version_conflict.h:17
Definition kv_types.h:367
Definition kv_types.h:318
const char * what() const noexcept override
Definition kv_types.h:325
Definition tx.h:159
Definition store.h:88
std::shared_ptr< TxHistory > get_history() override
Definition store.h:196
std::unique_ptr< CommittableTx > create_tx_ptr()
Definition store.h:1316
std::shared_ptr< Consensus > get_consensus() override
Definition store.h:183
Definition kv_types.h:332
virtual void try_emit_signature()=0
virtual std::tuple< ccf::TxID, ccf::crypto::Sha256Hash, ccf::kv::Term > get_replicated_state_txid_and_root()=0
#define LOG_INFO_FMT
Definition internal_logger.h:15
#define LOG_TRACE_FMT
Definition internal_logger.h:13
#define LOG_DEBUG_FMT
Definition internal_logger.h:14
#define LOG_FATAL_FMT
Definition internal_logger.h:17
#define LOG_FAIL_FMT
Definition internal_logger.h:16
void default_locally_committed_func(CommandEndpointContext &ctx, const TxID &tx_id)
Definition endpoint_registry.cpp:202
std::function< void(CommittedTxInfo &info)> ConsensusCommittedEndpointFunction
Definition endpoint_context.h:82
std::shared_ptr< const EndpointDefinition > EndpointDefinitionPtr
Definition endpoint.h:240
CommitResult
Definition kv_types.h:212
@ FAIL_NO_REPLICATE
Definition kv_types.h:215
@ SUCCESS
Definition kv_types.h:213
@ FAIL_CONFLICT
Definition kv_types.h:214
std::mutex Mutex
Definition locking.h:12
Definition app_interface.h:13
std::string ListenInterfaceID
Definition rpc_context.h:21
Definition consensus_types.h:23
STL namespace.
Definition node_context.h:12
std::shared_ptr< T > get_subsystem() const
Definition node_context.h:60
Definition endpoint_context_impl.h:13
Definition node_info_network.h:119
RedirectionResolverConfig to_primary
Definition node_info_network.h:120
RedirectionResolverConfig to_backup
Definition node_info_network.h:121
Definition rpc_context_impl.h:123
Definition tx_id.h:44