CCF
Loading...
Searching...
No Matches
http2_session.h
Go to the documentation of this file.
1// Copyright (c) Microsoft Corporation. All rights reserved.
2// Licensed under the Apache 2.0 License.
3#pragma once
4
8#include "enclave/rpc_map.h"
9#include "error_reporter.h"
10#include "http/http2_types.h"
11#include "http2_parser.h"
12#include "http_rpc_context.h"
13
14namespace http
15{
17
// Session context that extends ccf::SessionContext with an HTTP/2 stream id.
// NOTE(review): the line declaring this type and the line naming its
// constructor are missing from this listing; the index at the end of the page
// names them HTTP2SessionContext.
19 {
20 int32_t stream_id;
21
// Constructor arguments: the first three are passed unchanged to the
// ccf::SessionContext base, stream_id_ is stored in the stream_id member.
23 size_t client_session_id_,
24 const std::vector<uint8_t>& caller_cert_,
25 const std::optional<ccf::ListenInterfaceID>& interface_id_,
26 http2::StreamId stream_id_) :
27 ccf::SessionContext(client_session_id_, caller_cert_, interface_id_),
28 stream_id(stream_id_)
29 {}
30 };
31
33 {
34 private:
35 http2::StreamId stream_id;
36
37 // Associated HTTP2ServerSession may be closed while responder is held
38 // elsewhere (e.g. async streaming) so keep a weak pointer to parser and
39 // report an error to caller to discard responder.
40 std::weak_ptr<http2::ServerParser> server_parser;
41
42 public:
44
// Constructor (its name line is missing from this listing): records the
// stream id and stores the parser in the std::weak_ptr member, so the
// responder itself holds no owning reference to the parser.
46 http2::StreamId stream_id_,
47 const std::shared_ptr<http2::ServerParser>& server_parser_) :
48 stream_id(stream_id_),
49 server_parser(server_parser_)
50 {}
51
53 ccf::http_status status_code,
54 ccf::http::HeaderMap&& headers,
55 ccf::http::HeaderMap&& trailers,
56 std::vector<uint8_t>&& body) override
57 {
58 auto sp = server_parser.lock();
59 try
60 {
61 sp->respond(
62 stream_id,
63 status_code,
64 std::move(headers),
65 std::move(trailers),
66 std::move(body));
67 }
68 catch (const std::exception& e)
69 {
71 "Error sending response on stream {}: {}", stream_id, e.what());
72 return false;
73 }
74
75 return true;
76 }
77
79 {
80 auto sp = server_parser.lock();
81 if (sp)
82 {
83 try
84 {
85 sp->start_stream(stream_id, status, std::move(headers));
86 }
87 catch (const std::exception& e)
88 {
89 LOG_DEBUG_FMT("Error sending headers {}: {}", stream_id, e.what());
90 return false;
91 }
92 }
93 else
94 {
95 LOG_DEBUG_FMT("Stream {} is closed", stream_id);
96 return false;
97 }
98 return true;
99 }
100
102 {
103 auto sp = server_parser.lock();
104 if (sp)
105 {
106 try
107 {
108 sp->close_stream(stream_id, std::move(trailers));
109 }
110 catch (const std::exception& e)
111 {
112 LOG_DEBUG_FMT("Error closing stream {}: {}", stream_id, e.what());
113 return false;
114 }
115 }
116 else
117 {
118 LOG_DEBUG_FMT("Stream {} is closed", stream_id);
119 return false;
120 }
121 return true;
122 }
123
124 bool stream_data(std::vector<uint8_t>&& data)
125 {
126 auto sp = server_parser.lock();
127 if (sp)
128 {
129 try
130 {
131 sp->send_data(stream_id, std::move(data));
132 }
133 catch (const std::exception& e)
134 {
136 "Error streaming data on stream {}: {}", stream_id, e.what());
137 return false;
138 }
139 }
140 else
141 {
142 LOG_DEBUG_FMT("Stream {} is closed", stream_id);
143 return false;
144 }
145
146 return true;
147 }
148
150 {
151 auto sp = server_parser.lock();
152 if (sp)
153 {
154 try
155 {
156 sp->set_on_stream_close_callback(stream_id, cb);
157 }
158 catch (const std::exception& e)
159 {
161 "Error setting close callback on stream {}: {}",
162 stream_id,
163 e.what());
164 return false;
165 }
166 }
167 else
168 {
169 LOG_DEBUG_FMT("Stream {} is closed", stream_id);
170 return false;
171 }
172 return true;
173 }
174 };
175
179 {
180 private:
181 std::shared_ptr<http2::ServerParser> server_parser;
182
183 std::shared_ptr<ccf::RPCMap> rpc_map;
184 std::shared_ptr<ccf::RpcHandler> handler;
185 std::shared_ptr<ErrorReporter> error_reporter;
186 ccf::ListenInterfaceID interface_id;
187
188 std::unordered_map<http2::StreamId, std::shared_ptr<HTTP2StreamResponder>>
189 responders_by_stream;
190
191 std::unordered_map<http2::StreamId, std::shared_ptr<HTTP2SessionContext>>
192 session_ctxs;
193
194 std::shared_ptr<HTTP2SessionContext> get_session_ctx(
195 http2::StreamId stream_id)
196 {
197 auto it = session_ctxs.find(stream_id);
198 if (it == session_ctxs.end())
199 {
200 it = session_ctxs.emplace_hint(
201 it,
202 stream_id,
203 std::make_shared<HTTP2SessionContext>(
204 session_id, tls_io->peer_cert(), interface_id, stream_id));
205 }
206
207 return it->second;
208 }
209
210 std::shared_ptr<HTTP2StreamResponder> get_stream_responder(
211 http2::StreamId stream_id)
212 {
213 auto it = responders_by_stream.find(stream_id);
214 if (it == responders_by_stream.end())
215 {
216 auto responder =
217 std::make_shared<HTTP2StreamResponder>(stream_id, server_parser);
218 it = responders_by_stream.emplace_hint(it, stream_id, responder);
219 }
220
221 return it->second;
222 }
223
// Sends an error response on the given stream: a JSON document is dumped to
// bytes and sent with error.status, a JSON Content-Type header and no
// trailers, through the responder for stream_id. The result of
// send_response is not checked.
// NOTE(review): the expression that initialises `body` (listing line 228) is
// missing from this extract, so how the JSON is built from `error` cannot be
// seen here.
224 void respond_with_error(
225 http2::StreamId stream_id, const ccf::ErrorDetails& error)
226 {
227 nlohmann::json body =
229 const std::string s = body.dump();
230 std::vector<uint8_t> v(s.begin(), s.end());
231
232 ccf::http::HeaderMap headers;
233 headers[ccf::http::headers::CONTENT_TYPE] =
234 ccf::http::headervalues::contenttype::JSON;
235
236 get_stream_responder(stream_id)->send_response(
237 error.status, std::move(headers), {}, std::move(v));
238 }
239
240 public:
// Constructor (its name line is missing from this listing; the index at the
// end of the page names it HTTP2ServerSession). Initialises the HTTP2Session
// base, creates the server parser bound to *this with the given
// configuration, and stores the RPC map, error reporter and interface id.
// The `handler` member is not mentioned in the initialiser list.
242 std::shared_ptr<ccf::RPCMap> rpc_map_,
243 int64_t session_id_,
244 ccf::ListenInterfaceID interface_id_,
245 ringbuffer::AbstractWriterFactory& writer_factory,
246 std::unique_ptr<ccf::tls::Context> ctx,
247 const ccf::http::ParserConfiguration& configuration,
248 const std::shared_ptr<ErrorReporter>& error_reporter_) :
249 HTTP2Session(session_id_, writer_factory, std::move(ctx)),
250 server_parser(
251 std::make_shared<http2::ServerParser>(*this, configuration)),
252 rpc_map(std::move(rpc_map_)),
253 error_reporter(error_reporter_),
254 interface_id(std::move(interface_id_))
255 {
// Bytes produced by the parser are copied into a fresh vector and handed to
// send_data. The lambda captures `this`.
256 server_parser->set_outgoing_data_handler(
257 [this](std::span<const uint8_t> data) {
258 send_data(std::vector<uint8_t>(data.begin(), data.end()));
259 });
260 }
261
// Feeds received bytes to the server parser.
// Returns true only when server_parser->execute() returns true. Returns false
// when execute() returns false and after any of the exception handlers below
// has run.
// NOTE(review): several lines are missing from this extract (listing lines
// 269, 274, 283, 290, 292, 301, 308 and 323), including the two typed catch
// clauses and the statement that follows each handler's last visible line.
262 bool parse(std::span<const uint8_t> data) override
263 {
264 try
265 {
266 if (!server_parser->execute(data.data(), data.size()))
267 {
268 // Close session gracefully
270 return false;
271 }
272 return true;
273 }
// First handler (catch clause not visible): reports a payload-too-large error
// for this interface if a reporter is set, logs it, and answers on the stream
// given by e.get_stream_id() with HTTP_STATUS_PAYLOAD_TOO_LARGE.
275 {
276 if (error_reporter)
277 {
278 error_reporter->report_request_payload_too_large_error(interface_id);
279 }
280
281 LOG_DEBUG_FMT("Request is too large: {}", e.what());
282
284 HTTP_STATUS_PAYLOAD_TOO_LARGE,
285 ccf::errors::RequestBodyTooLarge,
286 e.what()};
287
288 respond_with_error(e.get_stream_id(), error);
289
291 }
// Second handler (catch clause not visible): same shape as above, for headers
// that are too large, answered with
// HTTP_STATUS_REQUEST_HEADER_FIELDS_TOO_LARGE.
293 {
294 if (error_reporter)
295 {
296 error_reporter->report_request_header_too_large_error(interface_id);
297 }
298
299 LOG_DEBUG_FMT("Request header is too large: {}", e.what());
300
302 HTTP_STATUS_REQUEST_HEADER_FIELDS_TOO_LARGE,
303 ccf::errors::RequestHeaderTooLarge,
304 e.what()};
305
306 respond_with_error(e.get_stream_id(), error);
307
309 }
// Fallback for any other std::exception: reported as a parsing error and
// logged; no response is written in the visible code.
310 catch (const std::exception& e)
311 {
312 if (error_reporter)
313 {
314 error_reporter->report_parsing_error(interface_id);
315 }
316
317 LOG_DEBUG_FMT("Error parsing HTTP request: {}", e.what());
318
319 // For generic parsing errors, as it is not trivial to construct a valid
320 // HTTP/2 response to send back to the default stream (0), the session
321 // is simply closed.
322
324 }
325 return false;
326 }
327
// Handles one parsed request on stream_id (the line naming this function is
// missing from the listing; the index at the end of the page names it
// handle_request, returning void).
// Flow visible here: obtain the responder and session context for the stream,
// build an HttpRpcContext, look up the handler via http::fetch_rpc_handler,
// call process(), then either return early while the response is pending or
// send the response through the stream's responder.
// NOTE(review): listing lines 335, 351, 360 and 393 are missing from this
// extract: the opener of the first log call, one HttpRpcContext constructor
// argument, the call that receives the ErrorDetails in the inner catch, and
// the statement before the final `throw`.
329 llhttp_method verb,
330 const std::string_view& url,
331 ccf::http::HeaderMap&& headers,
332 std::vector<uint8_t>&& body,
333 int32_t stream_id) override
334 {
336 "Processing msg({}, {} [{} bytes])",
337 llhttp_method_name(verb),
338 url,
339 body.size());
340
341 auto responder = get_stream_responder(stream_id);
342 auto session_ctx = get_session_ctx(stream_id);
343
344 try
345 {
346 std::shared_ptr<http::HttpRpcContext> rpc_ctx = nullptr;
347 try
348 {
349 rpc_ctx = std::make_shared<HttpRpcContext>(
350 session_ctx,
352 verb,
353 url,
354 std::move(headers),
355 std::move(body),
356 responder);
357 }
358 catch (std::exception& e)
359 {
// NOTE(review): no `return` is visible in this handler, so rpc_ctx may still
// be null when the code below uses it; confirm against the full source.
361 HTTP_STATUS_INTERNAL_SERVER_ERROR,
362 ccf::errors::InternalError,
363 fmt::format("Error constructing RpcContext: {}", e.what())});
364 }
365 std::shared_ptr<ccf::RpcHandler> search =
366 http::fetch_rpc_handler(rpc_ctx, rpc_map);
367
368 search->process(rpc_ctx);
369
370 if (rpc_ctx->response_is_pending)
371 {
372 // If the RPC is pending, hold the connection.
373 LOG_TRACE_FMT("Pending");
374 return;
375 }
376
377 responder->send_response(
378 rpc_ctx->get_response_http_status(),
379 rpc_ctx->get_response_headers(),
380 rpc_ctx->get_response_trailers(),
381 std::move(rpc_ctx->take_response_body()));
382 }
// Any std::exception from the block above is answered with an internal-error
// OData response on this stream, logged, and rethrown to the caller.
383 catch (const std::exception& e)
384 {
385 responder->send_odata_error_response(ccf::ErrorDetails{
386 HTTP_STATUS_INTERNAL_SERVER_ERROR,
387 ccf::errors::InternalError,
388 fmt::format("Exception: {}", e.what())});
389
390 // On any exception, close the connection.
391 LOG_FAIL_FMT("Closing connection");
392 LOG_DEBUG_FMT("Closing connection due to exception: {}", e.what());
394 throw;
395 }
396 }
397
399 ccf::http_status status_code,
400 ccf::http::HeaderMap&& headers,
401 ccf::http::HeaderMap&& trailers,
402 std::vector<uint8_t>&& body) override
403 {
404 return get_stream_responder(http2::DEFAULT_STREAM_ID)
405 ->send_response(
406 status_code,
407 std::move(headers),
408 std::move(trailers),
409 std::move(body));
410 }
411
413 {
414 return get_stream_responder(http2::DEFAULT_STREAM_ID)
415 ->start_stream(status, std::move(headers));
416 }
417
418 bool stream_data(std::vector<uint8_t>&& data)
419 {
420 return get_stream_responder(http2::DEFAULT_STREAM_ID)
421 ->stream_data(std::move(data));
422 }
423
425 {
426 return get_stream_responder(http2::DEFAULT_STREAM_ID)
427 ->close_stream(std::move(trailers));
428 }
429
432 {
433 return get_stream_responder(http2::DEFAULT_STREAM_ID)
434 ->set_on_stream_close_callback(cb);
435 }
436 };
437
439 public ccf::ClientSession,
441 {
442 private:
443 http2::ClientParser client_parser;
444
445 public:
// Constructor (its name line is missing from this listing; the index at the
// end of the page names it HTTP2ClientSession). Initialises both the
// HTTP2Session and ccf::ClientSession bases and binds the client parser to
// *this.
447 int64_t session_id_,
448 ringbuffer::AbstractWriterFactory& writer_factory,
449 std::unique_ptr<ccf::tls::Context> ctx) :
450 HTTP2Session(session_id_, writer_factory, std::move(ctx)),
451 ccf::ClientSession(session_id_, writer_factory),
452 client_parser(*this)
453 {
// Bytes produced by the parser are copied into a fresh vector and handed to
// send_data. The lambda captures `this`.
454 client_parser.set_outgoing_data_handler(
455 [this](std::span<const uint8_t> data) {
456 send_data(std::vector<uint8_t>(data.begin(), data.end()));
457 });
458 }
459
// Feeds received bytes to the client parser.
// Returns true whenever execute() completes without throwing; its return
// value is not inspected. Returns false after a std::exception has been
// caught and logged.
// NOTE(review): listing lines 473 and 479 are missing from this extract: the
// opener of the third log call and the statement that follows it.
460 bool parse(std::span<const uint8_t> data) override
461 {
462 // Catch response parsing errors and log them
463 try
464 {
465 client_parser.execute(data.data(), data.size());
466
467 return true;
468 }
469 catch (const std::exception& e)
470 {
471 LOG_FAIL_FMT("Error parsing HTTP2 response on session {}", session_id);
472 LOG_DEBUG_FMT("Error parsing HTTP2 response: {}", e.what());
// The received bytes are reinterpreted as characters for this log message.
474 "Error occurred while parsing fragment {} byte fragment:\n{}",
475 data.size(),
476 std::string_view(
477 reinterpret_cast<char const*>(data.data()), data.size()));
478
480 }
481 return false;
482 }
483
484 void send_request(http::Request&& request) override
485 {
486 client_parser.send_structured_request(
487 request.get_method(),
488 request.get_path(),
489 request.get_headers(),
490 {request.get_content_data(),
491 request.get_content_data() + request.get_content_length()});
492 }
493
// Response callback (the line naming this function is missing from the
// listing; the index at the end of the page names it handle_response,
// returning void). Passes the status, headers and body to handle_data_cb,
// then logs a trace message.
// NOTE(review): listing line 502, the statement after the log call, is
// missing from this extract.
495 ccf::http_status status,
496 ccf::http::HeaderMap&& headers,
497 std::vector<uint8_t>&& body) override
498 {
499 handle_data_cb(status, std::move(headers), std::move(body));
500
501 LOG_TRACE_FMT("Closing connection, message handled");
503 }
504 };
505}
Definition client_session.h:11
HandleDataCallback handle_data_cb
Definition client_session.h:24
Definition session.h:121
::tcp::ConnID session_id
Definition session.h:127
std::shared_ptr< ccf::TLSSession > tls_io
Definition session.h:126
void close_session() override
Definition session.h:109
void send_data(std::vector< uint8_t > &&data) override
Definition session.h:101
Definition http_responder.h:14
bool send_odata_error_response(ccf::ErrorDetails &&error)
Definition http_responder.h:24
Definition http2_parser.h:499
void send_structured_request(llhttp_method method, const std::string &route, const ccf::http::HeaderMap &headers, std::vector< uint8_t > &&body)
Definition http2_parser.h:509
void set_outgoing_data_handler(DataHandlerCB &&cb)
Definition http2_parser.h:125
bool execute(const uint8_t *data, size_t size)
Definition http2_parser.h:195
Definition http2_session.h:441
void send_request(http::Request &&request) override
Definition http2_session.h:484
HTTP2ClientSession(int64_t session_id_, ringbuffer::AbstractWriterFactory &writer_factory, std::unique_ptr< ccf::tls::Context > ctx)
Definition http2_session.h:446
void handle_response(ccf::http_status status, ccf::http::HeaderMap &&headers, std::vector< uint8_t > &&body) override
Definition http2_session.h:494
bool parse(std::span< const uint8_t > data) override
Definition http2_session.h:460
Definition http2_session.h:179
HTTP2ServerSession(std::shared_ptr< ccf::RPCMap > rpc_map_, int64_t session_id_, ccf::ListenInterfaceID interface_id_, ringbuffer::AbstractWriterFactory &writer_factory, std::unique_ptr< ccf::tls::Context > ctx, const ccf::http::ParserConfiguration &configuration, const std::shared_ptr< ErrorReporter > &error_reporter_)
Definition http2_session.h:241
bool start_stream(ccf::http_status status, ccf::http::HeaderMap &&headers)
Definition http2_session.h:412
bool parse(std::span< const uint8_t > data) override
Definition http2_session.h:262
bool close_stream(ccf::http::HeaderMap &&trailers)
Definition http2_session.h:424
bool send_response(ccf::http_status status_code, ccf::http::HeaderMap &&headers, ccf::http::HeaderMap &&trailers, std::vector< uint8_t > &&body) override
Definition http2_session.h:398
bool stream_data(std::vector< uint8_t > &&data)
Definition http2_session.h:418
void handle_request(llhttp_method verb, const std::string_view &url, ccf::http::HeaderMap &&headers, std::vector< uint8_t > &&body, int32_t stream_id) override
Definition http2_session.h:328
bool set_on_stream_close_callback(HTTP2StreamResponder::StreamOnCloseCallback cb)
Definition http2_session.h:430
Definition http2_session.h:33
http2::StreamCloseCB StreamOnCloseCallback
Definition http2_session.h:43
bool start_stream(ccf::http_status status, ccf::http::HeaderMap &&headers)
Definition http2_session.h:78
bool stream_data(std::vector< uint8_t > &&data)
Definition http2_session.h:124
bool close_stream(ccf::http::HeaderMap &&trailers)
Definition http2_session.h:101
bool send_response(ccf::http_status status_code, ccf::http::HeaderMap &&headers, ccf::http::HeaderMap &&trailers, std::vector< uint8_t > &&body) override
Definition http2_session.h:52
HTTP2StreamResponder(http2::StreamId stream_id_, const std::shared_ptr< http2::ServerParser > &server_parser_)
Definition http2_session.h:45
bool set_on_stream_close_callback(StreamOnCloseCallback cb)
Definition http2_session.h:149
Definition http_exceptions.h:40
Definition http_exceptions.h:31
Definition http_proc.h:20
http2::StreamId get_stream_id() const
Definition http_exceptions.h:24
Definition http_builder.h:117
Definition http_proc.h:33
Definition ring_buffer_types.h:157
#define LOG_TRACE_FMT
Definition internal_logger.h:13
#define LOG_DEBUG_FMT
Definition internal_logger.h:14
#define LOG_FAIL_FMT
Definition internal_logger.h:16
std::map< std::string, std::string, std::less<> > HeaderMap
Definition http_header_map.h:10
Definition app_interface.h:14
std::string ListenInterfaceID
Definition rpc_context.h:21
llhttp_status http_status
Definition http_status.h:9
Definition http2_callbacks.h:12
int32_t StreamId
Definition http2_types.h:20
std::function< void(void)> StreamCloseCB
Definition http2_types.h:23
Definition error_reporter.h:6
std::vector< uint8_t > error(ccf::ErrorDetails &&error)
Definition http_rpc_context.h:14
STL namespace.
Definition odata_error.h:58
Definition odata_error.h:50
Definition odata_error.h:37
std::string code
Definition odata_error.h:38
Definition rpc_context.h:24
Definition http_configuration.h:24
Definition http2_session.h:19
HTTP2SessionContext(size_t client_session_id_, const std::vector< uint8_t > &caller_cert_, const std::optional< ccf::ListenInterfaceID > &interface_id_, http2::StreamId stream_id_)
Definition http2_session.h:22
int32_t stream_id
Definition http2_session.h:20