CCF
Loading...
Searching...
No Matches
http2_parser.h
Go to the documentation of this file.
1// Copyright (c) Microsoft Corporation. All rights reserved.
2// Licensed under the Apache 2.0 License.
3#pragma once
4
5#include "ccf/ds/logger.h"
6#include "ccf/ds/nonstd.h"
7#include "enclave/session.h"
8#include "http2_callbacks.h"
9#include "http2_types.h"
10#include "http_proc.h"
11#include "http_rpc_context.h"
12
13namespace http2
14{
15 using DataHandlerCB = std::function<void(std::span<const uint8_t>)>;
16
17 class Parser : public AbstractParser
18 {
19 private:
20 // Keep track of last peer stream id received on this session so that we can
21 // reject new streams ids less than this value.
22 StreamId last_stream_id = 0;
23 DataHandlerCB handle_outgoing_data;
25
26 protected:
27 std::map<StreamId, std::shared_ptr<StreamData>> streams;
28 nghttp2_session* session;
29
30 public:
32 const ccf::http::ParserConfiguration& configuration_,
33 bool is_client = false) :
34 configuration(configuration_)
35 {
36 LOG_TRACE_FMT("Creating HTTP2 parser");
37
38 nghttp2_session_callbacks* callbacks;
39 nghttp2_session_callbacks_new(&callbacks);
40 nghttp2_session_callbacks_set_on_stream_close_callback(
41 callbacks, on_stream_close_callback);
42 nghttp2_session_callbacks_set_data_source_read_length_callback(
43 callbacks, on_data_source_read_length_callback);
44 nghttp2_session_callbacks_set_error_callback2(
45 callbacks, on_error_callback);
46
47 nghttp2_session_callbacks_set_on_begin_frame_callback(
48 callbacks, on_begin_frame_recv_callback);
49 nghttp2_session_callbacks_set_on_frame_recv_callback(
50 callbacks, on_frame_recv_callback);
51 nghttp2_session_callbacks_set_on_begin_headers_callback(
52 callbacks, on_begin_headers_callback);
53 nghttp2_session_callbacks_set_on_header_callback(
54 callbacks, on_header_callback);
55 nghttp2_session_callbacks_set_on_data_chunk_recv_callback(
56 callbacks, on_data_callback);
57
58 if (is_client)
59 {
60 if (nghttp2_session_client_new(&session, callbacks, this) != 0)
61 {
62 throw std::logic_error("Could not create new HTTP/2 client session");
63 }
64 }
65 else
66 {
67 if (nghttp2_session_server_new(&session, callbacks, this) != 0)
68 {
69 throw std::logic_error("Could not create new HTTP/2 server session");
70 }
71 }
72
73 // Submit initial settings
74 std::vector<nghttp2_settings_entry> settings;
75 settings.push_back(
76 {NGHTTP2_SETTINGS_MAX_FRAME_SIZE,
77 static_cast<uint32_t>(configuration.max_frame_size.value_or(
78 ccf::http::default_max_frame_size))});
79 settings.push_back(
80 {NGHTTP2_SETTINGS_MAX_CONCURRENT_STREAMS,
81 static_cast<uint32_t>(
82 configuration.max_concurrent_streams_count.value_or(
83 ccf::http::default_max_concurrent_streams_count))});
84 settings.push_back(
85 {NGHTTP2_SETTINGS_INITIAL_WINDOW_SIZE,
86 static_cast<uint32_t>(configuration.initial_window_size.value_or(
87 ccf::http::default_initial_window_size))});
88 // NGHTTP2_SETTINGS_MAX_HEADER_LIST_SIZE is only a hint to client
89 // (https://www.rfc-editor.org/rfc/rfc7540#section-10.5.1)
90 settings.push_back(
91 {NGHTTP2_SETTINGS_MAX_HEADER_LIST_SIZE,
92 static_cast<uint32_t>(
93 configuration.max_headers_count.value_or(
94 ccf::http::default_max_headers_count) *
95 configuration.max_header_size.value_or(
96 ccf::http::default_max_header_size))});
97
98 auto rv = nghttp2_submit_settings(
99 session, NGHTTP2_FLAG_NONE, settings.data(), settings.size());
100 if (rv != 0)
101 {
102 throw std::logic_error(fmt::format(
103 "Error submitting settings for HTTP2 session: {}",
104 nghttp2_strerror(rv)));
105 }
106
107 nghttp2_session_callbacks_del(callbacks);
108 }
109
110 virtual ~Parser()
111 {
112 nghttp2_session_del(session);
113 }
114
116 {
117 return last_stream_id;
118 }
119
121 {
122 return configuration;
123 }
124
126 {
127 handle_outgoing_data = std::move(cb);
128 }
129
131 StreamId stream_id, const std::shared_ptr<StreamData>& stream_data)
132 {
133 auto it = streams.find(stream_id);
134 if (it != streams.end())
135 {
136 throw std::logic_error(fmt::format(
137 "Cannot store new stream {} as it already exists", stream_id));
138 }
139
140 streams.insert(it, {stream_id, stream_data});
141 last_stream_id = stream_id;
142 }
143
144 std::shared_ptr<StreamData> get_stream(StreamId stream_id) override
145 {
146 auto it = streams.find(stream_id);
147 if (it == streams.end())
148 {
149 return nullptr;
150 }
151 return it->second;
152 }
153
154 std::shared_ptr<StreamData> create_stream(StreamId stream_id) override
155 {
156 auto s = std::make_shared<StreamData>();
157 store_stream(stream_id, s);
158 LOG_TRACE_FMT("Created new stream {}", stream_id);
159 return s;
160 }
161
162 void destroy_stream(StreamId stream_id) override
163 {
164 auto it = streams.find(stream_id);
165 if (it != streams.end())
166 {
167 // Erase _before_ calling close callback as `destroy_stream()` may be
168 // called multiple times (once when the client closes the stream, and
169 // when the server sends the final trailers)
170 auto stream_data = it->second;
171 it = streams.erase(it);
172 LOG_TRACE_FMT("Deleted stream {}", stream_id);
173 if (
174 stream_data->close_callback != nullptr &&
175 stream_data->outgoing.state != http2::StreamResponseState::Closing)
176 {
177 // Close callback is supplied by app so handle eventual exceptions
178 try
179 {
180 stream_data->close_callback();
181 }
182 catch (const std::exception& e)
183 {
184 LOG_DEBUG_FMT("Error closing callback: {}", e.what());
185 }
186 }
187 LOG_TRACE_FMT("Successfully destroyed stream {}", stream_id);
188 }
189 else
190 {
191 LOG_DEBUG_FMT("Cannot destroy unknown stream {}", stream_id);
192 }
193 }
194
195 bool execute(const uint8_t* data, size_t size)
196 {
197 LOG_TRACE_FMT("http2::Parser execute: {}", size);
198 auto readlen = nghttp2_session_mem_recv(session, data, size);
199 if (readlen < 0)
200 {
201 throw std::logic_error(fmt::format(
202 "HTTP/2: Error receiving data: {}", nghttp2_strerror(readlen)));
203 }
204
206
207 // Detects whether server session expects any more data to read/write, and
208 // if not (e.g. goaway frame was handled), closes session gracefully
209 if (
210 nghttp2_session_want_read(session) == 0 &&
211 nghttp2_session_want_write(session) == 0)
212 {
213 LOG_TRACE_FMT("http2::Parser execute: Closing session gracefully");
214 return false;
215 }
216
217 return true;
218 }
219
221 {
222 ssize_t size = 0;
223 const uint8_t* data = nullptr;
224 while ((size = nghttp2_session_mem_send(session, &data)) != 0)
225 {
226 if (size > 0)
227 {
228 handle_outgoing_data({data, static_cast<size_t>(size)});
229 }
230 else
231 {
232 throw std::logic_error(fmt::format(
233 "HTTP/2: Error sending data: {}", nghttp2_strerror(size)));
234 }
235 }
236 }
237 };
238
239 class ServerParser : public Parser
240 {
241 private:
243
244 void submit_trailers(StreamId stream_id, ccf::http::HeaderMap&& trailers)
245 {
246 if (trailers.empty())
247 {
248 return;
249 }
250
251 LOG_TRACE_FMT("Submitting {} trailers", trailers.size());
252 std::vector<nghttp2_nv> trlrs;
253 trlrs.reserve(trailers.size());
254 for (auto& [k, v] : trailers)
255 {
256 trlrs.emplace_back(make_nv(k.data(), v.data()));
257 }
258
259 int rv =
260 nghttp2_submit_trailer(session, stream_id, trlrs.data(), trlrs.size());
261 if (rv != 0)
262 {
263 throw std::logic_error(fmt::format(
264 "nghttp2_submit_trailer error: {}", nghttp2_strerror(rv)));
265 }
266 }
267
268 void submit_response(
269 StreamId stream_id,
270 http_status status,
271 const ccf::http::HeaderMap& base_headers,
272 const ccf::http::HeaderMap& extra_headers = {})
273 {
274 std::vector<nghttp2_nv> hdrs = {};
275
276 auto status_str = fmt::format(
277 "{}", static_cast<std::underlying_type<http_status>::type>(status));
278 hdrs.emplace_back(
279 make_nv(ccf::http2::headers::STATUS, status_str.data()));
280
281 for (auto& [k, v] : base_headers)
282 {
283 hdrs.emplace_back(make_nv(k.data(), v.data()));
284 }
285
286 for (auto& [k, v] : extra_headers)
287 {
288 hdrs.emplace_back(make_nv(k.data(), v.data()));
289 }
290
291 nghttp2_data_provider prov;
292 prov.read_callback = read_outgoing_callback;
293
294 int rv = nghttp2_submit_response(
295 session, stream_id, hdrs.data(), hdrs.size(), &prov);
296 if (rv != 0)
297 {
298 throw std::logic_error(fmt::format(
299 "nghttp2_submit_response error: {}", nghttp2_strerror(rv)));
300 }
301 }
302
303 public:
306 const ccf::http::ParserConfiguration& configuration_) :
307 Parser(configuration_, false),
308 proc(proc_)
309 {}
310
312 {
314 "http2::set_on_stream_close_callback: stream {}", stream_id);
315
316 auto* stream_data = get_stream_data(session, stream_id);
317 if (stream_data == nullptr)
318 {
319 throw std::logic_error(
320 fmt::format("Stream {} no longer exists", stream_id));
321 }
322
323 stream_data->close_callback = cb;
324 }
325
327 StreamId stream_id,
328 http_status status,
329 const ccf::http::HeaderMap& headers,
330 ccf::http::HeaderMap&& trailers,
331 std::span<const uint8_t> body)
332 {
334 "http2::respond: stream {} - {} headers - {} trailers - {} bytes "
335 "body",
336 stream_id,
337 headers.size(),
338 trailers.size(),
339 body.size());
340
341 auto* stream_data = get_stream_data(session, stream_id);
342 if (stream_data == nullptr)
343 {
344 throw std::logic_error(
345 fmt::format("Stream {} no longer exists", stream_id));
346 }
347
348 bool should_submit_response =
349 stream_data->outgoing.state != StreamResponseState::Streaming;
350
351 stream_data->outgoing.state = StreamResponseState::Closing;
352
353 ccf::http::HeaderMap extra_headers = {};
354 extra_headers[ccf::http::headers::CONTENT_LENGTH] =
355 std::to_string(body.size());
356 auto thv = make_trailer_header_value(trailers);
357 if (thv.has_value())
358 {
359 extra_headers[ccf::http::headers::TRAILER] = thv.value();
360 }
361
362 stream_data->outgoing.body = DataSource(body);
363 stream_data->outgoing.has_trailers = !trailers.empty();
364
365 if (should_submit_response)
366 {
367 submit_response(stream_id, status, headers, extra_headers);
369 }
370
371 submit_trailers(stream_id, std::move(trailers));
373 }
374
376 StreamId stream_id,
377 http_status status,
378 const ccf::http::HeaderMap& headers)
379 {
381 "http2::start_stream: stream {} - {} headers",
382 stream_id,
383 headers.size());
384
385 auto* stream_data = get_stream_data(session, stream_id);
386 if (stream_data == nullptr)
387 {
388 throw std::logic_error(
389 fmt::format("Stream {} no longer exists", stream_id));
390 }
391
392 if (stream_data->outgoing.state != StreamResponseState::Uninitialised)
393 {
394 throw std::logic_error(fmt::format(
395 "Stream {} should be uninitialised to start stream", stream_id));
396 }
397
398 stream_data->outgoing.state = StreamResponseState::Streaming;
399
400 submit_response(stream_id, status, headers);
402 }
403
404 void send_data(StreamId stream_id, std::span<const uint8_t> data)
405 {
407 "http2::send_data: stream {} - {} bytes", stream_id, data.size());
408
409 auto* stream_data = get_stream_data(session, stream_id);
410 if (stream_data == nullptr)
411 {
412 throw std::logic_error(
413 fmt::format("Stream {} no longer exists", stream_id));
414 }
415
416 if (stream_data->outgoing.state != StreamResponseState::Streaming)
417 {
418 throw std::logic_error(
419 fmt::format("Stream {} should be streaming to send data", stream_id));
420 }
421
422 stream_data->outgoing.body = DataSource(data);
423
424 int rv = nghttp2_session_resume_data(session, stream_id);
425 if (rv < 0)
426 {
427 throw std::logic_error(fmt::format(
428 "nghttp2_session_resume_data error: {}", nghttp2_strerror(rv)));
429 }
430
432 }
433
434 void close_stream(StreamId stream_id, ccf::http::HeaderMap&& trailers)
435 {
437 "http2::close: stream {} - {} trailers ", stream_id, trailers.size());
438
439 auto* stream_data = get_stream_data(session, stream_id);
440 if (stream_data == nullptr)
441 {
442 throw std::logic_error(
443 fmt::format("Stream {} no longer exists", stream_id));
444 }
445
446 auto it = streams.find(stream_id);
447 if (it != streams.end())
448 {
449 stream_data->outgoing.state = StreamResponseState::Closing;
450 stream_data->outgoing.has_trailers = !trailers.empty();
451
452 submit_trailers(stream_id, std::move(trailers));
454 }
455 // else this stream was already closed by client, and we shouldn't send
456 // trailers
457 }
458
459 virtual void handle_completed(
460 StreamId stream_id, StreamData* stream_data) override
461 {
462 LOG_TRACE_FMT("http2::ServerParser: handle_completed");
463
464 if (stream_data == nullptr)
465 {
466 LOG_FAIL_FMT("No stream data to handle request");
467 return;
468 }
469
470 auto& headers = stream_data->incoming.headers;
471
472 std::string url = {};
473 {
474 const auto url_it = headers.find(ccf::http2::headers::PATH);
475 if (url_it != headers.end())
476 {
477 url = url_it->second;
478 }
479 }
480
481 llhttp_method method = {};
482 {
483 const auto method_it = headers.find(ccf::http2::headers::METHOD);
484 if (method_it != headers.end())
485 {
486 method = ccf::http_method_from_str(method_it->second.c_str());
487 }
488 }
489
490 proc.handle_request(
491 method,
492 url,
493 std::move(stream_data->incoming.headers),
494 std::move(stream_data->incoming.body),
495 stream_id);
496 }
497 };
498
499 class ClientParser : public Parser
500 {
501 private:
503
504 public:
506 Parser(ccf::http::ParserConfiguration{}, true),
507 proc(proc_)
508 {}
509
511 llhttp_method method,
512 const std::string& route,
513 const ccf::http::HeaderMap& headers,
514 std::span<const uint8_t> body)
515 {
516 std::vector<nghttp2_nv> hdrs;
517 hdrs.emplace_back(
518 make_nv(ccf::http2::headers::METHOD, llhttp_method_name(method)));
519 hdrs.emplace_back(make_nv(ccf::http2::headers::PATH, route.data()));
520 hdrs.emplace_back(make_nv(":scheme", "https"));
521 hdrs.emplace_back(make_nv(":authority", "localhost:8080"));
522 for (auto const& [k, v] : headers)
523 {
524 hdrs.emplace_back(make_nv(k.data(), v.data()));
525 }
526
527 auto stream_data = std::make_shared<StreamData>();
528 stream_data->outgoing.body = DataSource(body);
529
530 nghttp2_data_provider prov;
531 prov.read_callback = read_outgoing_callback;
532
533 stream_data->outgoing.state = StreamResponseState::Closing;
534
535 auto stream_id = nghttp2_submit_request(
536 session, nullptr, hdrs.data(), hdrs.size(), &prov, stream_data.get());
537 if (stream_id < 0)
538 {
540 "Could not submit HTTP request: {}", nghttp2_strerror(stream_id));
541 return;
542 }
543
544 store_stream(stream_id, stream_data);
545
547 LOG_DEBUG_FMT("Successfully sent request with stream id: {}", stream_id);
548 }
549
550 void handle_completed(StreamId stream_id, StreamData* stream_data) override
551 {
552 LOG_TRACE_FMT("http2::ClientParser: handle_completed");
553
554 if (stream_data == nullptr)
555 {
556 LOG_FAIL_FMT("No stream data to handle response");
557 return;
558 }
559
560 auto& headers = stream_data->incoming.headers;
561
562 http_status status = {};
563 {
564 const auto status_it = headers.find(ccf::http2::headers::STATUS);
565 if (status_it != headers.end())
566 {
567 status = http_status(std::stoi(status_it->second));
568 }
569 }
570
571 proc.handle_response(
572 status,
573 std::move(stream_data->incoming.headers),
574 std::move(stream_data->incoming.body));
575 }
576 };
577}
Definition http2_types.h:85
Definition http2_parser.h:500
ClientParser(::http::ResponseProcessor &proc_)
Definition http2_parser.h:505
void send_structured_request(llhttp_method method, const std::string &route, const ccf::http::HeaderMap &headers, std::span< const uint8_t > body)
Definition http2_parser.h:510
void handle_completed(StreamId stream_id, StreamData *stream_data) override
Definition http2_parser.h:550
Definition http2_types.h:36
Definition http2_parser.h:18
void set_outgoing_data_handler(DataHandlerCB &&cb)
Definition http2_parser.h:125
std::map< StreamId, std::shared_ptr< StreamData > > streams
Definition http2_parser.h:27
nghttp2_session * session
Definition http2_parser.h:28
StreamId get_last_stream_id() const override
Definition http2_parser.h:115
ccf::http::ParserConfiguration get_configuration() const override
Definition http2_parser.h:120
void send_all_submitted()
Definition http2_parser.h:220
void store_stream(StreamId stream_id, const std::shared_ptr< StreamData > &stream_data)
Definition http2_parser.h:130
bool execute(const uint8_t *data, size_t size)
Definition http2_parser.h:195
std::shared_ptr< StreamData > get_stream(StreamId stream_id) override
Definition http2_parser.h:144
std::shared_ptr< StreamData > create_stream(StreamId stream_id) override
Definition http2_parser.h:154
virtual ~Parser()
Definition http2_parser.h:110
void destroy_stream(StreamId stream_id) override
Definition http2_parser.h:162
Parser(const ccf::http::ParserConfiguration &configuration_, bool is_client=false)
Definition http2_parser.h:31
Definition http2_parser.h:240
void start_stream(StreamId stream_id, http_status status, const ccf::http::HeaderMap &headers)
Definition http2_parser.h:375
void send_data(StreamId stream_id, std::span< const uint8_t > data)
Definition http2_parser.h:404
void set_on_stream_close_callback(StreamId stream_id, StreamCloseCB cb)
Definition http2_parser.h:311
void respond(StreamId stream_id, http_status status, const ccf::http::HeaderMap &headers, ccf::http::HeaderMap &&trailers, std::span< const uint8_t > body)
Definition http2_parser.h:326
virtual void handle_completed(StreamId stream_id, StreamData *stream_data) override
Definition http2_parser.h:459
ServerParser(http::RequestProcessor &proc_, const ccf::http::ParserConfiguration &configuration_)
Definition http2_parser.h:304
void close_stream(StreamId stream_id, ccf::http::HeaderMap &&trailers)
Definition http2_parser.h:434
Definition http_proc.h:20
virtual void handle_request(llhttp_method method, const std::string_view &url, ccf::http::HeaderMap &&headers, std::vector< uint8_t > &&body, int32_t stream_id=http2::DEFAULT_STREAM_ID)=0
Definition http_proc.h:31
virtual void handle_response(http_status status, ccf::http::HeaderMap &&headers, std::vector< uint8_t > &&body)=0
llhttp_status http_status
Definition http_status.h:7
#define LOG_TRACE_FMT
Definition logger.h:378
#define LOG_DEBUG_FMT
Definition logger.h:380
#define LOG_FAIL_FMT
Definition logger.h:396
std::map< std::string, std::string, std::less<> > HeaderMap
Definition http_header_map.h:10
Definition app_interface.h:15
Definition http2_callbacks.h:12
int32_t StreamId
Definition http2_types.h:21
ccf::http::StreamOnCloseCallback StreamCloseCB
Definition http2_types.h:24
std::function< void(std::span< const uint8_t >)> DataHandlerCB
Definition http2_parser.h:15
Definition error_reporter.h:6
Definition http_configuration.h:24
std::optional< ccf::ds::SizeString > max_header_size
Definition http_configuration.h:26
std::optional< size_t > max_concurrent_streams_count
Definition http_configuration.h:30
std::optional< ccf::ds::SizeString > initial_window_size
Definition http_configuration.h:31
std::optional< uint32_t > max_headers_count
Definition http_configuration.h:27
std::optional< ccf::ds::SizeString > max_frame_size
Definition http_configuration.h:34
std::vector< uint8_t > body
Definition http2_types.h:69
ccf::http::HeaderMap headers
Definition http2_types.h:68
Definition http2_types.h:65
Incoming incoming
Definition http2_types.h:71