CCF
Loading...
Searching...
No Matches
stream.h
Go to the documentation of this file.
1// Copyright (c) Microsoft Corporation. All rights reserved.
2// Licensed under the Apache 2.0 License.
3#pragma once
4
7#include "message.h"
8#include "status.h"
9#include "types.h"
10
11#include <memory>
12
13namespace ccf::grpc
14{
15 // Note: Streams are not currently thread safe
16
17 // Vanilla HTTP/2 stream
19 {
20 private:
21 std::shared_ptr<ccf::http::HTTPResponder> http_responder;
22
23 protected:
24 BaseStream(const std::shared_ptr<ccf::http::HTTPResponder>& r) :
25 http_responder(r)
26 {}
27
28 BaseStream(const BaseStream&) = default;
29
31 http_status status = HTTP_STATUS_OK,
32 const http::HeaderMap& headers = default_response_headers)
33 {
34 http_responder->start_stream(status, headers);
35 }
36
37 bool stream_data(std::span<const uint8_t> data)
38 {
39 return http_responder->stream_data(data);
40 }
41
43 {
44 return http_responder->close_stream(std::move(trailers));
45 }
46
48 {
49 http_responder->set_on_stream_close_callback(close_cb);
50 }
51 };
52
53 template <typename T>
54 class Stream : public BaseStream
55 {
56 public:
58 const std::shared_ptr<ccf::http::HTTPResponder>& r,
59 http_status s = HTTP_STATUS_OK,
60 const http::HeaderMap& h = default_response_headers) :
61 BaseStream(r)
62 {
63 start_stream(s, h);
64 }
65 Stream(const Stream& s) : BaseStream(s) {}
66
67 Stream(Stream&&) = delete;
68
69 bool stream_msg(const T& msg)
70 {
71 return stream_data(serialise_grpc_message(msg));
72 }
73 };
74
75 template <typename T>
76 class DetachedStream : public Stream<T>
77 {
78 public:
80 const Stream<T>& s, http::StreamOnCloseCallback close_cb_ = nullptr) :
81 Stream<T>(s)
82 {
84 }
85
87 {
88 close(make_grpc_status_ok());
89 }
90
92 {
93 http::HeaderMap trailers;
94 auto success_response = std::get_if<EmptySuccessResponse>(&resp);
95 if (success_response != nullptr)
96 {
97 trailers.emplace(make_status_trailer(success_response->status.code()));
98 trailers.emplace(
99 make_message_trailer(success_response->status.message()));
100 }
101 else
102 {
103 auto error_response = std::get<ErrorResponse>(resp);
104
105 trailers.emplace(make_status_trailer(error_response.status.code()));
106 trailers.emplace(make_message_trailer(error_response.status.message()));
107 }
108
109 return this->close_stream(std::move(trailers));
110 }
111 };
112
113 template <typename T>
114 using StreamPtr = std::unique_ptr<Stream<T>>;
115
116 template <typename T>
117 using DetachedStreamPtr = std::unique_ptr<DetachedStream<T>>;
118
119 static std::shared_ptr<ccf::http::HTTPResponder> get_http_responder(
120 const std::shared_ptr<ccf::RpcContext>& rpc_ctx)
121 {
122 auto http_responder = rpc_ctx->get_responder();
123 if (http_responder == nullptr)
124 {
125 throw std::logic_error("Found no responder for current session/stream");
126 }
127 return http_responder;
128 }
129
130 template <typename T>
131 static StreamPtr<T> make_stream(
132 const std::shared_ptr<ccf::RpcContext>& rpc_ctx,
133 http_status status = HTTP_STATUS_OK,
134 const http::HeaderMap& headers = default_response_headers)
135 {
136 return std::make_unique<Stream<T>>(
137 get_http_responder(rpc_ctx), status, headers);
138 }
139
140 template <typename T>
141 static DetachedStreamPtr<T> detach_stream(
142 const std::shared_ptr<ccf::RpcContext>& ctx,
143 StreamPtr<T>&& stream,
144 http::StreamOnCloseCallback close_cb = nullptr)
145 {
146 auto rpc_ctx_impl = dynamic_cast<ccf::RpcContextImpl*>(ctx.get());
147 if (rpc_ctx_impl == nullptr)
148 {
149 throw std::logic_error("Unexpected type for RpcContext");
150 }
151
152 rpc_ctx_impl->response_is_pending = true;
153
154 return std::make_unique<DetachedStream<T>>(*stream.get(), close_cb);
155 }
156}
Definition rpc_context_impl.h:22
Definition stream.h:19
bool stream_data(std::span< const uint8_t > data)
Definition stream.h:37
void set_on_close_callback(http::StreamOnCloseCallback close_cb)
Definition stream.h:47
BaseStream(const std::shared_ptr< ccf::http::HTTPResponder > &r)
Definition stream.h:24
BaseStream(const BaseStream &)=default
void start_stream(http_status status=HTTP_STATUS_OK, const http::HeaderMap &headers=default_response_headers)
Definition stream.h:30
bool close_stream(http::HeaderMap &&trailers)
Definition stream.h:42
Definition stream.h:77
~DetachedStream()
Definition stream.h:86
DetachedStream(const Stream< T > &s, http::StreamOnCloseCallback close_cb_=nullptr)
Definition stream.h:79
bool close(const GrpcAdapterEmptyResponse &resp)
Definition stream.h:91
Definition stream.h:55
Stream(const Stream &s)
Definition stream.h:65
Stream(Stream &&)=delete
bool stream_msg(const T &msg)
Definition stream.h:69
Stream(const std::shared_ptr< ccf::http::HTTPResponder > &r, http_status s=HTTP_STATUS_OK, const http::HeaderMap &h=default_response_headers)
Definition stream.h:57
llhttp_status http_status
Definition http_status.h:7
Definition grpc_status.h:98
std::unique_ptr< Stream< T > > StreamPtr
Definition stream.h:114
std::unique_ptr< DetachedStream< T > > DetachedStreamPtr
Definition stream.h:117
GrpcAdapterResponse< EmptyResponse > GrpcAdapterEmptyResponse
Definition types.h:46
std::map< std::string, std::string, std::less<> > HeaderMap
Definition http_header_map.h:10
std::function< void(void)> StreamOnCloseCallback
Definition http_responder.h:13