CCF
Loading...
Searching...
No Matches
frontend.h
Go to the documentation of this file.
1// Copyright (c) Microsoft Corporation. All rights reserved.
2// Licensed under the Apache 2.0 License.
3#pragma once
4
6#include "ccf/http_status.h"
7#include "ccf/node_context.h"
8#include "ccf/pal/locking.h"
17#include "enclave/rpc_handler.h"
18#include "forwarder.h"
19#include "http/http_jwt.h"
21#include "kv/store.h"
24#include "rpc_exception.h"
26
27#define FMT_HEADER_ONLY
28
29#include <fmt/format.h>
30#include <utility>
31#include <vector>
32
33namespace ccf
34{
36 {
37 protected:
41
42 private:
43 ccf::pal::Mutex open_lock;
44 bool is_open_ = false;
45
47 std::shared_ptr<AbstractForwarder> cmd_forwarder;
48 ccf::kv::TxHistory* history;
49
50 size_t sig_tx_interval = 5000;
51 std::chrono::milliseconds sig_ms_interval = std::chrono::milliseconds(1000);
52 std::chrono::milliseconds ms_to_sig = std::chrono::milliseconds(1000);
53
54 std::shared_ptr<NodeConfigurationSubsystem> node_configuration_subsystem =
55 nullptr;
56
57 void update_consensus()
58 {
59 auto c = tables.get_consensus().get();
60
61 if (consensus != c)
62 {
63 consensus = c;
65 }
66 }
67
68 void update_history()
69 {
70 history = tables.get_history().get();
71 endpoints.set_history(history);
72 }
73
75 std::shared_ptr<ccf::RpcContextImpl> ctx, ccf::kv::CommittableTx& tx)
76 {
77 const auto endpoint = endpoints.find_endpoint(tx, *ctx);
78 if (endpoint == nullptr)
79 {
80 // Every path from here should populate an appropriate response error
81 const auto allowed_verbs = endpoints.get_allowed_verbs(tx, *ctx);
82 if (allowed_verbs.empty())
83 {
84 ctx->set_error(
85 HTTP_STATUS_NOT_FOUND,
86 ccf::errors::ResourceNotFound,
87 fmt::format("Unknown path: {}.", ctx->get_method()));
88 }
89 else
90 {
91 std::vector<char const*> allowed_verb_strs;
92 allowed_verb_strs.push_back(llhttp_method_name(HTTP_OPTIONS));
93 for (auto verb : allowed_verbs)
94 {
95 allowed_verb_strs.push_back(verb.c_str());
96 }
97 const std::string allow_header_value =
98 fmt::format("{}", fmt::join(allowed_verb_strs, ", "));
99 // List allowed methods in 2 places:
100 // - ALLOW header for standards compliance + machine parsing
101 // - Body for visibility + human readability (unless this was an
102 // OPTIONS request, which returns a 204 No Content)
103 ctx->set_response_header(http::headers::ALLOW, allow_header_value);
104 if (ctx->get_request_verb() == HTTP_OPTIONS)
105 {
106 ctx->set_response_status(HTTP_STATUS_NO_CONTENT);
107 }
108 else
109 {
110 ctx->set_error(
111 HTTP_STATUS_METHOD_NOT_ALLOWED,
112 ccf::errors::UnsupportedHttpVerb,
113 fmt::format(
114 "Allowed methods for '{}' are: {}.",
115 ctx->get_method(),
116 allow_header_value));
117 }
118 }
119 }
120
121 return endpoint;
122 }
123
124 bool check_uri_allowed(
125 std::shared_ptr<ccf::RpcContextImpl> ctx,
126 const endpoints::EndpointDefinitionPtr& endpoint)
127 {
128 auto interface_id = ctx->get_session_context()->interface_id;
129 if (consensus && interface_id)
130 {
131 if (!node_configuration_subsystem)
132 {
133 node_configuration_subsystem =
134 node_context.get_subsystem<NodeConfigurationSubsystem>();
135 if (!node_configuration_subsystem)
136 {
137 ctx->set_response_status(HTTP_STATUS_INTERNAL_SERVER_ERROR);
138 return false;
139 }
140 }
141
142 auto& ncs = node_configuration_subsystem->get();
143 auto rit = ncs.rpc_interface_regexes.find(*interface_id);
144
145 if (rit != ncs.rpc_interface_regexes.end())
146 {
147 bool ok = false;
148 for (const auto& re : rit->second)
149 {
150 std::smatch m;
151 if (std::regex_match(endpoint->full_uri_path, m, re))
152 {
153 ok = true;
154 break;
155 }
156 }
157 if (!ok)
158 {
159 ctx->set_response_status(HTTP_STATUS_SERVICE_UNAVAILABLE);
160 return false;
161 }
162 }
163 else
164 {
165 auto icfg = ncs.node_config.network.rpc_interfaces.at(*interface_id);
166 if (icfg.endorsement->authority == Authority::UNSECURED)
167 {
168 // Unsecured interfaces are opt-in only.
170 "Request for {} rejected because the interface is unsecured and "
171 "no accepted_endpoints have been configured.",
172 endpoint->full_uri_path);
173 ctx->set_response_status(HTTP_STATUS_SERVICE_UNAVAILABLE);
174 return false;
175 }
176 }
177 }
178 else
179 {
180 // internal or forwarded: OK because they have been checked by the
181 // forwarder (forward() happens further down).
182 }
183
184 return true;
185 }
186
187 std::optional<std::string> resolve_redirect_location(
188 const RedirectionResolverConfig& resolver,
190 const ccf::ListenInterfaceID& incoming_interface)
191 {
192 switch (resolver.kind)
193 {
195 {
196 const auto role_it = resolver.target.find("role");
197 const bool seeking_primary =
198 role_it == resolver.target.end() || role_it.value() == "primary";
199 const bool seeking_backup =
200 !seeking_primary && role_it.value() == "backup";
201 if (!seeking_primary && !seeking_backup)
202 {
203 return std::nullopt;
204 }
205
206 const auto interface_it = resolver.target.find("interface");
207 const auto target_interface =
208 (interface_it == resolver.target.end()) ?
209 incoming_interface :
210 interface_it.value().get<std::string>();
211
212 std::vector<std::map<NodeId, NodeInfo>::const_iterator>
213 target_node_its;
215 {
216 const auto primary_id = consensus->primary();
217 if (seeking_primary && primary_id.has_value())
218 {
219 target_node_its.push_back(nodes.find(primary_id.value()));
220 }
221 else if (seeking_backup)
222 {
223 for (auto it = nodes.begin(); it != nodes.end(); ++it)
224 {
225 if (it->first != primary_id)
226 {
227 target_node_its.push_back(it);
228 }
229 }
230 }
231 }
232 if (target_node_its.empty())
233 {
234 return std::nullopt;
235 }
236
237 const auto node_it = target_node_its[rand() % target_node_its.size()];
238 if (node_it != nodes.end())
239 {
240 const auto& interfaces = node_it->second.rpc_interfaces;
241
242 const auto target_interface_it = interfaces.find(target_interface);
243 if (target_interface_it != interfaces.end())
244 {
245 return target_interface_it->second.published_address;
246 }
247 }
248 else
249 {
250 return std::nullopt;
251 }
252 break;
253 }
254
256 {
257 return resolver.target["address"].get<std::string>();
258 break;
259 }
260 }
261
262 return std::nullopt;
263 }
264
265 bool check_redirect(
267 std::shared_ptr<ccf::RpcContextImpl> ctx,
268 const endpoints::EndpointDefinitionPtr& endpoint,
270 {
271 auto rs = endpoint->properties.redirection_strategy;
272
273 switch (rs)
274 {
276 {
277 return false;
278 }
279
281 {
282 const bool is_primary =
283 (consensus != nullptr) && consensus->can_replicate();
284
285 if (!is_primary)
286 {
287 auto resolver = redirections.to_primary;
288
289 const auto listen_interface =
290 ctx->get_session_context()->interface_id.value_or(
291 PRIMARY_RPC_INTERFACE);
292 const auto location =
293 resolve_redirect_location(resolver, tx, listen_interface);
294 if (location.has_value())
295 {
296 ctx->set_response_header(
297 http::headers::LOCATION,
298 fmt::format(
299 "https://{}{}", location.value(), ctx->get_request_url()));
300 ctx->set_response_status(HTTP_STATUS_TEMPORARY_REDIRECT);
301 return true;
302 }
303
304 // Should have redirected, but don't know how to. Return an error
305 ctx->set_error(
306 HTTP_STATUS_SERVICE_UNAVAILABLE,
307 ccf::errors::PrimaryNotFound,
308 "Request should be redirected to primary, but receiving node "
309 "does not know current primary address");
310 return true;
311 }
312 return false;
313 }
314
316 {
317 const bool is_backup =
318 (consensus != nullptr) && !consensus->can_replicate();
319
320 if (!is_backup)
321 {
322 auto resolver = redirections.to_backup;
323
324 const auto listen_interface =
325 ctx->get_session_context()->interface_id.value_or(
326 PRIMARY_RPC_INTERFACE);
327 const auto location =
328 resolve_redirect_location(resolver, tx, listen_interface);
329 if (location.has_value())
330 {
331 ctx->set_response_header(
332 http::headers::LOCATION,
333 fmt::format(
334 "https://{}{}", location.value(), ctx->get_request_url()));
335 ctx->set_response_status(HTTP_STATUS_TEMPORARY_REDIRECT);
336 return true;
337 }
338
339 // Should have redirected, but don't know how to. Return an error
340 ctx->set_error(
341 HTTP_STATUS_SERVICE_UNAVAILABLE,
342 ccf::errors::BackupNotFound,
343 "Request should be redirected to backup, but receiving node "
344 "does not know any current backup address");
345 return true;
346 }
347 return false;
348 }
349
350 default:
351 {
352 LOG_FAIL_FMT("Unhandled redirection strategy: {}", rs);
353 return false;
354 }
355 }
356 }
357
358 std::optional<ccf::NodeInfoNetwork_v2::NetInterface::Redirections>
359 get_redirections_config(const ccf::ListenInterfaceID& incoming_interface)
360 {
361 if (!node_configuration_subsystem)
362 {
363 node_configuration_subsystem =
364 node_context.get_subsystem<NodeConfigurationSubsystem>();
365 if (!node_configuration_subsystem)
366 {
367 LOG_FAIL_FMT("Unable to access NodeConfigurationSubsystem");
368 return std::nullopt;
369 }
370 }
371
372 const auto& node_config_state = node_configuration_subsystem->get();
373 const auto& interfaces =
374 node_config_state.node_config.network.rpc_interfaces;
375 const auto interface_it = interfaces.find(incoming_interface);
376 if (interface_it == interfaces.end())
377 {
379 "Could not find startup config for interface {}", incoming_interface);
380 return std::nullopt;
381 }
382
383 return interface_it->second.redirections;
384 }
385
386 bool check_session_consistency(std::shared_ptr<ccf::RpcContextImpl> ctx)
387 {
388 if (consensus != nullptr)
389 {
390 auto current_view = consensus->get_view();
391 auto session_ctx = ctx->get_session_context();
392 if (!session_ctx->active_view.has_value())
393 {
394 // First request on this session - assign the active term
395 session_ctx->active_view = current_view;
396 }
397 else if (current_view != session_ctx->active_view.value())
398 {
399 auto msg = fmt::format(
400 "Potential loss of session consistency on session {}. Started "
401 "in view {}, now in view {}. Closing session.",
402 session_ctx->client_session_id,
403 session_ctx->active_view.value(),
404 current_view);
405 LOG_INFO_FMT("{}", msg);
406
407 ctx->set_error(
408 HTTP_STATUS_INTERNAL_SERVER_ERROR,
409 ccf::errors::SessionConsistencyLost,
410 std::move(msg));
411 ctx->terminate_session = true;
412 return false;
413 }
414 }
415
416 return true;
417 }
418
419 std::unique_ptr<AuthnIdentity> get_authenticated_identity(
420 std::shared_ptr<ccf::RpcContextImpl> ctx,
422 const endpoints::EndpointDefinitionPtr& endpoint)
423 {
424 if (endpoint->authn_policies.empty())
425 {
426 return nullptr;
427 }
428
429 std::unique_ptr<AuthnIdentity> identity = nullptr;
430
431 std::string auth_error_reason;
432 std::vector<ODataAuthErrorDetails> error_details;
433 for (const auto& policy : endpoint->authn_policies)
434 {
435 identity = policy->authenticate(tx, ctx, auth_error_reason);
436 if (identity != nullptr)
437 {
438 break;
439 }
440 else
441 {
442 // Collate error details
443 error_details.emplace_back(ODataAuthErrorDetails{
444 policy->get_security_scheme_name(),
445 ccf::errors::InvalidAuthenticationInfo,
446 auth_error_reason});
447 }
448 }
449
450 if (identity == nullptr)
451 {
452 // If none were accepted, let the last set the response header
453 endpoint->authn_policies.back()->set_unauthenticated_error(
454 ctx, std::move(auth_error_reason));
455 // Return collated error details for the auth policies
456 // declared in the request
457 std::vector<nlohmann::json> json_details;
458 for (auto& details : error_details)
459 {
460 json_details.push_back(details);
461 }
462 ctx->set_error(
463 HTTP_STATUS_UNAUTHORIZED,
464 ccf::errors::InvalidAuthenticationInfo,
465 "Invalid authentication credentials.",
466 std::move(json_details));
467 }
468
469 return identity;
470 }
471
472 std::chrono::milliseconds get_forwarding_timeout(
473 std::shared_ptr<ccf::RpcContextImpl> ctx) const
474 {
475 auto r = std::chrono::milliseconds(3'000);
476
477 auto interface_id = ctx->get_session_context()->interface_id;
478 if (interface_id.has_value())
479 {
480 auto& ncs = node_configuration_subsystem->get();
481 auto rit = ncs.node_config.network.rpc_interfaces.find(*interface_id);
482 if (rit != ncs.node_config.network.rpc_interfaces.end())
483 {
484 if (rit->second.forwarding_timeout_ms.has_value())
485 {
486 r = std::chrono::milliseconds(*rit->second.forwarding_timeout_ms);
487 }
488 }
489 }
490
491 return r;
492 }
493
494 void forward(
495 std::shared_ptr<ccf::RpcContextImpl> ctx,
497 const endpoints::EndpointDefinitionPtr& endpoint)
498 {
499 // HTTP/2 does not support forwarding
500 if (ctx->get_http_version() == HttpVersion::HTTP2)
501 {
502 ctx->set_error(
503 HTTP_STATUS_NOT_IMPLEMENTED,
504 ccf::errors::NotImplemented,
505 "Request cannot be forwarded to primary on HTTP/2 interface.");
506
507 return;
508 }
509
510 if (!cmd_forwarder || !consensus)
511 {
512 ctx->set_error(
513 HTTP_STATUS_INTERNAL_SERVER_ERROR,
514 ccf::errors::InternalError,
515 "No consensus or forwarder to forward request.");
516
517 return;
518 }
519
520 if (ctx->get_session_context()->is_forwarded)
521 {
522 // If the request was already forwarded, return an error to prevent
523 // daisy chains.
524 ctx->set_error(
525 HTTP_STATUS_SERVICE_UNAVAILABLE,
526 ccf::errors::RequestAlreadyForwarded,
527 "RPC was already forwarded.");
528
529 return;
530 }
531
532 // Before attempting to forward, make sure we're in the same View as we
533 // previously thought we were.
534 if (!check_session_consistency(ctx))
535 {
536 return;
537 }
538
539 auto primary_id = consensus->primary();
540 if (!primary_id.has_value())
541 {
542 ctx->set_error(
543 HTTP_STATUS_SERVICE_UNAVAILABLE,
544 ccf::errors::InternalError,
545 "RPC could not be forwarded to unknown primary.");
546
547 return;
548 }
549
550 if (!cmd_forwarder->forward_command(
551 ctx,
552 primary_id.value(),
553 ctx->get_session_context()->caller_cert,
554 get_forwarding_timeout(ctx)))
555 {
556 ctx->set_error(
557 HTTP_STATUS_SERVICE_UNAVAILABLE,
558 ccf::errors::InternalError,
559 "Unable to establish channel to forward to primary.");
560
561 return;
562 }
563
564 LOG_TRACE_FMT("RPC forwarded to primary {}", primary_id.value());
565
566 // Indicate that the RPC has been forwarded to primary
567 ctx->response_is_pending = true;
568
569 // Ensure future requests on this session are forwarded for session
570 // consistency
571 ctx->get_session_context()->is_forwarding = true;
572
573 return;
574 }
575
576 void process_command(std::shared_ptr<ccf::RpcContextImpl> ctx)
577 {
578 size_t attempts = 0;
579 endpoints::EndpointDefinitionPtr endpoint = nullptr;
580
581 const auto start_time = ccf::get_enclave_time();
582
583 process_command_inner(ctx, endpoint, attempts);
584
585 const auto end_time = ccf::get_enclave_time();
586
587 if (endpoint != nullptr)
588 {
589 endpoints::RequestCompletedEvent rce;
590 rce.method = endpoint->dispatch.verb.c_str();
591 rce.dispatch_path = endpoint->dispatch.uri_path;
592 rce.status = ctx->get_response_status();
593 // Although enclave time returns a microsecond value, the actual
594 // precision/granularity depends on the host's TimeUpdater. By default
595 // this only advances each millisecond. Avoid implying more precision
596 // than that, by rounding to milliseconds
597 rce.exec_time = std::chrono::duration_cast<std::chrono::milliseconds>(
598 end_time - start_time);
599 rce.attempts = attempts;
600
602 }
603 else
604 {
605 endpoints::DispatchFailedEvent dfe;
606 dfe.method = ctx->get_method();
607 dfe.status = ctx->get_response_status();
608
610 }
611 }
612
613 void process_command_inner(
614 std::shared_ptr<ccf::RpcContextImpl> ctx,
616 size_t& attempts)
617 {
618 constexpr auto max_attempts = 30;
619 while (attempts < max_attempts)
620 {
621 if (consensus != nullptr)
622 {
623 if (
625 consensus->is_at_max_capacity())
626 {
627 ctx->set_error(
628 HTTP_STATUS_SERVICE_UNAVAILABLE,
629 ccf::errors::TooManyPendingTransactions,
630 "Too many transactions pending commit on the service.");
631 return;
632 }
633 }
634
635 std::unique_ptr<ccf::kv::CommittableTx> tx_p = tables.create_tx_ptr();
636 set_root_on_proposals(*ctx, *tx_p);
637
638 if (attempts > 0)
639 {
640 // If the endpoint has already been executed, the effects of its
641 // execution should be dropped
642 ctx->reset_response();
643 }
644
645 if (!is_open())
646 {
647 ctx->set_error(
648 HTTP_STATUS_NOT_FOUND,
649 ccf::errors::FrontendNotOpen,
650 "Frontend is not open.");
651 return;
652 }
653
654 ++attempts;
655 update_history();
656
657 endpoint = find_endpoint(ctx, *tx_p);
658 if (endpoint == nullptr)
659 {
660 return;
661 }
662
663 try
664 {
665 if (!check_uri_allowed(ctx, endpoint))
666 {
667 return;
668 }
669
670 const auto listen_interface =
671 ctx->get_session_context()->interface_id.value_or(
672 PRIMARY_RPC_INTERFACE);
673 const auto redirections = get_redirections_config(listen_interface);
674
675 // If a redirections config was specified, then redirections are used
676 // and no forwarding is done
677 if (redirections.has_value())
678 {
679 if (check_redirect(*tx_p, ctx, endpoint, redirections.value()))
680 {
681 return;
682 }
683 }
684 else
685 {
686 bool is_primary =
687 (consensus == nullptr) || consensus->can_replicate();
688 const bool forwardable = (consensus != nullptr);
689
690 if (!is_primary && forwardable)
691 {
692 switch (endpoint->properties.forwarding_required)
693 {
695 {
696 break;
697 }
698
700 {
701 if (ctx->get_session_context()->is_forwarding)
702 {
703 forward(ctx, *tx_p, endpoint);
704 return;
705 }
706 break;
707 }
708
710 {
711 forward(ctx, *tx_p, endpoint);
712 return;
713 }
714 }
715 }
716 }
717
718 std::unique_ptr<AuthnIdentity> identity =
719 get_authenticated_identity(ctx, *tx_p, endpoint);
720
721 auto args = ccf::EndpointContextImpl(ctx, std::move(tx_p));
722 // NB: tx_p is no longer valid, and must be accessed from args, which
723 // may change it!
724
725 // If any auth policy was required, check that at least one is
726 // accepted
727 if (!endpoint->authn_policies.empty())
728 {
729 if (identity == nullptr)
730 {
731 return;
732 }
733 else
734 {
735 args.caller = std::move(identity);
736 }
737 }
738
739 endpoints.execute_endpoint(endpoint, args);
740
741 // If we've seen a View change, abandon this transaction as
742 // inconsistent
743 if (!check_session_consistency(ctx))
744 {
745 return;
746 }
747
748 if (!ctx->should_apply_writes())
749 {
750 return;
751 }
752
753 if (ctx->response_is_pending)
754 {
755 return;
756 }
757 else if (args.owned_tx == nullptr)
758 {
760 "Bad endpoint: During execution of {} {}, returned a non-pending "
761 "response but stole ownership of Tx object",
762 ctx->get_request_verb().c_str(),
763 ctx->get_request_path());
764
765 ctx->clear_response_headers();
766 ctx->set_error(
767 HTTP_STATUS_INTERNAL_SERVER_ERROR,
768 ccf::errors::InternalError,
769 "Illegal endpoint implementation");
770 return;
771 }
772 // else args owns a valid Tx relating to a non-pending response, which
773 // should be applied
774 ccf::kv::CommittableTx& tx = *args.owned_tx;
775 ccf::kv::CommitResult result = tx.commit(ctx->claims, false);
776
777 switch (result)
778 {
780 {
781 auto tx_id = tx.get_txid();
782 if (tx_id.has_value() && consensus != nullptr)
783 {
784 try
785 {
786 // Only transactions that acquired one or more map handles
787 // have a TxID, while others (e.g. unauthenticated commands)
788 // don't. Also, only report a TxID if the consensus is set, as
789 // the consensus is required to verify that a TxID is valid.
791 endpoint, args, tx_id.value());
792 }
793 catch (const std::exception& e)
794 {
795 // run default handler to set transaction id in header
796 ctx->clear_response_headers();
798 args, tx_id.value());
799 ctx->set_error(
800 HTTP_STATUS_INTERNAL_SERVER_ERROR,
801 ccf::errors::InternalError,
802 fmt::format(
803 "Failed to execute local commit handler func: {}",
804 e.what()));
805 }
806 catch (...)
807 {
808 // run default handler to set transaction id in header
809 ctx->clear_response_headers();
811 args, tx_id.value());
812 ctx->set_error(
813 HTTP_STATUS_INTERNAL_SERVER_ERROR,
814 ccf::errors::InternalError,
815 "Failed to execute local commit handler func");
816 }
817 }
818
819 if (
820 consensus != nullptr && consensus->can_replicate() &&
821 history != nullptr)
822 {
823 history->try_emit_signature();
824 }
825
826 return;
827 }
828
830 {
831 break;
832 }
833
835 {
836 ctx->clear_response_headers();
837 ctx->set_error(
838 HTTP_STATUS_SERVICE_UNAVAILABLE,
839 ccf::errors::TransactionReplicationFailed,
840 "Transaction failed to replicate.");
841
842 return;
843 }
844 }
845 }
846 catch (const ccf::kv::CompactedVersionConflict& e)
847 {
848 // The executing transaction failed because of a conflicting
849 // compaction. Reset and retry
851 "Transaction execution conflicted with compaction: {}", e.what());
852 continue;
853 }
854 catch (RpcException& e)
855 {
856 ctx->clear_response_headers();
857 ctx->set_error(std::move(e.error));
858
859 return;
860 }
861 catch (const ccf::JsonParseError& e)
862 {
863 ctx->clear_response_headers();
864 ctx->set_error(
865 HTTP_STATUS_BAD_REQUEST, ccf::errors::InvalidInput, e.describe());
866
867 return;
868 }
869 catch (const nlohmann::json::exception& e)
870 {
871 ctx->clear_response_headers();
872 ctx->set_error(
873 HTTP_STATUS_BAD_REQUEST, ccf::errors::InvalidInput, e.what());
874
875 return;
876 }
877 catch (const ccf::kv::KvSerialiserException& e)
878 {
879 // If serialising the committed transaction fails, there is no way
880 // to recover safely (https://github.com/microsoft/CCF/issues/338).
881 // Better to abort.
882 LOG_DEBUG_FMT("Failed to serialise: {}", e.what());
883 LOG_FATAL_FMT("Failed to serialise");
884 abort();
885 }
886 catch (const std::exception& e)
887 {
888 ctx->clear_response_headers();
889 ctx->set_error(
890 HTTP_STATUS_INTERNAL_SERVER_ERROR,
891 ccf::errors::InternalError,
892 e.what());
893
894 return;
895 }
896 } // end of while loop
897
898 ctx->clear_response_headers();
899 ctx->set_error(
900 HTTP_STATUS_SERVICE_UNAVAILABLE,
901 ccf::errors::TransactionCommitAttemptsExceedLimit,
902 fmt::format(
903 "Transaction continued to conflict after {} attempts. Retry "
904 "later.",
905 max_attempts));
906
907 static constexpr size_t retry_after_seconds = 3;
908 ctx->set_response_header(http::headers::RETRY_AFTER, retry_after_seconds);
909
910 return;
911 }
912
913 public:
915 ccf::kv::Store& tables_,
917 ccf::AbstractNodeContext& node_context_) :
918 tables(tables_),
919 endpoints(handlers_),
920 node_context(node_context_),
921 consensus(nullptr),
922 history(nullptr)
923 {}
924
926 size_t sig_tx_interval_, size_t sig_ms_interval_) override
927 {
928 sig_tx_interval = sig_tx_interval_;
929 sig_ms_interval = std::chrono::milliseconds(sig_ms_interval_);
930 ms_to_sig = sig_ms_interval;
931 }
932
934 std::shared_ptr<AbstractForwarder> cmd_forwarder_) override
935 {
936 cmd_forwarder = cmd_forwarder_;
937 }
938
939 void open() override
940 {
941 std::lock_guard<ccf::pal::Mutex> mguard(open_lock);
942 if (!is_open_)
943 {
944 LOG_INFO_FMT("Opening frontend");
945 is_open_ = true;
947 }
948 }
949
950 bool is_open() override
951 {
952 std::lock_guard<ccf::pal::Mutex> mguard(open_lock);
953 return is_open_;
954 }
955
958 {
960 {
961 update_history();
962 if (history)
963 {
964 // Warning: Retrieving the current TxID and root from the history
965 // should only ever be used for the proposal creation endpoint and
966 // nothing else. Many bad things could happen otherwise (e.g. breaking
967 // session consistency).
968 const auto& [txid, root, term_of_next_version] =
970 tx.set_read_txid(txid, term_of_next_version);
972 }
973 }
974 }
975
984 void process(std::shared_ptr<ccf::RpcContextImpl> ctx) override
985 {
986 update_consensus();
987
988 // NB: If we want to re-execute on backups, the original command could
989 // be propagated from here
990 process_command(ctx);
991 }
992
997 void process_forwarded(std::shared_ptr<ccf::RpcContextImpl> ctx) override
998 {
999 if (!ctx->get_session_context()->is_forwarded)
1000 {
1001 throw std::logic_error(
1002 "Processing forwarded command with unitialised forwarded context");
1003 }
1004
1005 update_consensus();
1006 process_command(ctx);
1007 if (ctx->response_is_pending)
1008 {
1009 // This should never be called when process_command is called with a
1010 // forwarded RPC context
1011 throw std::logic_error("Forwarded RPC cannot be forwarded");
1012 }
1013 }
1014
1015 void tick(std::chrono::milliseconds elapsed) override
1016 {
1017 update_consensus();
1018
1019 endpoints.tick(elapsed);
1020 }
1021 };
1022}
Definition forwarder.h:17
static std::map< NodeId, NodeInfo > get_trusted_nodes(ccf::kv::ReadOnlyTx &tx)
Definition internal_tables_access.h:285
Definition json.h:24
std::string describe() const
Definition json.h:37
Definition rpc_context_impl.h:22
Definition frontend.h:36
ccf::kv::Store & tables
Definition frontend.h:38
RpcFrontend(ccf::kv::Store &tables_, endpoints::EndpointRegistry &handlers_, ccf::AbstractNodeContext &node_context_)
Definition frontend.h:914
void set_root_on_proposals(const ccf::RpcContextImpl &ctx, ccf::kv::CommittableTx &tx)
Definition frontend.h:956
void set_cmd_forwarder(std::shared_ptr< AbstractForwarder > cmd_forwarder_) override
Definition frontend.h:933
bool is_open() override
Definition frontend.h:950
void process(std::shared_ptr< ccf::RpcContextImpl > ctx) override
Definition frontend.h:984
ccf::AbstractNodeContext & node_context
Definition frontend.h:40
void set_sig_intervals(size_t sig_tx_interval_, size_t sig_ms_interval_) override
Definition frontend.h:925
void process_forwarded(std::shared_ptr< ccf::RpcContextImpl > ctx) override
Definition frontend.h:997
void open() override
Definition frontend.h:939
endpoints::EndpointRegistry & endpoints
Definition frontend.h:39
void tick(std::chrono::milliseconds elapsed) override
Definition frontend.h:1015
Definition rpc_handler.h:24
Definition endpoint_registry.h:117
virtual void execute_endpoint(EndpointDefinitionPtr e, EndpointContext &args)
Definition endpoint_registry.cpp:493
virtual void handle_event_request_completed(const ccf::endpoints::RequestCompletedEvent &event)
Definition endpoint_registry.h:286
virtual std::set< RESTVerb > get_allowed_verbs(ccf::kv::Tx &, const ccf::RpcContext &rpc_ctx)
Definition endpoint_registry.cpp:522
virtual void execute_endpoint_locally_committed(EndpointDefinitionPtr e, CommandEndpointContext &args, const TxID &tx_id)
Definition endpoint_registry.cpp:507
virtual void tick(std::chrono::milliseconds)
Definition endpoint_registry.cpp:579
virtual void init_handlers()
Definition endpoint_registry.cpp:412
void set_consensus(ccf::kv::Consensus *c)
Definition endpoint_registry.cpp:581
virtual bool request_needs_root(const ccf::RpcContext &rpc_ctx)
Definition endpoint_registry.cpp:553
virtual void handle_event_dispatch_failed(const ccf::endpoints::DispatchFailedEvent &event)
Definition endpoint_registry.h:290
virtual EndpointDefinitionPtr find_endpoint(ccf::kv::Tx &, ccf::RpcContext &rpc_ctx)
Definition endpoint_registry.cpp:414
void set_history(ccf::kv::TxHistory *h)
Definition endpoint_registry.cpp:586
virtual bool apply_uncommitted_tx_backpressure() const
Definition endpoint_registry.h:294
Definition committable_tx.h:18
std::optional< TxID > get_txid()
Definition committable_tx.h:308
CommitResult commit(const ccf::ClaimsDigest &claims=ccf::empty_claims(), bool track_read_versions=false, std::function< std::tuple< Version, Version >(bool has_new_map)> version_resolver=nullptr, std::function< void(const std::vector< uint8_t > &write_set, const std::string &commit_evidence)> write_set_observer=nullptr)
Definition committable_tx.h:131
void set_read_txid(const TxID &tx_id, Term commit_view_)
Definition committable_tx.h:358
void set_root_at_read_version(const ccf::crypto::Sha256Hash &r)
Definition committable_tx.h:368
Definition compacted_version_conflict.h:10
char const * what() const
Definition compacted_version_conflict.h:17
Definition kv_types.h:430
Definition kv_types.h:352
virtual const char * what() const
Definition kv_types.h:359
Definition tx.h:161
Definition store.h:87
std::shared_ptr< TxHistory > get_history() override
Definition store.h:197
std::unique_ptr< CommittableTx > create_tx_ptr()
Definition store.h:1251
std::shared_ptr< Consensus > get_consensus() override
Definition store.h:184
Definition kv_types.h:366
virtual void try_emit_signature()=0
virtual std::tuple< ccf::kv::TxID, ccf::crypto::Sha256Hash, ccf::kv::Term > get_replicated_state_txid_and_root()=0
#define LOG_INFO_FMT
Definition logger.h:395
#define LOG_TRACE_FMT
Definition logger.h:378
#define LOG_DEBUG_FMT
Definition logger.h:380
#define LOG_FATAL_FMT
Definition logger.h:397
#define LOG_FAIL_FMT
Definition logger.h:396
void default_locally_committed_func(CommandEndpointContext &ctx, const TxID &tx_id)
Definition endpoint_registry.cpp:198
std::shared_ptr< const EndpointDefinition > EndpointDefinitionPtr
Definition endpoint.h:234
CommitResult
Definition kv_types.h:246
@ FAIL_NO_REPLICATE
Definition kv_types.h:249
@ SUCCESS
Definition kv_types.h:247
@ FAIL_CONFLICT
Definition kv_types.h:248
std::mutex Mutex
Definition locking.h:17
Definition app_interface.h:15
std::string ListenInterfaceID
Definition rpc_context.h:21
Definition consensus_types.h:23
STL namespace.
Definition node_context.h:12
std::shared_ptr< T > get_subsystem(const std::string &name) const
Definition node_context.h:37
Definition endpoint_context_impl.h:13
Definition node_info_network.h:119
RedirectionResolverConfig to_primary
Definition node_info_network.h:120
RedirectionResolverConfig to_backup
Definition node_info_network.h:121