19 std::shared_ptr<ccf::TLSSession>
tls_io;
26 std::unique_ptr<ccf::tls::Context> ctx,
30 session_id_, writer_factory,
std::move(ctx))),
36 virtual bool parse(std::span<const uint8_t> data) = 0;
38 void send_data(std::span<const uint8_t> data)
override
42 tls_io->send_raw(data.data(), data.size());
47 throw std::logic_error(
"Unimplemented");
57 tls_io->recv_buffered(data.data(), data.size());
65 constexpr auto min_read_block_size = 4096;
66 if (data.size() < min_read_block_size)
68 data.resize(min_read_block_size);
71 auto n_read =
tls_io->read(data.data(), data.size(),
false);
82 bool cont =
parse({data.data(), n_read});
89 n_read =
tls_io->read(data.data(), data.size(),
false);
99 size_t client_session_id_,
100 const std::vector<uint8_t>& caller_cert_,
101 const std::optional<ccf::ListenInterfaceID>& interface_id_,
116 std::weak_ptr<http2::ServerParser> server_parser;
121 const std::shared_ptr<http2::ServerParser>& server_parser_) :
122 stream_id(stream_id_),
123 server_parser(server_parser_)
130 std::span<const uint8_t> body)
override
132 auto sp = server_parser.lock();
142 catch (
const std::exception& e)
145 "Error sending response on stream {}: {}", stream_id, e.what());
155 auto sp = server_parser.lock();
160 sp->start_stream(stream_id, status, headers);
162 catch (
const std::exception& e)
164 LOG_DEBUG_FMT(
"Error sending headers {}: {}", stream_id, e.what());
178 auto sp = server_parser.lock();
183 sp->close_stream(stream_id, std::move(trailers));
185 catch (
const std::exception& e)
187 LOG_DEBUG_FMT(
"Error closing stream {}: {}", stream_id, e.what());
201 auto sp = server_parser.lock();
206 sp->send_data(stream_id, data);
208 catch (
const std::exception& e)
211 "Error streaming data on stream {}: {}", stream_id, e.what());
227 auto sp = server_parser.lock();
232 sp->set_on_stream_close_callback(stream_id, cb);
234 catch (
const std::exception& e)
237 "Error setting close callback on stream {}: {}",
257 std::shared_ptr<http2::ServerParser> server_parser;
259 std::shared_ptr<ccf::RPCMap> rpc_map;
260 std::shared_ptr<ccf::RpcHandler> handler;
265 std::unordered_map<http2::StreamId, std::shared_ptr<HTTP2SessionContext>>
268 std::shared_ptr<HTTP2SessionContext> get_session_ctx(
271 auto it = session_ctxs.find(stream_id);
272 if (it == session_ctxs.end())
274 it = session_ctxs.emplace_hint(
278 std::make_shared<HTTP2SessionContext>(
285 std::shared_ptr<HTTPResponder> get_stream_responder(
289 if (responder ==
nullptr)
292 std::make_shared<HTTP2StreamResponder>(stream_id, server_parser);
299 void respond_with_error(
304 const auto s = body.dump();
307 headers[ccf::http::headers::CONTENT_TYPE] =
308 ccf::http::headervalues::contenttype::JSON;
310 get_stream_responder(stream_id)->send_response(
314 {(const uint8_t*)s.data(), s.size()});
319 std::shared_ptr<ccf::RPCMap> rpc_map,
323 std::unique_ptr<ccf::tls::Context> ctx,
329 std::make_shared<
http2::ServerParser>(*this, configuration)),
331 interface_id(interface_id),
332 responder_lookup(responder_lookup_)
334 server_parser->set_outgoing_data_handler(
335 [
this](std::span<const uint8_t> data) {
336 this->
tls_io->send_raw(data.data(), data.size());
345 bool parse(std::span<const uint8_t> data)
override
349 if (!server_parser->execute(data.data(), data.size()))
361 error_reporter->report_request_payload_too_large_error(interface_id);
367 HTTP_STATUS_PAYLOAD_TOO_LARGE,
368 ccf::errors::RequestBodyTooLarge,
379 error_reporter->report_request_header_too_large_error(interface_id);
385 HTTP_STATUS_REQUEST_HEADER_FIELDS_TOO_LARGE,
386 ccf::errors::RequestHeaderTooLarge,
393 catch (
const std::exception& e)
413 const std::string_view& url,
415 std::vector<uint8_t>&& body,
416 int32_t stream_id)
override
419 "Processing msg({}, {} [{} bytes])",
420 llhttp_method_name(verb),
424 auto responder = get_stream_responder(stream_id);
425 auto session_ctx = get_session_ctx(stream_id);
429 std::shared_ptr<http::HttpRpcContext> rpc_ctx =
nullptr;
432 rpc_ctx = std::make_shared<HttpRpcContext>(
441 catch (std::exception& e)
444 HTTP_STATUS_INTERNAL_SERVER_ERROR,
445 ccf::errors::InternalError,
446 fmt::format(
"Error constructing RpcContext: {}", e.what())});
448 std::shared_ptr<ccf::RpcHandler> search =
449 http::fetch_rpc_handler(rpc_ctx, rpc_map);
451 search->process(rpc_ctx);
453 if (rpc_ctx->response_is_pending)
461 responder->send_response(
462 rpc_ctx->get_response_http_status(),
463 rpc_ctx->get_response_headers(),
464 rpc_ctx->get_response_trailers(),
465 std::move(rpc_ctx->get_response_body()));
468 catch (
const std::exception& e)
471 HTTP_STATUS_INTERNAL_SERVER_ERROR,
472 ccf::errors::InternalError,
473 fmt::format(
"Exception: {}", e.what())});
477 LOG_DEBUG_FMT(
"Closing connection due to exception: {}", e.what());
487 std::span<const uint8_t> body)
override
489 return get_stream_responder(http2::DEFAULT_STREAM_ID)
491 status_code, std::move(headers), std::move(trailers), body);
497 return get_stream_responder(http2::DEFAULT_STREAM_ID)
498 ->start_stream(status, headers);
503 return get_stream_responder(http2::DEFAULT_STREAM_ID)->stream_data(data);
508 return get_stream_responder(http2::DEFAULT_STREAM_ID)
509 ->close_stream(std::move(trailers));
515 return get_stream_responder(http2::DEFAULT_STREAM_ID)
516 ->set_on_stream_close_callback(cb);
531 std::unique_ptr<ccf::tls::Context> ctx) :
537 [
this](std::span<const uint8_t> data) {
538 this->
tls_io->send_raw(data.data(), data.size());
542 bool parse(std::span<const uint8_t> data)
override
547 client_parser.
execute(data.data(), data.size());
551 catch (
const std::exception& e)
556 "Error occurred while parsing fragment {} byte fragment:\n{}",
558 std::string_view((
char const*)data.data(), data.size()));
568 request.get_method(),
570 request.get_headers(),
571 {request.get_content_data(), request.get_content_length()});
577 std::vector<uint8_t>&& body)
override
Definition client_session.h:11
HandleDataCallback handle_data_cb
Definition client_session.h:22
Definition tls_session.h:29
Definition http_responder.h:16
bool send_odata_error_response(ccf::ErrorDetails &&error)
Definition http_responder.h:35
Definition http2_parser.h:500
void send_structured_request(llhttp_method method, const std::string &route, const ccf::http::HeaderMap &headers, std::span< const uint8_t > body)
Definition http2_parser.h:510
void set_outgoing_data_handler(DataHandlerCB &&cb)
Definition http2_parser.h:125
bool execute(const uint8_t *data, size_t size)
Definition http2_parser.h:195
Definition http2_session.h:523
void send_request(http::Request &&request) override
Definition http2_session.h:565
void handle_response(http_status status, ccf::http::HeaderMap &&headers, std::vector< uint8_t > &&body) override
Definition http2_session.h:574
HTTP2ClientSession(int64_t session_id_, ringbuffer::AbstractWriterFactory &writer_factory, std::unique_ptr< ccf::tls::Context > ctx)
Definition http2_session.h:528
bool parse(std::span< const uint8_t > data) override
Definition http2_session.h:542
Definition http2_session.h:255
bool set_on_stream_close_callback(ccf::http::StreamOnCloseCallback cb) override
Definition http2_session.h:512
~HTTP2ServerSession()
Definition http2_session.h:340
bool parse(std::span< const uint8_t > data) override
Definition http2_session.h:345
HTTP2ServerSession(std::shared_ptr< ccf::RPCMap > rpc_map, int64_t session_id_, const ccf::ListenInterfaceID &interface_id, ringbuffer::AbstractWriterFactory &writer_factory, std::unique_ptr< ccf::tls::Context > ctx, const ccf::http::ParserConfiguration &configuration, const std::shared_ptr< ErrorReporter > &error_reporter, http::ResponderLookup &responder_lookup_)
Definition http2_session.h:318
bool send_response(http_status status_code, ccf::http::HeaderMap &&headers, ccf::http::HeaderMap &&trailers, std::span< const uint8_t > body) override
Definition http2_session.h:483
void handle_request(llhttp_method verb, const std::string_view &url, ccf::http::HeaderMap &&headers, std::vector< uint8_t > &&body, int32_t stream_id) override
Definition http2_session.h:411
bool start_stream(http_status status, const ccf::http::HeaderMap &headers) override
Definition http2_session.h:494
bool stream_data(std::span< const uint8_t > data) override
Definition http2_session.h:501
bool close_stream(ccf::http::HeaderMap &&trailers) override
Definition http2_session.h:506
Definition http2_session.h:17
std::shared_ptr< ErrorReporter > error_reporter
Definition http2_session.h:20
virtual bool parse(std::span< const uint8_t > data)=0
::tcp::ConnID session_id
Definition http2_session.h:21
HTTP2Session(::tcp::ConnID session_id_, ringbuffer::AbstractWriterFactory &writer_factory, std::unique_ptr< ccf::tls::Context > ctx, const std::shared_ptr< ErrorReporter > &error_reporter=nullptr)
Definition http2_session.h:23
void handle_incoming_data_thread(std::vector< uint8_t > &&data) override
Definition http2_session.h:55
std::shared_ptr< ccf::TLSSession > tls_io
Definition http2_session.h:19
void send_data_thread(std::vector< uint8_t > &&data) override
Definition http2_session.h:45
void close_session() override
Definition http2_session.h:50
void send_data(std::span< const uint8_t > data) override
Definition http2_session.h:38
Definition http2_session.h:109
bool set_on_stream_close_callback(ccf::http::StreamOnCloseCallback cb) override
Definition http2_session.h:224
bool close_stream(ccf::http::HeaderMap &&trailers) override
Definition http2_session.h:176
bool send_response(http_status status_code, ccf::http::HeaderMap &&headers, ccf::http::HeaderMap &&trailers, std::span< const uint8_t > body) override
Definition http2_session.h:126
HTTP2StreamResponder(http2::StreamId stream_id_, const std::shared_ptr< http2::ServerParser > &server_parser_)
Definition http2_session.h:119
bool stream_data(std::span< const uint8_t > data) override
Definition http2_session.h:199
bool start_stream(http_status status, const ccf::http::HeaderMap &headers) override
Definition http2_session.h:152
Definition http_proc.h:20
Definition http_builder.h:106
Definition responder_lookup.h:14
void cleanup_responders(::tcp::ConnID session_id)
Definition responder_lookup.h:52
std::shared_ptr< ccf::http::HTTPResponder > lookup_responder(::tcp::ConnID session_id, http2::StreamId stream_id)
Definition responder_lookup.h:25
void add_responder(::tcp::ConnID session_id, http2::StreamId stream_id, std::shared_ptr< ccf::http::HTTPResponder > responder)
Definition responder_lookup.h:43
Definition http_proc.h:31
Definition ring_buffer_types.h:153
llhttp_status http_status
Definition http_status.h:7
#define LOG_TRACE_FMT
Definition logger.h:378
#define LOG_DEBUG_FMT
Definition logger.h:380
#define LOG_FAIL_FMT
Definition logger.h:396
std::map< std::string, std::string, std::less<> > HeaderMap
Definition http_header_map.h:10
std::function< void(void)> StreamOnCloseCallback
Definition http_responder.h:13
Definition app_interface.h:15
std::string ListenInterfaceID
Definition rpc_context.h:21
Definition http2_callbacks.h:12
int32_t StreamId
Definition http2_types.h:21
Definition error_reporter.h:6
std::vector< uint8_t > error(ccf::ErrorDetails &&error)
Definition http_rpc_context.h:14
int64_t ConnID
Definition msg_types.h:9
Definition odata_error.h:56
Definition odata_error.h:48
Definition odata_error.h:37
Definition rpc_context.h:24
Definition http_configuration.h:24
Definition http2_session.h:95
HTTP2SessionContext(size_t client_session_id_, const std::vector< uint8_t > &caller_cert_, const std::optional< ccf::ListenInterfaceID > &interface_id_, http2::StreamId stream_id_)
Definition http2_session.h:98
int32_t stream_id
Definition http2_session.h:96