CCF
Loading...
Searching...
No Matches
http2_session.h
Go to the documentation of this file.
1// Copyright (c) Microsoft Corporation. All rights reserved.
2// Licensed under the Apache 2.0 License.
3#pragma once
4
5#include "ccf/ds/logger.h"
8#include "enclave/rpc_map.h"
9#include "error_reporter.h"
11#include "http2_parser.h"
12#include "http_rpc_context.h"
13
14namespace http
15{
17 {
18 protected:
19 std::shared_ptr<ccf::TLSSession> tls_io;
20 std::shared_ptr<ErrorReporter> error_reporter;
22
// Constructor parameters and initialisers.
// Passes session_id_ to the ccf::ThreadedSession base, builds the TLSSession
// held in tls_io from session_id_, writer_factory and ctx (ownership of ctx is
// moved into it), and stores session_id_ in session_id.
// error_reporter is optional and defaults to nullptr.
24 ::tcp::ConnID session_id_,
26 std::unique_ptr<ccf::tls::Context> ctx,
27 const std::shared_ptr<ErrorReporter>& error_reporter = nullptr) :
28 ccf::ThreadedSession(session_id_),
29 tls_io(std::make_shared<ccf::TLSSession>(
30 session_id_, writer_factory, std::move(ctx))),
32 session_id(session_id_)
33 {}
34
35 public:
36 virtual bool parse(std::span<const uint8_t> data) = 0;
37
38 void send_data(std::span<const uint8_t> data) override
39 {
40 // Override send_data rather than send_data_thread, as the TLSSession
41 // handles dispatching for thread affinity
42 tls_io->send_raw(data.data(), data.size());
43 }
44
45 void send_data_thread(std::vector<uint8_t>&& data) override
46 {
47 throw std::logic_error("Unimplemented");
48 }
49
50 void close_session() override
51 {
52 tls_io->close();
53 }
54
55 void handle_incoming_data_thread(std::vector<uint8_t>&& data) override
56 {
57 tls_io->recv_buffered(data.data(), data.size());
58
59 LOG_TRACE_FMT("recv called with {} bytes", data.size());
60
61 // Try to parse all incoming data, reusing the vector we were just passed
62 // for storage. Increase the size if the received vector was too small
63 // (for the case where this chunk is very small, but we had some previous
64 // data to continue reading).
65 constexpr auto min_read_block_size = 4096;
66 if (data.size() < min_read_block_size)
67 {
68 data.resize(min_read_block_size);
69 }
70
71 auto n_read = tls_io->read(data.data(), data.size(), false);
72
73 while (true)
74 {
75 if (n_read == 0)
76 {
77 return;
78 }
79
80 LOG_TRACE_FMT("Going to parse {} bytes", n_read);
81
82 bool cont = parse({data.data(), n_read});
83 if (!cont)
84 {
85 return;
86 }
87
88 // Used all provided bytes - check if more are available
89 n_read = tls_io->read(data.data(), data.size(), false);
90 }
91 }
92 };
93
95 {
96 int32_t stream_id;
97
// Constructor parameters and initialisers.
// Forwards client_session_id_, caller_cert_ and interface_id_ to the
// ccf::SessionContext base and stores stream_id_ in stream_id.
99 size_t client_session_id_,
100 const std::vector<uint8_t>& caller_cert_,
101 const std::optional<ccf::ListenInterfaceID>& interface_id_,
102 http2::StreamId stream_id_) :
103 ccf::SessionContext(client_session_id_, caller_cert_, interface_id_),
104 stream_id(stream_id_)
105 {}
106 };
107
109 {
110 private:
111 http2::StreamId stream_id;
112
113 // Associated HTTP2ServerSession may be closed while responder is held
114 // elsewhere (e.g. async streaming) so keep a weak pointer to parser and
115 // report an error to caller to discard responder.
116 std::weak_ptr<http2::ServerParser> server_parser;
117
118 public:
// Constructor parameters and initialisers.
// Stores the stream ID and keeps only a weak reference to the server parser
// (the server_parser member is a std::weak_ptr).
120 http2::StreamId stream_id_,
121 const std::shared_ptr<http2::ServerParser>& server_parser_) :
122 stream_id(stream_id_),
123 server_parser(server_parser_)
124 {}
125
127 http_status status_code,
128 ccf::http::HeaderMap&& headers,
129 ccf::http::HeaderMap&& trailers,
130 std::span<const uint8_t> body) override
131 {
132 auto sp = server_parser.lock();
133 try
134 {
135 sp->respond(
136 stream_id,
137 status_code,
138 std::move(headers),
139 std::move(trailers),
140 body);
141 }
142 catch (const std::exception& e)
143 {
145 "Error sending response on stream {}: {}", stream_id, e.what());
146 return false;
147 }
148
149 return true;
150 }
151
// Starts a stream on this responder's stream ID with the given status and
// headers, via the server parser.
// Returns false if the parser no longer exists or if start_stream throws a
// std::exception; true otherwise.
153 http_status status, const ccf::http::HeaderMap& headers) override
154 {
// The parser is held weakly; lock() yields null once it has been destroyed
155 auto sp = server_parser.lock();
156 if (sp)
157 {
158 try
159 {
160 sp->start_stream(stream_id, status, headers);
161 }
162 catch (const std::exception& e)
163 {
164 LOG_DEBUG_FMT("Error sending headers {}: {}", stream_id, e.what());
165 return false;
166 }
167 }
168 else
169 {
170 LOG_DEBUG_FMT("Stream {} is closed", stream_id);
171 return false;
172 }
173 return true;
174 }
175
176 bool close_stream(ccf::http::HeaderMap&& trailers) override
177 {
178 auto sp = server_parser.lock();
179 if (sp)
180 {
181 try
182 {
183 sp->close_stream(stream_id, std::move(trailers));
184 }
185 catch (const std::exception& e)
186 {
187 LOG_DEBUG_FMT("Error closing stream {}: {}", stream_id, e.what());
188 return false;
189 }
190 }
191 else
192 {
193 LOG_DEBUG_FMT("Stream {} is closed", stream_id);
194 return false;
195 }
196 return true;
197 }
198
// Sends `data` on this responder's stream through the server parser.
// Returns false if the parser no longer exists or if send_data throws a
// std::exception; true otherwise.
199 bool stream_data(std::span<const uint8_t> data) override
200 {
// The parser is held weakly; lock() yields null once it has been destroyed
201 auto sp = server_parser.lock();
202 if (sp)
203 {
204 try
205 {
206 sp->send_data(stream_id, data);
207 }
208 catch (const std::exception& e)
209 {
211 "Error streaming data on stream {}: {}", stream_id, e.what());
212 return false;
213 }
214 }
215 else
216 {
217 LOG_DEBUG_FMT("Stream {} is closed", stream_id);
218 return false;
219 }
220
221 return true;
222 }
223
// Registers `cb` with the server parser as the close callback for this
// responder's stream.
// Returns false if the parser no longer exists or if registration throws a
// std::exception; true otherwise.
226 {
// The parser is held weakly; lock() yields null once it has been destroyed
227 auto sp = server_parser.lock();
228 if (sp)
229 {
230 try
231 {
232 sp->set_on_stream_close_callback(stream_id, cb);
233 }
234 catch (const std::exception& e)
235 {
237 "Error setting close callback on stream {}: {}",
238 stream_id,
239 e.what());
240 return false;
241 }
242 }
243 else
244 {
245 LOG_DEBUG_FMT("Stream {} is closed", stream_id);
246 return false;
247 }
248 return true;
249 }
250 };
251
255 {
256 private:
257 std::shared_ptr<http2::ServerParser> server_parser;
258
259 std::shared_ptr<ccf::RPCMap> rpc_map;
260 std::shared_ptr<ccf::RpcHandler> handler;
261 ccf::ListenInterfaceID interface_id;
262
263 http::ResponderLookup& responder_lookup;
264
265 std::unordered_map<http2::StreamId, std::shared_ptr<HTTP2SessionContext>>
266 session_ctxs;
267
268 std::shared_ptr<HTTP2SessionContext> get_session_ctx(
269 http2::StreamId stream_id)
270 {
271 auto it = session_ctxs.find(stream_id);
272 if (it == session_ctxs.end())
273 {
274 it = session_ctxs.emplace_hint(
275 it,
276 std::make_pair(
277 stream_id,
278 std::make_shared<HTTP2SessionContext>(
279 session_id, tls_io->peer_cert(), interface_id, stream_id)));
280 }
281
282 return it->second;
283 }
284
285 std::shared_ptr<HTTPResponder> get_stream_responder(
286 http2::StreamId stream_id)
287 {
288 auto responder = responder_lookup.lookup_responder(session_id, stream_id);
289 if (responder == nullptr)
290 {
291 responder =
292 std::make_shared<HTTP2StreamResponder>(stream_id, server_parser);
293 responder_lookup.add_responder(session_id, stream_id, responder);
294 }
295
296 return responder;
297 }
298
299 void respond_with_error(
300 http2::StreamId stream_id, const ccf::ErrorDetails& error)
301 {
302 nlohmann::json body = ccf::ODataErrorResponse{
303 ccf::ODataError{std::move(error.code), std::move(error.msg)}};
304 const auto s = body.dump();
305
306 ccf::http::HeaderMap headers;
307 headers[ccf::http::headers::CONTENT_TYPE] =
308 ccf::http::headervalues::contenttype::JSON;
309
310 get_stream_responder(stream_id)->send_response(
311 error.status,
312 std::move(headers),
313 {},
314 {(const uint8_t*)s.data(), s.size()});
315 }
316
317 public:
// Constructor parameters, initialisers and body.
// Initialises the HTTP2Session base, creates the ServerParser from *this and
// `configuration`, and stores rpc_map, interface_id and a reference to the
// responder lookup.
319 std::shared_ptr<ccf::RPCMap> rpc_map,
320 int64_t session_id_,
321 const ccf::ListenInterfaceID& interface_id,
322 ringbuffer::AbstractWriterFactory& writer_factory,
323 std::unique_ptr<ccf::tls::Context> ctx,
324 const ccf::http::ParserConfiguration& configuration,
325 const std::shared_ptr<ErrorReporter>& error_reporter,
326 http::ResponderLookup& responder_lookup_) :
327 HTTP2Session(session_id_, writer_factory, std::move(ctx), error_reporter),
328 server_parser(
329 std::make_shared<http2::ServerParser>(*this, configuration)),
330 rpc_map(rpc_map),
331 interface_id(interface_id),
332 responder_lookup(responder_lookup_)
333 {
// Bytes the parser produces are written to the TLS session
334 server_parser->set_outgoing_data_handler(
335 [this](std::span<const uint8_t> data) {
336 this->tls_io->send_raw(data.data(), data.size());
337 });
338 }
339
// Destructor body: asks the responder lookup to clean up the responders
// registered under this session's ID.
341 {
342 responder_lookup.cleanup_responders(session_id);
343 }
344
// Feeds `data` to the server parser.
// Returns true only when execute() reports success. Every other path closes
// the TLS session and returns false.
345 bool parse(std::span<const uint8_t> data) override
346 {
347 try
348 {
349 if (!server_parser->execute(data.data(), data.size()))
350 {
351 // Close session gracefully
352 tls_io->close();
353 return false;
354 }
355 return true;
356 }
// Handler for an oversized request body: report it (if a reporter is set),
// respond on the offending stream with HTTP_STATUS_PAYLOAD_TOO_LARGE, then
// close the session.
358 {
359 if (error_reporter)
360 {
361 error_reporter->report_request_payload_too_large_error(interface_id);
362 }
363
364 LOG_DEBUG_FMT("Request is too large: {}", e.what());
365
367 HTTP_STATUS_PAYLOAD_TOO_LARGE,
368 ccf::errors::RequestBodyTooLarge,
369 e.what()};
370
371 respond_with_error(e.get_stream_id(), error);
372
373 tls_io->close();
374 }
// Handler for oversized request headers: same shape as above, responding
// with HTTP_STATUS_REQUEST_HEADER_FIELDS_TOO_LARGE.
376 {
377 if (error_reporter)
378 {
379 error_reporter->report_request_header_too_large_error(interface_id);
380 }
381
382 LOG_DEBUG_FMT("Request header is too large: {}", e.what());
383
385 HTTP_STATUS_REQUEST_HEADER_FIELDS_TOO_LARGE,
386 ccf::errors::RequestHeaderTooLarge,
387 e.what()};
388
389 respond_with_error(e.get_stream_id(), error);
390
391 tls_io->close();
392 }
// Any other std::exception: report a parsing error and close without
// sending a response.
393 catch (const std::exception& e)
394 {
395 if (error_reporter)
396 {
397 error_reporter->report_parsing_error(interface_id);
398 }
399
400 LOG_DEBUG_FMT("Error parsing HTTP request: {}", e.what());
401
402 // For generic parsing errors, as it is not trivial to construct a valid
403 // HTTP/2 response to send back to the default stream (0), the session
404 // is simply closed.
405
406 tls_io->close();
407 }
// Reached only from the exception handlers above
408 return false;
409 }
410
// Handles one parsed request on `stream_id`: builds an HttpRpcContext, looks
// up the RPC handler, processes the request, and sends the response unless
// the context marks it as pending.
// Any std::exception escaping the outer try is answered with an OData
// internal-error response, the TLS session is closed, and the exception is
// rethrown.
412 llhttp_method verb,
413 const std::string_view& url,
414 ccf::http::HeaderMap&& headers,
415 std::vector<uint8_t>&& body,
416 int32_t stream_id) override
417 {
419 "Processing msg({}, {} [{} bytes])",
420 llhttp_method_name(verb),
421 url,
422 body.size());
423
// Responder and session context are per stream and created on demand
424 auto responder = get_stream_responder(stream_id);
425 auto session_ctx = get_session_ctx(stream_id);
426
427 try
428 {
429 std::shared_ptr<http::HttpRpcContext> rpc_ctx = nullptr;
430 try
431 {
432 rpc_ctx = std::make_shared<HttpRpcContext>(
433 session_ctx,
435 verb,
436 url,
437 std::move(headers),
438 std::move(body),
439 responder);
440 }
441 catch (std::exception& e)
442 {
444 HTTP_STATUS_INTERNAL_SERVER_ERROR,
445 ccf::errors::InternalError,
446 fmt::format("Error constructing RpcContext: {}", e.what())});
447 }
// NOTE(review): no return is visible in the handler above, so unless its
// statement throws, control continues here with rpc_ctx still null and it is
// then passed to fetch_rpc_handler and dereferenced below - confirm and add
// an early return if so.
448 std::shared_ptr<ccf::RpcHandler> search =
449 http::fetch_rpc_handler(rpc_ctx, rpc_map);
450
451 search->process(rpc_ctx);
452
453 if (rpc_ctx->response_is_pending)
454 {
455 // If the RPC is pending, hold the connection.
456 LOG_TRACE_FMT("Pending");
457 return;
458 }
459 else
460 {
461 responder->send_response(
462 rpc_ctx->get_response_http_status(),
463 rpc_ctx->get_response_headers(),
464 rpc_ctx->get_response_trailers(),
465 std::move(rpc_ctx->get_response_body()));
466 }
467 }
468 catch (const std::exception& e)
469 {
470 responder->send_odata_error_response(ccf::ErrorDetails{
471 HTTP_STATUS_INTERNAL_SERVER_ERROR,
472 ccf::errors::InternalError,
473 fmt::format("Exception: {}", e.what())});
474
475 // On any exception, close the connection.
476 LOG_FAIL_FMT("Closing connection");
477 LOG_DEBUG_FMT("Closing connection due to exception: {}", e.what());
478 tls_io->close();
// Rethrow so the caller also sees the failure
479 throw;
480 }
481 }
482
// Session-level send_response: forwards to the responder for
// http2::DEFAULT_STREAM_ID and returns its result.
484 http_status status_code,
485 ccf::http::HeaderMap&& headers,
486 ccf::http::HeaderMap&& trailers,
487 std::span<const uint8_t> body) override
488 {
489 return get_stream_responder(http2::DEFAULT_STREAM_ID)
490 ->send_response(
491 status_code, std::move(headers), std::move(trailers), body);
492 }
493
// Session-level start_stream: forwards to the responder for
// http2::DEFAULT_STREAM_ID and returns its result.
495 http_status status, const ccf::http::HeaderMap& headers) override
496 {
497 return get_stream_responder(http2::DEFAULT_STREAM_ID)
498 ->start_stream(status, headers);
499 }
500
501 bool stream_data(std::span<const uint8_t> data) override
502 {
503 return get_stream_responder(http2::DEFAULT_STREAM_ID)->stream_data(data);
504 }
505
506 bool close_stream(ccf::http::HeaderMap&& trailers) override
507 {
508 return get_stream_responder(http2::DEFAULT_STREAM_ID)
509 ->close_stream(std::move(trailers));
510 }
511
// Session-level set_on_stream_close_callback: forwards `cb` to the responder
// for http2::DEFAULT_STREAM_ID and returns its result.
514 {
515 return get_stream_responder(http2::DEFAULT_STREAM_ID)
516 ->set_on_stream_close_callback(cb);
517 }
518 };
519
521 public ccf::ClientSession,
523 {
524 private:
525 http2::ClientParser client_parser;
526
527 public:
// Constructor parameters, initialisers and body.
// Initialises the HTTP2Session and ccf::ClientSession bases and constructs
// the client parser from *this.
529 int64_t session_id_,
530 ringbuffer::AbstractWriterFactory& writer_factory,
531 std::unique_ptr<ccf::tls::Context> ctx) :
532 HTTP2Session(session_id_, writer_factory, std::move(ctx)),
533 ccf::ClientSession(session_id_, writer_factory),
534 client_parser(*this)
535 {
// Bytes the parser produces are written to the TLS session
536 client_parser.set_outgoing_data_handler(
537 [this](std::span<const uint8_t> data) {
538 this->tls_io->send_raw(data.data(), data.size());
539 });
540 }
541
// Feeds `data` to the client parser.
// Returns true when execute() completes without throwing. On a
// std::exception the error and the offending fragment are logged, the TLS
// session is closed, and false is returned.
542 bool parse(std::span<const uint8_t> data) override
543 {
544 // Catch response parsing errors and log them
545 try
546 {
// NOTE(review): the value returned by execute() is not inspected here, so
// only a thrown exception is treated as failure - confirm this is intended.
547 client_parser.execute(data.data(), data.size());
548
549 return true;
550 }
551 catch (const std::exception& e)
552 {
553 LOG_FAIL_FMT("Error parsing HTTP2 response on session {}", session_id);
554 LOG_DEBUG_FMT("Error parsing HTTP2 response: {}", e.what());
556 "Error occurred while parsing fragment {} byte fragment:\n{}",
557 data.size(),
558 std::string_view((char const*)data.data(), data.size()));
559
560 tls_io->close();
561 }
// Reached only after an exception was handled above
562 return false;
563 }
564
565 void send_request(http::Request&& request) override
566 {
567 client_parser.send_structured_request(
568 request.get_method(),
569 request.get_path(),
570 request.get_headers(),
571 {request.get_content_data(), request.get_content_length()});
572 }
573
// Handles a response: passes status, headers and body to handle_data_cb,
// then closes the TLS session.
575 http_status status,
576 ccf::http::HeaderMap&& headers,
577 std::vector<uint8_t>&& body) override
578 {
579 handle_data_cb(status, std::move(headers), std::move(body));
580
581 LOG_TRACE_FMT("Closing connection, message handled");
582 tls_io->close();
583 }
584 };
585}
Definition client_session.h:11
HandleDataCallback handle_data_cb
Definition client_session.h:22
Definition tls_session.h:29
Definition session.h:15
Definition http_responder.h:16
bool send_odata_error_response(ccf::ErrorDetails &&error)
Definition http_responder.h:35
Definition http2_parser.h:500
void send_structured_request(llhttp_method method, const std::string &route, const ccf::http::HeaderMap &headers, std::span< const uint8_t > body)
Definition http2_parser.h:510
void set_outgoing_data_handler(DataHandlerCB &&cb)
Definition http2_parser.h:125
bool execute(const uint8_t *data, size_t size)
Definition http2_parser.h:195
Definition http2_session.h:523
void send_request(http::Request &&request) override
Definition http2_session.h:565
void handle_response(http_status status, ccf::http::HeaderMap &&headers, std::vector< uint8_t > &&body) override
Definition http2_session.h:574
HTTP2ClientSession(int64_t session_id_, ringbuffer::AbstractWriterFactory &writer_factory, std::unique_ptr< ccf::tls::Context > ctx)
Definition http2_session.h:528
bool parse(std::span< const uint8_t > data) override
Definition http2_session.h:542
Definition http2_session.h:255
bool set_on_stream_close_callback(ccf::http::StreamOnCloseCallback cb) override
Definition http2_session.h:512
~HTTP2ServerSession()
Definition http2_session.h:340
bool parse(std::span< const uint8_t > data) override
Definition http2_session.h:345
HTTP2ServerSession(std::shared_ptr< ccf::RPCMap > rpc_map, int64_t session_id_, const ccf::ListenInterfaceID &interface_id, ringbuffer::AbstractWriterFactory &writer_factory, std::unique_ptr< ccf::tls::Context > ctx, const ccf::http::ParserConfiguration &configuration, const std::shared_ptr< ErrorReporter > &error_reporter, http::ResponderLookup &responder_lookup_)
Definition http2_session.h:318
bool send_response(http_status status_code, ccf::http::HeaderMap &&headers, ccf::http::HeaderMap &&trailers, std::span< const uint8_t > body) override
Definition http2_session.h:483
void handle_request(llhttp_method verb, const std::string_view &url, ccf::http::HeaderMap &&headers, std::vector< uint8_t > &&body, int32_t stream_id) override
Definition http2_session.h:411
bool start_stream(http_status status, const ccf::http::HeaderMap &headers) override
Definition http2_session.h:494
bool stream_data(std::span< const uint8_t > data) override
Definition http2_session.h:501
bool close_stream(ccf::http::HeaderMap &&trailers) override
Definition http2_session.h:506
Definition http2_session.h:17
std::shared_ptr< ErrorReporter > error_reporter
Definition http2_session.h:20
virtual bool parse(std::span< const uint8_t > data)=0
::tcp::ConnID session_id
Definition http2_session.h:21
HTTP2Session(::tcp::ConnID session_id_, ringbuffer::AbstractWriterFactory &writer_factory, std::unique_ptr< ccf::tls::Context > ctx, const std::shared_ptr< ErrorReporter > &error_reporter=nullptr)
Definition http2_session.h:23
void handle_incoming_data_thread(std::vector< uint8_t > &&data) override
Definition http2_session.h:55
std::shared_ptr< ccf::TLSSession > tls_io
Definition http2_session.h:19
void send_data_thread(std::vector< uint8_t > &&data) override
Definition http2_session.h:45
void close_session() override
Definition http2_session.h:50
void send_data(std::span< const uint8_t > data) override
Definition http2_session.h:38
Definition http2_session.h:109
bool set_on_stream_close_callback(ccf::http::StreamOnCloseCallback cb) override
Definition http2_session.h:224
bool close_stream(ccf::http::HeaderMap &&trailers) override
Definition http2_session.h:176
bool send_response(http_status status_code, ccf::http::HeaderMap &&headers, ccf::http::HeaderMap &&trailers, std::span< const uint8_t > body) override
Definition http2_session.h:126
HTTP2StreamResponder(http2::StreamId stream_id_, const std::shared_ptr< http2::ServerParser > &server_parser_)
Definition http2_session.h:119
bool stream_data(std::span< const uint8_t > data) override
Definition http2_session.h:199
bool start_stream(http_status status, const ccf::http::HeaderMap &headers) override
Definition http2_session.h:152
Definition http_exceptions.h:40
Definition http_exceptions.h:31
Definition http_proc.h:20
http2::StreamId get_stream_id() const
Definition http_exceptions.h:24
Definition http_builder.h:106
Definition responder_lookup.h:14
void cleanup_responders(::tcp::ConnID session_id)
Definition responder_lookup.h:52
std::shared_ptr< ccf::http::HTTPResponder > lookup_responder(::tcp::ConnID session_id, http2::StreamId stream_id)
Definition responder_lookup.h:25
void add_responder(::tcp::ConnID session_id, http2::StreamId stream_id, std::shared_ptr< ccf::http::HTTPResponder > responder)
Definition responder_lookup.h:43
Definition http_proc.h:31
Definition ring_buffer_types.h:153
llhttp_status http_status
Definition http_status.h:7
#define LOG_TRACE_FMT
Definition logger.h:378
#define LOG_DEBUG_FMT
Definition logger.h:380
#define LOG_FAIL_FMT
Definition logger.h:396
std::map< std::string, std::string, std::less<> > HeaderMap
Definition http_header_map.h:10
std::function< void(void)> StreamOnCloseCallback
Definition http_responder.h:13
Definition app_interface.h:15
std::string ListenInterfaceID
Definition rpc_context.h:21
Definition http2_callbacks.h:12
int32_t StreamId
Definition http2_types.h:21
Definition error_reporter.h:6
std::vector< uint8_t > error(ccf::ErrorDetails &&error)
Definition http_rpc_context.h:14
STL namespace.
int64_t ConnID
Definition msg_types.h:9
Definition odata_error.h:56
Definition odata_error.h:48
Definition odata_error.h:37
Definition rpc_context.h:24
Definition http_configuration.h:24
Definition http2_session.h:95
HTTP2SessionContext(size_t client_session_id_, const std::vector< uint8_t > &caller_cert_, const std::optional< ccf::ListenInterfaceID > &interface_id_, http2::StreamId stream_id_)
Definition http2_session.h:98
int32_t stream_id
Definition http2_session.h:96