23 size_t client_session_id_,
24 const std::vector<uint8_t>& caller_cert_,
25 const std::optional<ccf::ListenInterfaceID>& interface_id_,
40 std::weak_ptr<http2::ServerParser> server_parser;
45 const std::shared_ptr<http2::ServerParser>& server_parser_) :
46 stream_id(stream_id_),
47 server_parser(server_parser_)
54 std::span<const uint8_t> body)
override
56 auto sp = server_parser.lock();
66 catch (
const std::exception& e)
69 "Error sending response on stream {}: {}", stream_id, e.what());
79 auto sp = server_parser.lock();
84 sp->start_stream(stream_id, status, headers);
86 catch (
const std::exception& e)
88 LOG_DEBUG_FMT(
"Error sending headers {}: {}", stream_id, e.what());
102 auto sp = server_parser.lock();
107 sp->close_stream(stream_id, std::move(trailers));
109 catch (
const std::exception& e)
111 LOG_DEBUG_FMT(
"Error closing stream {}: {}", stream_id, e.what());
125 auto sp = server_parser.lock();
130 sp->send_data(stream_id, data);
132 catch (
const std::exception& e)
135 "Error streaming data on stream {}: {}", stream_id, e.what());
151 auto sp = server_parser.lock();
156 sp->set_on_stream_close_callback(stream_id, cb);
158 catch (
const std::exception& e)
161 "Error setting close callback on stream {}: {}",
181 std::shared_ptr<http2::ServerParser> server_parser;
183 std::shared_ptr<ccf::RPCMap> rpc_map;
184 std::shared_ptr<ccf::RpcHandler> handler;
185 std::shared_ptr<ErrorReporter> error_reporter;
190 std::unordered_map<http2::StreamId, std::shared_ptr<HTTP2SessionContext>>
193 std::shared_ptr<HTTP2SessionContext> get_session_ctx(
196 auto it = session_ctxs.find(stream_id);
197 if (it == session_ctxs.end())
199 it = session_ctxs.emplace_hint(
203 std::make_shared<HTTP2SessionContext>(
210 std::shared_ptr<HTTPResponder> get_stream_responder(
214 if (responder ==
nullptr)
217 std::make_shared<HTTP2StreamResponder>(stream_id, server_parser);
224 void respond_with_error(
229 const auto s = body.dump();
232 headers[ccf::http::headers::CONTENT_TYPE] =
233 ccf::http::headervalues::contenttype::JSON;
235 get_stream_responder(stream_id)->send_response(
239 {(const uint8_t*)s.data(), s.size()});
244 std::shared_ptr<ccf::RPCMap> rpc_map,
248 std::unique_ptr<ccf::tls::Context> ctx,
250 const std::shared_ptr<ErrorReporter>& error_reporter,
254 std::make_shared<
http2::ServerParser>(*this, configuration)),
256 error_reporter(error_reporter),
257 interface_id(interface_id),
258 responder_lookup(responder_lookup_)
260 server_parser->set_outgoing_data_handler(
261 [
this](std::span<const uint8_t> data) {
262 this->
tls_io->send_raw(data.data(), data.size());
271 bool parse(std::span<const uint8_t> data)
override
275 if (!server_parser->execute(data.data(), data.size()))
287 error_reporter->report_request_payload_too_large_error(interface_id);
293 HTTP_STATUS_PAYLOAD_TOO_LARGE,
294 ccf::errors::RequestBodyTooLarge,
305 error_reporter->report_request_header_too_large_error(interface_id);
311 HTTP_STATUS_REQUEST_HEADER_FIELDS_TOO_LARGE,
312 ccf::errors::RequestHeaderTooLarge,
319 catch (
const std::exception& e)
323 error_reporter->report_parsing_error(interface_id);
339 const std::string_view& url,
341 std::vector<uint8_t>&& body,
342 int32_t stream_id)
override
345 "Processing msg({}, {} [{} bytes])",
346 llhttp_method_name(verb),
350 auto responder = get_stream_responder(stream_id);
351 auto session_ctx = get_session_ctx(stream_id);
355 std::shared_ptr<http::HttpRpcContext> rpc_ctx =
nullptr;
358 rpc_ctx = std::make_shared<HttpRpcContext>(
367 catch (std::exception& e)
370 HTTP_STATUS_INTERNAL_SERVER_ERROR,
371 ccf::errors::InternalError,
372 fmt::format(
"Error constructing RpcContext: {}", e.what())});
374 std::shared_ptr<ccf::RpcHandler> search =
375 http::fetch_rpc_handler(rpc_ctx, rpc_map);
377 search->process(rpc_ctx);
379 if (rpc_ctx->response_is_pending)
387 responder->send_response(
388 rpc_ctx->get_response_http_status(),
389 rpc_ctx->get_response_headers(),
390 rpc_ctx->get_response_trailers(),
391 std::move(rpc_ctx->get_response_body()));
394 catch (
const std::exception& e)
397 HTTP_STATUS_INTERNAL_SERVER_ERROR,
398 ccf::errors::InternalError,
399 fmt::format(
"Exception: {}", e.what())});
403 LOG_DEBUG_FMT(
"Closing connection due to exception: {}", e.what());
413 std::span<const uint8_t> body)
override
415 return get_stream_responder(http2::DEFAULT_STREAM_ID)
417 status_code, std::move(headers), std::move(trailers), body);
423 return get_stream_responder(http2::DEFAULT_STREAM_ID)
424 ->start_stream(status, headers);
429 return get_stream_responder(http2::DEFAULT_STREAM_ID)->stream_data(data);
434 return get_stream_responder(http2::DEFAULT_STREAM_ID)
435 ->close_stream(std::move(trailers));
441 return get_stream_responder(http2::DEFAULT_STREAM_ID)
442 ->set_on_stream_close_callback(cb);
457 std::unique_ptr<ccf::tls::Context> ctx) :
463 [
this](std::span<const uint8_t> data) {
464 this->
tls_io->send_raw(data.data(), data.size());
468 bool parse(std::span<const uint8_t> data)
override
473 client_parser.
execute(data.data(), data.size());
477 catch (
const std::exception& e)
482 "Error occurred while parsing fragment {} byte fragment:\n{}",
484 std::string_view((
char const*)data.data(), data.size()));
494 request.get_method(),
496 request.get_headers(),
497 {request.get_content_data(), request.get_content_length()});
503 std::vector<uint8_t>&& body)
override
Definition client_session.h:11
HandleDataCallback handle_data_cb
Definition client_session.h:22
::tcp::ConnID session_id
Definition session.h:86
std::shared_ptr< ccf::TLSSession > tls_io
Definition session.h:85
Definition http_responder.h:16
bool send_odata_error_response(ccf::ErrorDetails &&error)
Definition http_responder.h:35
Definition http2_parser.h:501
void send_structured_request(llhttp_method method, const std::string &route, const ccf::http::HeaderMap &headers, std::span< const uint8_t > body)
Definition http2_parser.h:511
void set_outgoing_data_handler(DataHandlerCB &&cb)
Definition http2_parser.h:125
bool execute(const uint8_t *data, size_t size)
Definition http2_parser.h:195
Definition http2_session.h:449
void send_request(http::Request &&request) override
Definition http2_session.h:491
HTTP2ClientSession(int64_t session_id_, ringbuffer::AbstractWriterFactory &writer_factory, std::unique_ptr< ccf::tls::Context > ctx)
Definition http2_session.h:454
void handle_response(ccf::http_status status, ccf::http::HeaderMap &&headers, std::vector< uint8_t > &&body) override
Definition http2_session.h:500
bool parse(std::span< const uint8_t > data) override
Definition http2_session.h:468
Definition http2_session.h:179
bool set_on_stream_close_callback(ccf::http::StreamOnCloseCallback cb) override
Definition http2_session.h:438
bool start_stream(ccf::http_status status, const ccf::http::HeaderMap &headers) override
Definition http2_session.h:420
~HTTP2ServerSession()
Definition http2_session.h:266
bool parse(std::span< const uint8_t > data) override
Definition http2_session.h:271
HTTP2ServerSession(std::shared_ptr< ccf::RPCMap > rpc_map, int64_t session_id_, const ccf::ListenInterfaceID &interface_id, ringbuffer::AbstractWriterFactory &writer_factory, std::unique_ptr< ccf::tls::Context > ctx, const ccf::http::ParserConfiguration &configuration, const std::shared_ptr< ErrorReporter > &error_reporter, http::ResponderLookup &responder_lookup_)
Definition http2_session.h:243
bool send_response(ccf::http_status status_code, ccf::http::HeaderMap &&headers, ccf::http::HeaderMap &&trailers, std::span< const uint8_t > body) override
Definition http2_session.h:409
void handle_request(llhttp_method verb, const std::string_view &url, ccf::http::HeaderMap &&headers, std::vector< uint8_t > &&body, int32_t stream_id) override
Definition http2_session.h:337
bool stream_data(std::span< const uint8_t > data) override
Definition http2_session.h:427
bool close_stream(ccf::http::HeaderMap &&trailers) override
Definition http2_session.h:432
Definition http2_session.h:33
bool set_on_stream_close_callback(ccf::http::StreamOnCloseCallback cb) override
Definition http2_session.h:148
bool close_stream(ccf::http::HeaderMap &&trailers) override
Definition http2_session.h:100
bool start_stream(ccf::http_status status, const ccf::http::HeaderMap &headers) override
Definition http2_session.h:76
bool send_response(ccf::http_status status_code, ccf::http::HeaderMap &&headers, ccf::http::HeaderMap &&trailers, std::span< const uint8_t > body) override
Definition http2_session.h:50
HTTP2StreamResponder(http2::StreamId stream_id_, const std::shared_ptr< http2::ServerParser > &server_parser_)
Definition http2_session.h:43
bool stream_data(std::span< const uint8_t > data) override
Definition http2_session.h:123
Definition http_proc.h:20
Definition http_builder.h:118
Definition responder_lookup.h:14
void cleanup_responders(::tcp::ConnID session_id)
Definition responder_lookup.h:52
std::shared_ptr< ccf::http::HTTPResponder > lookup_responder(::tcp::ConnID session_id, http2::StreamId stream_id)
Definition responder_lookup.h:25
void add_responder(::tcp::ConnID session_id, http2::StreamId stream_id, std::shared_ptr< ccf::http::HTTPResponder > responder)
Definition responder_lookup.h:43
Definition http_proc.h:31
Definition ring_buffer_types.h:153
#define LOG_TRACE_FMT
Definition logger.h:356
#define LOG_DEBUG_FMT
Definition logger.h:357
#define LOG_FAIL_FMT
Definition logger.h:363
std::map< std::string, std::string, std::less<> > HeaderMap
Definition http_header_map.h:10
std::function< void(void)> StreamOnCloseCallback
Definition http_responder.h:13
Definition app_interface.h:14
std::string ListenInterfaceID
Definition rpc_context.h:21
llhttp_status http_status
Definition http_status.h:9
Definition http2_callbacks.h:12
int32_t StreamId
Definition http2_types.h:21
Definition error_reporter.h:6
std::vector< uint8_t > error(ccf::ErrorDetails &&error)
Definition http_rpc_context.h:14
Definition odata_error.h:58
Definition odata_error.h:50
Definition odata_error.h:37
Definition rpc_context.h:24
Definition http_configuration.h:24
Definition http2_session.h:19
HTTP2SessionContext(size_t client_session_id_, const std::vector< uint8_t > &caller_cert_, const std::optional< ccf::ListenInterfaceID > &interface_id_, http2::StreamId stream_id_)
Definition http2_session.h:22
int32_t stream_id
Definition http2_session.h:20