21 std::shared_ptr<ccf::http::HTTPResponder> http_responder;
24 BaseStream(
const std::shared_ptr<ccf::http::HTTPResponder>& r) :
34 http_responder->start_stream(status, headers);
39 return http_responder->stream_data(data);
44 return http_responder->close_stream(std::move(trailers));
49 http_responder->set_on_stream_close_callback(close_cb);
58 const std::shared_ptr<ccf::http::HTTPResponder>& r,
88 close(make_grpc_status_ok());
94 auto success_response = std::get_if<EmptySuccessResponse>(&resp);
95 if (success_response !=
nullptr)
97 trailers.emplace(make_status_trailer(success_response->status.code()));
99 make_message_trailer(success_response->status.message()));
103 auto error_response = std::get<ErrorResponse>(resp);
105 trailers.emplace(make_status_trailer(error_response.status.code()));
106 trailers.emplace(make_message_trailer(error_response.status.message()));
113 template <
typename T>
116 template <
typename T>
119 static std::shared_ptr<ccf::http::HTTPResponder> get_http_responder(
120 const std::shared_ptr<ccf::RpcContext>& rpc_ctx)
122 auto http_responder = rpc_ctx->get_responder();
123 if (http_responder ==
nullptr)
125 throw std::logic_error(
"Found no responder for current session/stream");
127 return http_responder;
130 template <
typename T>
131 static StreamPtr<T> make_stream(
132 const std::shared_ptr<ccf::RpcContext>& rpc_ctx,
136 return std::make_unique<Stream<T>>(
137 get_http_responder(rpc_ctx), status, headers);
140 template <
typename T>
141 static DetachedStreamPtr<T> detach_stream(
142 const std::shared_ptr<ccf::RpcContext>& ctx,
143 StreamPtr<T>&& stream,
147 if (rpc_ctx_impl ==
nullptr)
149 throw std::logic_error(
"Unexpected type for RpcContext");
152 rpc_ctx_impl->response_is_pending =
true;
154 return std::make_unique<DetachedStream<T>>(*stream.get(), close_cb);
Definition rpc_context_impl.h:22
bool stream_data(std::span< const uint8_t > data)
Definition stream.h:37
void set_on_close_callback(http::StreamOnCloseCallback close_cb)
Definition stream.h:47
BaseStream(const std::shared_ptr< ccf::http::HTTPResponder > &r)
Definition stream.h:24
BaseStream(const BaseStream &)=default
void start_stream(http_status status=HTTP_STATUS_OK, const http::HeaderMap &headers=default_response_headers)
Definition stream.h:30
bool close_stream(http::HeaderMap &&trailers)
Definition stream.h:42
~DetachedStream()
Definition stream.h:86
DetachedStream(const Stream< T > &s, http::StreamOnCloseCallback close_cb_=nullptr)
Definition stream.h:79
bool close(const GrpcAdapterEmptyResponse &resp)
Definition stream.h:91
Stream(const Stream &s)
Definition stream.h:65
bool stream_msg(const T &msg)
Definition stream.h:69
Stream(const std::shared_ptr< ccf::http::HTTPResponder > &r, http_status s=HTTP_STATUS_OK, const http::HeaderMap &h=default_response_headers)
Definition stream.h:57
llhttp_status http_status
Definition http_status.h:7
Definition grpc_status.h:98
std::unique_ptr< Stream< T > > StreamPtr
Definition stream.h:114
std::unique_ptr< DetachedStream< T > > DetachedStreamPtr
Definition stream.h:117
GrpcAdapterResponse< EmptyResponse > GrpcAdapterEmptyResponse
Definition types.h:46
std::map< std::string, std::string, std::less<> > HeaderMap
Definition http_header_map.h:10
std::function< void(void)> StreamOnCloseCallback
Definition http_responder.h:13