CCF
Loading...
Searching...
No Matches
http2_session.h
Go to the documentation of this file.
1// Copyright (c) Microsoft Corporation. All rights reserved.
2// Licensed under the Apache 2.0 License.
3#pragma once
4
5#include "ccf/ds/logger.h"
8#include "enclave/rpc_map.h"
9#include "error_reporter.h"
11#include "http2_parser.h"
12#include "http_rpc_context.h"
13
14namespace http
15{
17
19 {
20 int32_t stream_id;
21
23 size_t client_session_id_,
24 const std::vector<uint8_t>& caller_cert_,
25 const std::optional<ccf::ListenInterfaceID>& interface_id_,
26 http2::StreamId stream_id_) :
27 ccf::SessionContext(client_session_id_, caller_cert_, interface_id_),
28 stream_id(stream_id_)
29 {}
30 };
31
33 {
34 private:
35 http2::StreamId stream_id;
36
37 // Associated HTTP2ServerSession may be closed while responder is held
38 // elsewhere (e.g. async streaming) so keep a weak pointer to parser and
39 // report an error to caller to discard responder.
40 std::weak_ptr<http2::ServerParser> server_parser;
41
42 public:
44 http2::StreamId stream_id_,
45 const std::shared_ptr<http2::ServerParser>& server_parser_) :
46 stream_id(stream_id_),
47 server_parser(server_parser_)
48 {}
49
51 ccf::http_status status_code,
52 ccf::http::HeaderMap&& headers,
53 ccf::http::HeaderMap&& trailers,
54 std::span<const uint8_t> body) override
55 {
56 auto sp = server_parser.lock();
57 try
58 {
59 sp->respond(
60 stream_id,
61 status_code,
62 std::move(headers),
63 std::move(trailers),
64 body);
65 }
66 catch (const std::exception& e)
67 {
69 "Error sending response on stream {}: {}", stream_id, e.what());
70 return false;
71 }
72
73 return true;
74 }
75
77 ccf::http_status status, const ccf::http::HeaderMap& headers) override
78 {
79 auto sp = server_parser.lock();
80 if (sp)
81 {
82 try
83 {
84 sp->start_stream(stream_id, status, headers);
85 }
86 catch (const std::exception& e)
87 {
88 LOG_DEBUG_FMT("Error sending headers {}: {}", stream_id, e.what());
89 return false;
90 }
91 }
92 else
93 {
94 LOG_DEBUG_FMT("Stream {} is closed", stream_id);
95 return false;
96 }
97 return true;
98 }
99
100 bool close_stream(ccf::http::HeaderMap&& trailers) override
101 {
102 auto sp = server_parser.lock();
103 if (sp)
104 {
105 try
106 {
107 sp->close_stream(stream_id, std::move(trailers));
108 }
109 catch (const std::exception& e)
110 {
111 LOG_DEBUG_FMT("Error closing stream {}: {}", stream_id, e.what());
112 return false;
113 }
114 }
115 else
116 {
117 LOG_DEBUG_FMT("Stream {} is closed", stream_id);
118 return false;
119 }
120 return true;
121 }
122
123 bool stream_data(std::span<const uint8_t> data) override
124 {
125 auto sp = server_parser.lock();
126 if (sp)
127 {
128 try
129 {
130 sp->send_data(stream_id, data);
131 }
132 catch (const std::exception& e)
133 {
135 "Error streaming data on stream {}: {}", stream_id, e.what());
136 return false;
137 }
138 }
139 else
140 {
141 LOG_DEBUG_FMT("Stream {} is closed", stream_id);
142 return false;
143 }
144
145 return true;
146 }
147
150 {
151 auto sp = server_parser.lock();
152 if (sp)
153 {
154 try
155 {
156 sp->set_on_stream_close_callback(stream_id, cb);
157 }
158 catch (const std::exception& e)
159 {
161 "Error setting close callback on stream {}: {}",
162 stream_id,
163 e.what());
164 return false;
165 }
166 }
167 else
168 {
169 LOG_DEBUG_FMT("Stream {} is closed", stream_id);
170 return false;
171 }
172 return true;
173 }
174 };
175
179 {
180 private:
181 std::shared_ptr<http2::ServerParser> server_parser;
182
183 std::shared_ptr<ccf::RPCMap> rpc_map;
184 std::shared_ptr<ccf::RpcHandler> handler;
185 std::shared_ptr<ErrorReporter> error_reporter;
186 ccf::ListenInterfaceID interface_id;
187
188 http::ResponderLookup& responder_lookup;
189
190 std::unordered_map<http2::StreamId, std::shared_ptr<HTTP2SessionContext>>
191 session_ctxs;
192
193 std::shared_ptr<HTTP2SessionContext> get_session_ctx(
194 http2::StreamId stream_id)
195 {
196 auto it = session_ctxs.find(stream_id);
197 if (it == session_ctxs.end())
198 {
199 it = session_ctxs.emplace_hint(
200 it,
201 std::make_pair(
202 stream_id,
203 std::make_shared<HTTP2SessionContext>(
204 session_id, tls_io->peer_cert(), interface_id, stream_id)));
205 }
206
207 return it->second;
208 }
209
210 std::shared_ptr<HTTPResponder> get_stream_responder(
211 http2::StreamId stream_id)
212 {
213 auto responder = responder_lookup.lookup_responder(session_id, stream_id);
214 if (responder == nullptr)
215 {
216 responder =
217 std::make_shared<HTTP2StreamResponder>(stream_id, server_parser);
218 responder_lookup.add_responder(session_id, stream_id, responder);
219 }
220
221 return responder;
222 }
223
224 void respond_with_error(
225 http2::StreamId stream_id, const ccf::ErrorDetails& error)
226 {
227 nlohmann::json body = ccf::ODataErrorResponse{
228 ccf::ODataError{std::move(error.code), std::move(error.msg)}};
229 const auto s = body.dump();
230
231 ccf::http::HeaderMap headers;
232 headers[ccf::http::headers::CONTENT_TYPE] =
233 ccf::http::headervalues::contenttype::JSON;
234
235 get_stream_responder(stream_id)->send_response(
236 error.status,
237 std::move(headers),
238 {},
239 {(const uint8_t*)s.data(), s.size()});
240 }
241
242 public:
244 std::shared_ptr<ccf::RPCMap> rpc_map,
245 int64_t session_id_,
246 const ccf::ListenInterfaceID& interface_id,
247 ringbuffer::AbstractWriterFactory& writer_factory,
248 std::unique_ptr<ccf::tls::Context> ctx,
249 const ccf::http::ParserConfiguration& configuration,
250 const std::shared_ptr<ErrorReporter>& error_reporter,
251 http::ResponderLookup& responder_lookup_) :
252 HTTP2Session(session_id_, writer_factory, std::move(ctx)),
253 server_parser(
254 std::make_shared<http2::ServerParser>(*this, configuration)),
255 rpc_map(rpc_map),
256 error_reporter(error_reporter),
257 interface_id(interface_id),
258 responder_lookup(responder_lookup_)
259 {
260 server_parser->set_outgoing_data_handler(
261 [this](std::span<const uint8_t> data) {
262 this->tls_io->send_raw(data.data(), data.size());
263 });
264 }
265
267 {
268 responder_lookup.cleanup_responders(session_id);
269 }
270
271 bool parse(std::span<const uint8_t> data) override
272 {
273 try
274 {
275 if (!server_parser->execute(data.data(), data.size()))
276 {
277 // Close session gracefully
278 tls_io->close();
279 return false;
280 }
281 return true;
282 }
284 {
285 if (error_reporter)
286 {
287 error_reporter->report_request_payload_too_large_error(interface_id);
288 }
289
290 LOG_DEBUG_FMT("Request is too large: {}", e.what());
291
293 HTTP_STATUS_PAYLOAD_TOO_LARGE,
294 ccf::errors::RequestBodyTooLarge,
295 e.what()};
296
297 respond_with_error(e.get_stream_id(), error);
298
299 tls_io->close();
300 }
302 {
303 if (error_reporter)
304 {
305 error_reporter->report_request_header_too_large_error(interface_id);
306 }
307
308 LOG_DEBUG_FMT("Request header is too large: {}", e.what());
309
311 HTTP_STATUS_REQUEST_HEADER_FIELDS_TOO_LARGE,
312 ccf::errors::RequestHeaderTooLarge,
313 e.what()};
314
315 respond_with_error(e.get_stream_id(), error);
316
317 tls_io->close();
318 }
319 catch (const std::exception& e)
320 {
321 if (error_reporter)
322 {
323 error_reporter->report_parsing_error(interface_id);
324 }
325
326 LOG_DEBUG_FMT("Error parsing HTTP request: {}", e.what());
327
328 // For generic parsing errors, as it is not trivial to construct a valid
329 // HTTP/2 response to send back to the default stream (0), the session
330 // is simply closed.
331
332 tls_io->close();
333 }
334 return false;
335 }
336
338 llhttp_method verb,
339 const std::string_view& url,
340 ccf::http::HeaderMap&& headers,
341 std::vector<uint8_t>&& body,
342 int32_t stream_id) override
343 {
345 "Processing msg({}, {} [{} bytes])",
346 llhttp_method_name(verb),
347 url,
348 body.size());
349
350 auto responder = get_stream_responder(stream_id);
351 auto session_ctx = get_session_ctx(stream_id);
352
353 try
354 {
355 std::shared_ptr<http::HttpRpcContext> rpc_ctx = nullptr;
356 try
357 {
358 rpc_ctx = std::make_shared<HttpRpcContext>(
359 session_ctx,
361 verb,
362 url,
363 std::move(headers),
364 std::move(body),
365 responder);
366 }
367 catch (std::exception& e)
368 {
370 HTTP_STATUS_INTERNAL_SERVER_ERROR,
371 ccf::errors::InternalError,
372 fmt::format("Error constructing RpcContext: {}", e.what())});
373 }
374 std::shared_ptr<ccf::RpcHandler> search =
375 http::fetch_rpc_handler(rpc_ctx, rpc_map);
376
377 search->process(rpc_ctx);
378
379 if (rpc_ctx->response_is_pending)
380 {
381 // If the RPC is pending, hold the connection.
382 LOG_TRACE_FMT("Pending");
383 return;
384 }
385 else
386 {
387 responder->send_response(
388 rpc_ctx->get_response_http_status(),
389 rpc_ctx->get_response_headers(),
390 rpc_ctx->get_response_trailers(),
391 std::move(rpc_ctx->get_response_body()));
392 }
393 }
394 catch (const std::exception& e)
395 {
396 responder->send_odata_error_response(ccf::ErrorDetails{
397 HTTP_STATUS_INTERNAL_SERVER_ERROR,
398 ccf::errors::InternalError,
399 fmt::format("Exception: {}", e.what())});
400
401 // On any exception, close the connection.
402 LOG_FAIL_FMT("Closing connection");
403 LOG_DEBUG_FMT("Closing connection due to exception: {}", e.what());
404 tls_io->close();
405 throw;
406 }
407 }
408
410 ccf::http_status status_code,
411 ccf::http::HeaderMap&& headers,
412 ccf::http::HeaderMap&& trailers,
413 std::span<const uint8_t> body) override
414 {
415 return get_stream_responder(http2::DEFAULT_STREAM_ID)
416 ->send_response(
417 status_code, std::move(headers), std::move(trailers), body);
418 }
419
421 ccf::http_status status, const ccf::http::HeaderMap& headers) override
422 {
423 return get_stream_responder(http2::DEFAULT_STREAM_ID)
424 ->start_stream(status, headers);
425 }
426
427 bool stream_data(std::span<const uint8_t> data) override
428 {
429 return get_stream_responder(http2::DEFAULT_STREAM_ID)->stream_data(data);
430 }
431
432 bool close_stream(ccf::http::HeaderMap&& trailers) override
433 {
434 return get_stream_responder(http2::DEFAULT_STREAM_ID)
435 ->close_stream(std::move(trailers));
436 }
437
440 {
441 return get_stream_responder(http2::DEFAULT_STREAM_ID)
442 ->set_on_stream_close_callback(cb);
443 }
444 };
445
447 public ccf::ClientSession,
449 {
450 private:
451 http2::ClientParser client_parser;
452
453 public:
455 int64_t session_id_,
456 ringbuffer::AbstractWriterFactory& writer_factory,
457 std::unique_ptr<ccf::tls::Context> ctx) :
458 HTTP2Session(session_id_, writer_factory, std::move(ctx)),
459 ccf::ClientSession(session_id_, writer_factory),
460 client_parser(*this)
461 {
462 client_parser.set_outgoing_data_handler(
463 [this](std::span<const uint8_t> data) {
464 this->tls_io->send_raw(data.data(), data.size());
465 });
466 }
467
468 bool parse(std::span<const uint8_t> data) override
469 {
470 // Catch response parsing errors and log them
471 try
472 {
473 client_parser.execute(data.data(), data.size());
474
475 return true;
476 }
477 catch (const std::exception& e)
478 {
479 LOG_FAIL_FMT("Error parsing HTTP2 response on session {}", session_id);
480 LOG_DEBUG_FMT("Error parsing HTTP2 response: {}", e.what());
482 "Error occurred while parsing fragment {} byte fragment:\n{}",
483 data.size(),
484 std::string_view((char const*)data.data(), data.size()));
485
486 tls_io->close();
487 }
488 return false;
489 }
490
491 void send_request(http::Request&& request) override
492 {
493 client_parser.send_structured_request(
494 request.get_method(),
495 request.get_path(),
496 request.get_headers(),
497 {request.get_content_data(), request.get_content_length()});
498 }
499
501 ccf::http_status status,
502 ccf::http::HeaderMap&& headers,
503 std::vector<uint8_t>&& body) override
504 {
505 handle_data_cb(status, std::move(headers), std::move(body));
506
507 LOG_TRACE_FMT("Closing connection, message handled");
508 tls_io->close();
509 }
510 };
511}
Definition client_session.h:11
HandleDataCallback handle_data_cb
Definition client_session.h:22
Definition session.h:80
::tcp::ConnID session_id
Definition session.h:86
std::shared_ptr< ccf::TLSSession > tls_io
Definition session.h:85
Definition http_responder.h:16
bool send_odata_error_response(ccf::ErrorDetails &&error)
Definition http_responder.h:35
Definition http2_parser.h:501
void send_structured_request(llhttp_method method, const std::string &route, const ccf::http::HeaderMap &headers, std::span< const uint8_t > body)
Definition http2_parser.h:511
void set_outgoing_data_handler(DataHandlerCB &&cb)
Definition http2_parser.h:125
bool execute(const uint8_t *data, size_t size)
Definition http2_parser.h:195
Definition http2_session.h:449
void send_request(http::Request &&request) override
Definition http2_session.h:491
HTTP2ClientSession(int64_t session_id_, ringbuffer::AbstractWriterFactory &writer_factory, std::unique_ptr< ccf::tls::Context > ctx)
Definition http2_session.h:454
void handle_response(ccf::http_status status, ccf::http::HeaderMap &&headers, std::vector< uint8_t > &&body) override
Definition http2_session.h:500
bool parse(std::span< const uint8_t > data) override
Definition http2_session.h:468
Definition http2_session.h:179
bool set_on_stream_close_callback(ccf::http::StreamOnCloseCallback cb) override
Definition http2_session.h:438
bool start_stream(ccf::http_status status, const ccf::http::HeaderMap &headers) override
Definition http2_session.h:420
~HTTP2ServerSession()
Definition http2_session.h:266
bool parse(std::span< const uint8_t > data) override
Definition http2_session.h:271
HTTP2ServerSession(std::shared_ptr< ccf::RPCMap > rpc_map, int64_t session_id_, const ccf::ListenInterfaceID &interface_id, ringbuffer::AbstractWriterFactory &writer_factory, std::unique_ptr< ccf::tls::Context > ctx, const ccf::http::ParserConfiguration &configuration, const std::shared_ptr< ErrorReporter > &error_reporter, http::ResponderLookup &responder_lookup_)
Definition http2_session.h:243
bool send_response(ccf::http_status status_code, ccf::http::HeaderMap &&headers, ccf::http::HeaderMap &&trailers, std::span< const uint8_t > body) override
Definition http2_session.h:409
void handle_request(llhttp_method verb, const std::string_view &url, ccf::http::HeaderMap &&headers, std::vector< uint8_t > &&body, int32_t stream_id) override
Definition http2_session.h:337
bool stream_data(std::span< const uint8_t > data) override
Definition http2_session.h:427
bool close_stream(ccf::http::HeaderMap &&trailers) override
Definition http2_session.h:432
Definition http2_session.h:33
bool set_on_stream_close_callback(ccf::http::StreamOnCloseCallback cb) override
Definition http2_session.h:148
bool close_stream(ccf::http::HeaderMap &&trailers) override
Definition http2_session.h:100
bool start_stream(ccf::http_status status, const ccf::http::HeaderMap &headers) override
Definition http2_session.h:76
bool send_response(ccf::http_status status_code, ccf::http::HeaderMap &&headers, ccf::http::HeaderMap &&trailers, std::span< const uint8_t > body) override
Definition http2_session.h:50
HTTP2StreamResponder(http2::StreamId stream_id_, const std::shared_ptr< http2::ServerParser > &server_parser_)
Definition http2_session.h:43
bool stream_data(std::span< const uint8_t > data) override
Definition http2_session.h:123
Definition http_exceptions.h:40
Definition http_exceptions.h:31
Definition http_proc.h:20
http2::StreamId get_stream_id() const
Definition http_exceptions.h:24
Definition http_builder.h:118
Definition responder_lookup.h:14
void cleanup_responders(::tcp::ConnID session_id)
Definition responder_lookup.h:52
std::shared_ptr< ccf::http::HTTPResponder > lookup_responder(::tcp::ConnID session_id, http2::StreamId stream_id)
Definition responder_lookup.h:25
void add_responder(::tcp::ConnID session_id, http2::StreamId stream_id, std::shared_ptr< ccf::http::HTTPResponder > responder)
Definition responder_lookup.h:43
Definition http_proc.h:31
Definition ring_buffer_types.h:153
#define LOG_TRACE_FMT
Definition logger.h:356
#define LOG_DEBUG_FMT
Definition logger.h:357
#define LOG_FAIL_FMT
Definition logger.h:363
std::map< std::string, std::string, std::less<> > HeaderMap
Definition http_header_map.h:10
std::function< void(void)> StreamOnCloseCallback
Definition http_responder.h:13
Definition app_interface.h:14
std::string ListenInterfaceID
Definition rpc_context.h:21
llhttp_status http_status
Definition http_status.h:9
Definition http2_callbacks.h:12
int32_t StreamId
Definition http2_types.h:21
Definition error_reporter.h:6
std::vector< uint8_t > error(ccf::ErrorDetails &&error)
Definition http_rpc_context.h:14
STL namespace.
Definition odata_error.h:58
Definition odata_error.h:50
Definition odata_error.h:37
Definition rpc_context.h:24
Definition http_configuration.h:24
Definition http2_session.h:19
HTTP2SessionContext(size_t client_session_id_, const std::vector< uint8_t > &caller_cert_, const std::optional< ccf::ListenInterfaceID > &interface_id_, http2::StreamId stream_id_)
Definition http2_session.h:22
int32_t stream_id
Definition http2_session.h:20