CCF
Loading...
Searching...
No Matches
http2_parser.h
Go to the documentation of this file.
1// Copyright (c) Microsoft Corporation. All rights reserved.
2// Licensed under the Apache 2.0 License.
3#pragma once
4
5#include "ccf/ds/nonstd.h"
7#include "enclave/session.h"
8#include "http2_callbacks.h"
9#include "http2_types.h"
10#include "http_proc.h"
11#include "http_rpc_context.h"
12
13namespace http2
14{
15 using DataHandlerCB = std::function<void(std::span<const uint8_t>)>;
16
17 class Parser : public AbstractParser
18 {
19 private:
20 // Keep track of last peer stream id received on this session so that we can
21 // reject new streams ids less than this value.
22 StreamId last_stream_id = 0;
23 DataHandlerCB handle_outgoing_data;
25
26 protected:
27 std::map<StreamId, std::shared_ptr<StreamData>> streams;
28 nghttp2_session* session = nullptr;
29
30 public:
32 ccf::http::ParserConfiguration configuration_, bool is_client = false) :
33 configuration(std::move(configuration_))
34 {
35 LOG_TRACE_FMT("Creating HTTP2 parser");
36
37 nghttp2_session_callbacks* callbacks = nullptr;
38 nghttp2_session_callbacks_new(&callbacks);
39 nghttp2_session_callbacks_set_on_stream_close_callback(
40 callbacks, on_stream_close_callback);
41 nghttp2_session_callbacks_set_data_source_read_length_callback(
42 callbacks, on_data_source_read_length_callback);
43 nghttp2_session_callbacks_set_error_callback2(
44 callbacks, on_error_callback);
45
46 nghttp2_session_callbacks_set_on_begin_frame_callback(
47 callbacks, on_begin_frame_recv_callback);
48 nghttp2_session_callbacks_set_on_frame_recv_callback(
49 callbacks, on_frame_recv_callback);
50 nghttp2_session_callbacks_set_on_begin_headers_callback(
51 callbacks, on_begin_headers_callback);
52 nghttp2_session_callbacks_set_on_header_callback(
53 callbacks, on_header_callback);
54 nghttp2_session_callbacks_set_on_data_chunk_recv_callback(
55 callbacks, on_data_callback);
56
57 if (is_client)
58 {
59 if (nghttp2_session_client_new(&session, callbacks, this) != 0)
60 {
61 throw std::logic_error("Could not create new HTTP/2 client session");
62 }
63 }
64 else
65 {
66 if (nghttp2_session_server_new(&session, callbacks, this) != 0)
67 {
68 throw std::logic_error("Could not create new HTTP/2 server session");
69 }
70 }
71
72 // Submit initial settings
73 std::vector<nghttp2_settings_entry> settings;
74 settings.push_back(
75 {NGHTTP2_SETTINGS_MAX_FRAME_SIZE,
76 static_cast<uint32_t>(configuration.max_frame_size.value_or(
77 ccf::http::default_max_frame_size))});
78 settings.push_back(
79 {NGHTTP2_SETTINGS_MAX_CONCURRENT_STREAMS,
80 static_cast<uint32_t>(
81 configuration.max_concurrent_streams_count.value_or(
82 ccf::http::default_max_concurrent_streams_count))});
83 settings.push_back(
84 {NGHTTP2_SETTINGS_INITIAL_WINDOW_SIZE,
85 static_cast<uint32_t>(configuration.initial_window_size.value_or(
86 ccf::http::default_initial_window_size))});
87 // NGHTTP2_SETTINGS_MAX_HEADER_LIST_SIZE is only a hint to client
88 // (https://www.rfc-editor.org/rfc/rfc7540#section-10.5.1)
89 settings.push_back(
90 {NGHTTP2_SETTINGS_MAX_HEADER_LIST_SIZE,
91 static_cast<uint32_t>(
92 configuration.max_headers_count.value_or(
93 ccf::http::default_max_headers_count) *
94 configuration.max_header_size.value_or(
95 ccf::http::default_max_header_size))});
96
97 auto rv = nghttp2_submit_settings(
98 session, NGHTTP2_FLAG_NONE, settings.data(), settings.size());
99 if (rv != 0)
100 {
101 throw std::logic_error(fmt::format(
102 "Error submitting settings for HTTP2 session: {}",
103 nghttp2_strerror(rv)));
104 }
105
106 nghttp2_session_callbacks_del(callbacks);
107 }
108
109 ~Parser() override
110 {
111 nghttp2_session_del(session);
112 }
113
114 [[nodiscard]] StreamId get_last_stream_id() const override
115 {
116 return last_stream_id;
117 }
118
120 const override
121 {
122 return configuration;
123 }
124
126 {
127 handle_outgoing_data = std::move(cb);
128 }
129
131 StreamId stream_id, const std::shared_ptr<StreamData>& stream_data)
132 {
133 auto it = streams.find(stream_id);
134 if (it != streams.end())
135 {
136 throw std::logic_error(fmt::format(
137 "Cannot store new stream {} as it already exists", stream_id));
138 }
139
140 streams.insert(it, {stream_id, stream_data});
141 last_stream_id = stream_id;
142 }
143
144 std::shared_ptr<StreamData> get_stream(StreamId stream_id) override
145 {
146 auto it = streams.find(stream_id);
147 if (it == streams.end())
148 {
149 return nullptr;
150 }
151 return it->second;
152 }
153
154 std::shared_ptr<StreamData> create_stream(StreamId stream_id) override
155 {
156 auto s = std::make_shared<StreamData>();
157 store_stream(stream_id, s);
158 LOG_TRACE_FMT("Created new stream {}", stream_id);
159 return s;
160 }
161
162 void destroy_stream(StreamId stream_id) override
163 {
164 auto it = streams.find(stream_id);
165 if (it != streams.end())
166 {
167 // Erase _before_ calling close callback as `destroy_stream()` may be
168 // called multiple times (once when the client closes the stream, and
169 // when the server sends the final trailers)
170 auto stream_data = it->second;
171 it = streams.erase(it);
172 LOG_TRACE_FMT("Deleted stream {}", stream_id);
173 if (
174 stream_data->close_callback != nullptr &&
175 stream_data->outgoing.state != http2::StreamResponseState::Closing)
176 {
177 // Close callback is supplied by app so handle eventual exceptions
178 try
179 {
180 stream_data->close_callback();
181 }
182 catch (const std::exception& e)
183 {
184 LOG_DEBUG_FMT("Error closing callback: {}", e.what());
185 }
186 }
187 LOG_TRACE_FMT("Successfully destroyed stream {}", stream_id);
188 }
189 else
190 {
191 LOG_DEBUG_FMT("Cannot destroy unknown stream {}", stream_id);
192 }
193 }
194
195 bool execute(const uint8_t* data, size_t size)
196 {
197 LOG_TRACE_FMT("http2::Parser execute: {}", size);
198 auto readlen = nghttp2_session_mem_recv(session, data, size);
199 if (readlen < 0)
200 {
201 throw std::logic_error(fmt::format(
202 "HTTP/2: Error receiving data: {}", nghttp2_strerror(readlen)));
203 }
204
206
207 // Detects whether server session expects any more data to read/write, and
208 // if not (e.g. goaway frame was handled), closes session gracefully
209 if (
210 nghttp2_session_want_read(session) == 0 &&
211 nghttp2_session_want_write(session) == 0)
212 {
213 LOG_TRACE_FMT("http2::Parser execute: Closing session gracefully");
214 return false;
215 }
216
217 return true;
218 }
219
221 {
222 ssize_t size = 0;
223 const uint8_t* data = nullptr;
224 while ((size = nghttp2_session_mem_send(session, &data)) != 0)
225 {
226 if (size > 0)
227 {
228 handle_outgoing_data({data, static_cast<size_t>(size)});
229 }
230 else
231 {
232 throw std::logic_error(fmt::format(
233 "HTTP/2: Error sending data: {}", nghttp2_strerror(size)));
234 }
235 }
236 }
237 };
238
239 class ServerParser : public Parser
240 {
241 private:
243
244 void submit_trailers(StreamId stream_id, ccf::http::HeaderMap&& trailers)
245 {
246 if (trailers.empty())
247 {
248 return;
249 }
250
251 LOG_TRACE_FMT("Submitting {} trailers", trailers.size());
252 std::vector<nghttp2_nv> trlrs;
253 trlrs.reserve(trailers.size());
254 for (auto& [k, v] : trailers)
255 {
256 trlrs.emplace_back(make_nv(k.data(), v.data()));
257 }
258
259 int rv =
260 nghttp2_submit_trailer(session, stream_id, trlrs.data(), trlrs.size());
261 if (rv != 0)
262 {
263 throw std::logic_error(fmt::format(
264 "nghttp2_submit_trailer error: {}", nghttp2_strerror(rv)));
265 }
266 }
267
268 void submit_response(
269 StreamId stream_id,
270 ccf::http_status status,
271 ccf::http::HeaderMap&& base_headers,
272 const ccf::http::HeaderMap& extra_headers = {})
273 {
274 std::vector<nghttp2_nv> hdrs = {};
275
276 auto status_str = fmt::format(
277 "{}", static_cast<std::underlying_type_t<ccf::http_status>>(status));
278 hdrs.emplace_back(
279 make_nv(ccf::http2::headers::STATUS, status_str.data()));
280
281 for (const auto& [k, v] : base_headers)
282 {
283 hdrs.emplace_back(make_nv(k.data(), v.data()));
284 }
285
286 for (const auto& [k, v] : extra_headers)
287 {
288 hdrs.emplace_back(make_nv(k.data(), v.data()));
289 }
290
291 nghttp2_data_provider prov;
292 prov.read_callback = read_outgoing_callback;
293
294 int rv = nghttp2_submit_response(
295 session, stream_id, hdrs.data(), hdrs.size(), &prov);
296 if (rv != 0)
297 {
298 throw std::logic_error(fmt::format(
299 "nghttp2_submit_response error: {}", nghttp2_strerror(rv)));
300 }
301 }
302
303 public:
306 const ccf::http::ParserConfiguration& configuration_) :
307 Parser(configuration_, false),
308 proc(proc_)
309 {}
310
312 {
314 "http2::set_on_stream_close_callback: stream {}", stream_id);
315
316 auto* stream_data = get_stream_data(session, stream_id);
317 if (stream_data == nullptr)
318 {
319 throw std::logic_error(
320 fmt::format("Stream {} no longer exists", stream_id));
321 }
322
323 stream_data->close_callback = cb;
324 }
325
327 StreamId stream_id,
328 ccf::http_status status,
329 ccf::http::HeaderMap&& headers,
330 ccf::http::HeaderMap&& trailers,
331 std::vector<uint8_t>&& body)
332 {
334 "http2::respond: stream {} - {} headers - {} trailers - {} bytes "
335 "body",
336 stream_id,
337 headers.size(),
338 trailers.size(),
339 body.size());
340
341 auto* stream_data = get_stream_data(session, stream_id);
342 if (stream_data == nullptr)
343 {
344 throw std::logic_error(
345 fmt::format("Stream {} no longer exists", stream_id));
346 }
347
348 bool should_submit_response =
349 stream_data->outgoing.state != StreamResponseState::Streaming;
350
351 stream_data->outgoing.state = StreamResponseState::Closing;
352
353 ccf::http::HeaderMap extra_headers = {};
354 extra_headers[ccf::http::headers::CONTENT_LENGTH] =
355 std::to_string(body.size());
356 auto thv = make_trailer_header_value(trailers);
357 if (thv.has_value())
358 {
359 extra_headers[ccf::http::headers::TRAILER] = thv.value();
360 }
361
362 stream_data->outgoing.body = DataSource(std::move(body));
363 stream_data->outgoing.has_trailers = !trailers.empty();
364
365 if (should_submit_response)
366 {
367 submit_response(stream_id, status, std::move(headers), extra_headers);
369 }
370
371 submit_trailers(stream_id, std::move(trailers));
373 }
374
376 StreamId stream_id,
377 ccf::http_status status,
378 ccf::http::HeaderMap&& headers)
379 {
381 "http2::start_stream: stream {} - {} headers",
382 stream_id,
383 headers.size());
384
385 auto* stream_data = get_stream_data(session, stream_id);
386 if (stream_data == nullptr)
387 {
388 throw std::logic_error(
389 fmt::format("Stream {} no longer exists", stream_id));
390 }
391
392 if (stream_data->outgoing.state != StreamResponseState::Uninitialised)
393 {
394 throw std::logic_error(fmt::format(
395 "Stream {} should be uninitialised to start stream", stream_id));
396 }
397
398 stream_data->outgoing.state = StreamResponseState::Streaming;
399
400 submit_response(stream_id, status, std::move(headers));
402 }
403
404 void send_data(StreamId stream_id, std::vector<uint8_t>&& data)
405 {
407 "http2::send_data: stream {} - {} bytes", stream_id, data.size());
408
409 auto* stream_data = get_stream_data(session, stream_id);
410 if (stream_data == nullptr)
411 {
412 throw std::logic_error(
413 fmt::format("Stream {} no longer exists", stream_id));
414 }
415
416 if (stream_data->outgoing.state != StreamResponseState::Streaming)
417 {
418 throw std::logic_error(
419 fmt::format("Stream {} should be streaming to send data", stream_id));
420 }
421
422 stream_data->outgoing.body = DataSource(std::move(data));
423
424 int rv = nghttp2_session_resume_data(session, stream_id);
425 if (rv < 0)
426 {
427 throw std::logic_error(fmt::format(
428 "nghttp2_session_resume_data error: {}", nghttp2_strerror(rv)));
429 }
430
432 }
433
434 void close_stream(StreamId stream_id, ccf::http::HeaderMap&& trailers)
435 {
437 "http2::close: stream {} - {} trailers ", stream_id, trailers.size());
438
439 auto* stream_data = get_stream_data(session, stream_id);
440 if (stream_data == nullptr)
441 {
442 throw std::logic_error(
443 fmt::format("Stream {} no longer exists", stream_id));
444 }
445
446 auto it = streams.find(stream_id);
447 if (it != streams.end())
448 {
449 stream_data->outgoing.state = StreamResponseState::Closing;
450 stream_data->outgoing.has_trailers = !trailers.empty();
451
452 submit_trailers(stream_id, std::move(trailers));
454 }
455 // else this stream was already closed by client, and we shouldn't send
456 // trailers
457 }
458
459 void handle_completed(StreamId stream_id, StreamData* stream_data) override
460 {
461 LOG_TRACE_FMT("http2::ServerParser: handle_completed");
462
463 if (stream_data == nullptr)
464 {
465 LOG_FAIL_FMT("No stream data to handle request");
466 return;
467 }
468
469 auto& headers = stream_data->incoming.headers;
470
471 std::string url = {};
472 {
473 const auto url_it = headers.find(ccf::http2::headers::PATH);
474 if (url_it != headers.end())
475 {
476 url = url_it->second;
477 }
478 }
479
480 llhttp_method method = {};
481 {
482 const auto method_it = headers.find(ccf::http2::headers::METHOD);
483 if (method_it != headers.end())
484 {
485 method = ccf::http_method_from_str(method_it->second);
486 }
487 }
488
489 proc.handle_request(
490 method,
491 url,
492 std::move(stream_data->incoming.headers),
493 std::move(stream_data->incoming.body),
494 stream_id);
495 }
496 };
497
498 class ClientParser : public Parser
499 {
500 private:
502
503 public:
505 Parser(ccf::http::ParserConfiguration{}, true),
506 proc(proc_)
507 {}
508
510 llhttp_method method,
511 const std::string& route,
512 const ccf::http::HeaderMap& headers,
513 std::vector<uint8_t>&& body)
514 {
515 std::vector<nghttp2_nv> hdrs;
516 hdrs.emplace_back(
517 make_nv(ccf::http2::headers::METHOD, llhttp_method_name(method)));
518 hdrs.emplace_back(make_nv(ccf::http2::headers::PATH, route.data()));
519 hdrs.emplace_back(make_nv(":scheme", "https"));
520 hdrs.emplace_back(make_nv(":authority", "localhost:8080"));
521 for (auto const& [k, v] : headers)
522 {
523 hdrs.emplace_back(make_nv(k.data(), v.data()));
524 }
525
526 auto stream_data = std::make_shared<StreamData>();
527 stream_data->outgoing.body = DataSource(std::move(body));
528
529 nghttp2_data_provider prov;
530 prov.read_callback = read_outgoing_callback;
531
532 stream_data->outgoing.state = StreamResponseState::Closing;
533
534 auto stream_id = nghttp2_submit_request(
535 session, nullptr, hdrs.data(), hdrs.size(), &prov, stream_data.get());
536 if (stream_id < 0)
537 {
539 "Could not submit HTTP request: {}", nghttp2_strerror(stream_id));
540 return;
541 }
542
543 store_stream(stream_id, stream_data);
544
546 LOG_DEBUG_FMT("Successfully sent request with stream id: {}", stream_id);
547 }
548
550 StreamId /*stream_id*/, StreamData* stream_data) override
551 {
552 LOG_TRACE_FMT("http2::ClientParser: handle_completed");
553
554 if (stream_data == nullptr)
555 {
556 LOG_FAIL_FMT("No stream data to handle response");
557 return;
558 }
559
560 auto& headers = stream_data->incoming.headers;
561
562 ccf::http_status status = {};
563 {
564 const auto status_it = headers.find(ccf::http2::headers::STATUS);
565 if (status_it != headers.end())
566 {
567 status = ccf::http_status(std::stoi(status_it->second));
568 }
569 }
570
571 proc.handle_response(
572 status,
573 std::move(stream_data->incoming.headers),
574 std::move(stream_data->incoming.body));
575 }
576 };
577}
Definition http2_types.h:67
Definition http2_parser.h:499
ClientParser(::http::ResponseProcessor &proc_)
Definition http2_parser.h:504
void send_structured_request(llhttp_method method, const std::string &route, const ccf::http::HeaderMap &headers, std::vector< uint8_t > &&body)
Definition http2_parser.h:509
void handle_completed(StreamId, StreamData *stream_data) override
Definition http2_parser.h:549
Definition http2_parser.h:18
void set_outgoing_data_handler(DataHandlerCB &&cb)
Definition http2_parser.h:125
std::map< StreamId, std::shared_ptr< StreamData > > streams
Definition http2_parser.h:27
nghttp2_session * session
Definition http2_parser.h:28
StreamId get_last_stream_id() const override
Definition http2_parser.h:114
ccf::http::ParserConfiguration get_configuration() const override
Definition http2_parser.h:119
~Parser() override
Definition http2_parser.h:109
void send_all_submitted()
Definition http2_parser.h:220
void store_stream(StreamId stream_id, const std::shared_ptr< StreamData > &stream_data)
Definition http2_parser.h:130
bool execute(const uint8_t *data, size_t size)
Definition http2_parser.h:195
std::shared_ptr< StreamData > get_stream(StreamId stream_id) override
Definition http2_parser.h:144
std::shared_ptr< StreamData > create_stream(StreamId stream_id) override
Definition http2_parser.h:154
void destroy_stream(StreamId stream_id) override
Definition http2_parser.h:162
Parser(ccf::http::ParserConfiguration configuration_, bool is_client=false)
Definition http2_parser.h:31
Definition http2_parser.h:240
void start_stream(StreamId stream_id, ccf::http_status status, ccf::http::HeaderMap &&headers)
Definition http2_parser.h:375
void send_data(StreamId stream_id, std::vector< uint8_t > &&data)
Definition http2_parser.h:404
void set_on_stream_close_callback(StreamId stream_id, StreamCloseCB cb)
Definition http2_parser.h:311
ServerParser(http::RequestProcessor &proc_, const ccf::http::ParserConfiguration &configuration_)
Definition http2_parser.h:304
void handle_completed(StreamId stream_id, StreamData *stream_data) override
Definition http2_parser.h:459
void respond(StreamId stream_id, ccf::http_status status, ccf::http::HeaderMap &&headers, ccf::http::HeaderMap &&trailers, std::vector< uint8_t > &&body)
Definition http2_parser.h:326
void close_stream(StreamId stream_id, ccf::http::HeaderMap &&trailers)
Definition http2_parser.h:434
Definition http_proc.h:20
virtual void handle_request(llhttp_method method, const std::string_view &url, ccf::http::HeaderMap &&headers, std::vector< uint8_t > &&body, int32_t stream_id=http2::DEFAULT_STREAM_ID)=0
Definition http_proc.h:33
virtual void handle_response(ccf::http_status status, ccf::http::HeaderMap &&headers, std::vector< uint8_t > &&body)=0
#define LOG_TRACE_FMT
Definition internal_logger.h:13
#define LOG_DEBUG_FMT
Definition internal_logger.h:14
#define LOG_FAIL_FMT
Definition internal_logger.h:16
std::map< std::string, std::string, std::less<> > HeaderMap
Definition http_header_map.h:10
Definition app_interface.h:14
llhttp_status http_status
Definition http_status.h:9
Definition http2_callbacks.h:12
int32_t StreamId
Definition http2_types.h:20
std::function< void(void)> StreamCloseCB
Definition http2_types.h:23
std::function< void(std::span< const uint8_t >)> DataHandlerCB
Definition http2_parser.h:15
Definition error_reporter.h:6
STL namespace.
Definition http_configuration.h:24
std::optional< ccf::ds::SizeString > max_header_size
Definition http_configuration.h:26
std::optional< size_t > max_concurrent_streams_count
Definition http_configuration.h:30
std::optional< ccf::ds::SizeString > initial_window_size
Definition http_configuration.h:31
std::optional< uint32_t > max_headers_count
Definition http_configuration.h:27
std::optional< ccf::ds::SizeString > max_frame_size
Definition http_configuration.h:34
Definition http2_types.h:35
std::vector< uint8_t > body
Definition http2_types.h:51
ccf::http::HeaderMap headers
Definition http2_types.h:50
Definition http2_types.h:47
Incoming incoming
Definition http2_types.h:53