CCF
Loading...
Searching...
No Matches
grpc.h
Go to the documentation of this file.
1// Copyright (c) Microsoft Corporation. All rights reserved.
2// Licensed under the Apache 2.0 License.
3#pragma once
4
6#include "ccf/odata_error.h"
7#include "message.h"
10#include "stream.h"
11#include "types.h"
12
13#include <memory>
14
15namespace ccf::grpc
16{
17 template <typename In>
18 In get_grpc_payload(const std::shared_ptr<ccf::RpcContext>& ctx)
19 {
20 auto& request_body = ctx->get_request_body();
21 auto request_content_type =
22 ctx->get_request_header(http::headers::CONTENT_TYPE);
23
24 auto data = request_body.data();
25 auto size = request_body.size();
26
27 if (request_content_type != http::headervalues::contenttype::GRPC)
28 {
29 throw RpcException(
30 HTTP_STATUS_UNSUPPORTED_MEDIA_TYPE,
31 ccf::errors::UnsupportedContentType,
32 fmt::format(
33 "Unsupported content type. Only {} is supported ",
34 http::headervalues::contenttype::GRPC));
35 }
36
38 {
39 using Message = typename In::value_type;
40 In messages;
41 while (size != 0)
42 {
43 const auto message_length = impl::read_message_frame(data, size);
44 if (message_length > size)
45 {
46 throw std::logic_error(fmt::format(
47 "Error in gRPC frame: only {} bytes remaining but message header "
48 "says messages is {} bytes",
49 size,
50 message_length));
51 }
52
53 Message& msg = messages.emplace_back();
54 if (!msg.ParseFromArray(data, message_length))
55 {
56 throw std::logic_error(fmt::format(
57 "Error deserialising protobuf payload of type {}, size {} (message "
58 "{} in "
59 "stream)",
60 msg.GetTypeName(),
61 size,
62 messages.size()));
63 }
64 data += message_length;
65 size -= message_length;
66 }
67 return messages;
68 }
69 else
70 {
71 const auto message_length = impl::read_message_frame(data, size);
72 if (size != message_length)
73 {
74 throw std::logic_error(fmt::format(
75 "Error in gRPC frame: frame size is {} but messages is {} bytes",
76 size,
77 message_length));
78 }
79
80 In in;
81 if (!in.ParseFromArray(data, message_length))
82 {
83 throw std::logic_error(fmt::format(
84 "Error deserialising protobuf payload of type {}, size {}",
85 in.GetTypeName(),
86 size));
87 }
88 return in;
89 }
90 }
91
93 const std::shared_ptr<ccf::RpcContext>& ctx)
94 {
95 for (auto const& h : default_response_headers)
96 {
97 ctx->set_response_header(h.first, h.second);
98 }
99 }
100
102 const std::shared_ptr<ccf::RpcContext>& ctx,
103 const ccf::protobuf::Status& status)
104 {
105 ctx->set_response_trailer(make_status_trailer(status.code()));
106 ctx->set_response_trailer(make_message_trailer(status.message()));
107 }
108
109 template <typename Out>
112 const std::shared_ptr<ccf::RpcContext>& ctx)
113 {
115
116 if (auto success_response = std::get_if<SuccessResponse<Out>>(&r))
117 {
118 std::vector<uint8_t> v;
119
121 {
122 v = serialise_grpc_messages(success_response->body);
123 }
124 else
125 {
126 v = serialise_grpc_message(success_response->body);
127 }
128
129 ctx->set_response_body(v);
130
131 set_grpc_response_trailers(ctx, success_response->status);
132 }
133 else if (std::get_if<ErrorResponse>(&r))
134 {
135 auto error_response = std::get<ErrorResponse>(r);
136
137 set_grpc_response_trailers(ctx, error_response.status);
138 }
139 }
140}
141
142namespace ccf
143{
144 template <typename In, typename Out>
145 using GrpcEndpoint = std::function<grpc::GrpcAdapterResponse<Out>(
147
148 template <typename In, typename Out>
149 using GrpcReadOnlyEndpoint = std::function<grpc::GrpcAdapterResponse<Out>(
151
152 template <typename In, typename Out>
153 using GrpcCommandEndpoint = std::function<grpc::GrpcAdapterResponse<Out>(
155
156 template <typename In, typename Out>
160
161 template <typename In, typename Out>
163 {
164 return [f](endpoints::EndpointContext& ctx) {
165 grpc::set_grpc_response<Out>(
166 f(ctx, grpc::get_grpc_payload<In>(ctx.rpc_ctx)), ctx.rpc_ctx);
167 };
168 }
169
170 template <typename In, typename Out>
173 {
174 return [f](endpoints::ReadOnlyEndpointContext& ctx) {
175 grpc::set_grpc_response<Out>(
176 f(ctx, grpc::get_grpc_payload<In>(ctx.rpc_ctx)), ctx.rpc_ctx);
177 };
178 }
179
180 template <typename In, typename Out>
183 {
184 return [f](endpoints::CommandEndpointContext& ctx) {
185 grpc::set_grpc_response<Out>(
186 f(ctx, grpc::get_grpc_payload<In>(ctx.rpc_ctx)), ctx.rpc_ctx);
187 };
188 }
189
190 // Note: For now, only command endpoints (i.e. with no ccf::kv::Tx) support
191 // gRPC server streaming.
192 template <typename In, typename Out>
195 {
196 return [f](endpoints::CommandEndpointContext& ctx) {
198 const auto result =
199 f(ctx,
200 grpc::get_grpc_payload<In>(ctx.rpc_ctx),
201 grpc::make_stream<Out>(ctx.rpc_ctx));
202
203 if (auto error_response = std::get_if<grpc::ErrorResponse>(&result))
204 {
205 grpc::set_grpc_response_trailers(ctx.rpc_ctx, error_response->status);
206 }
207 };
208 }
209}
std::function< void(CommandEndpointContext &args)> CommandEndpointFunction
Definition endpoint_context.h:52
std::function< void(ReadOnlyEndpointContext &args)> ReadOnlyEndpointFunction
Definition endpoint_context.h:80
std::function< void(EndpointContext &args)> EndpointFunction
Definition endpoint_context.h:63
Definition grpc_status.h:98
void set_grpc_default_headers(const std::shared_ptr< ccf::RpcContext > &ctx)
Definition grpc.h:92
std::unique_ptr< Stream< T > > StreamPtr
Definition stream.h:114
In get_grpc_payload(const std::shared_ptr< ccf::RpcContext > &ctx)
Definition grpc.h:18
std::variant< ErrorResponse, SuccessResponse< T > > GrpcAdapterResponse
Definition types.h:34
std::variant< ErrorResponse, PendingResponse > GrpcAdapterStreamingResponse
Definition types.h:42
void set_grpc_response(const GrpcAdapterResponse< Out > &r, const std::shared_ptr< ccf::RpcContext > &ctx)
Definition grpc.h:110
void set_grpc_response_trailers(const std::shared_ptr< ccf::RpcContext > &ctx, const ccf::protobuf::Status &status)
Definition grpc.h:101
Definition app_interface.h:15
std::function< grpc::GrpcAdapterResponse< Out >(endpoints::ReadOnlyEndpointContext &, In &&)> GrpcReadOnlyEndpoint
Definition grpc.h:150
endpoints::EndpointFunction grpc_adapter(const GrpcEndpoint< In, Out > &f)
Definition grpc.h:162
std::function< grpc::GrpcAdapterResponse< Out >(endpoints::EndpointContext &, In &&)> GrpcEndpoint
Definition grpc.h:146
std::function< grpc::GrpcAdapterStreamingResponse(endpoints::CommandEndpointContext &, In &&, grpc::StreamPtr< Out > &&)> GrpcCommandUnaryStreamEndpoint
Definition grpc.h:159
endpoints::CommandEndpointFunction grpc_command_adapter(const GrpcCommandEndpoint< In, Out > &f)
Definition grpc.h:181
endpoints::ReadOnlyEndpointFunction grpc_read_only_adapter(const GrpcReadOnlyEndpoint< In, Out > &f)
Definition grpc.h:171
std::function< grpc::GrpcAdapterResponse< Out >(endpoints::CommandEndpointContext &, In &&)> GrpcCommandEndpoint
Definition grpc.h:154
endpoints::CommandEndpointFunction grpc_command_unary_stream_adapter(const GrpcCommandUnaryStreamEndpoint< In, Out > &f)
Definition grpc.h:193
Definition rpc_exception.h:13
Definition endpoint_context.h:24
Definition endpoint_context.h:55
Definition endpoint_context.h:70
Definition types.h:18
Definition nonstd.h:49