23 size_t client_session_id_,
24 const std::vector<uint8_t>& caller_cert_,
25 const std::optional<ccf::ListenInterfaceID>& interface_id_,
40 std::weak_ptr<http2::ServerParser> server_parser;
47 const std::shared_ptr<http2::ServerParser>& server_parser_) :
48 stream_id(stream_id_),
49 server_parser(server_parser_)
56 std::vector<uint8_t>&& body)
override
58 auto sp = server_parser.lock();
68 catch (
const std::exception& e)
71 "Error sending response on stream {}: {}", stream_id, e.what());
80 auto sp = server_parser.lock();
85 sp->start_stream(stream_id, status, std::move(headers));
87 catch (
const std::exception& e)
89 LOG_DEBUG_FMT(
"Error sending headers {}: {}", stream_id, e.what());
103 auto sp = server_parser.lock();
108 sp->close_stream(stream_id, std::move(trailers));
110 catch (
const std::exception& e)
112 LOG_DEBUG_FMT(
"Error closing stream {}: {}", stream_id, e.what());
126 auto sp = server_parser.lock();
131 sp->send_data(stream_id, std::move(data));
133 catch (
const std::exception& e)
136 "Error streaming data on stream {}: {}", stream_id, e.what());
151 auto sp = server_parser.lock();
156 sp->set_on_stream_close_callback(stream_id, cb);
158 catch (
const std::exception& e)
161 "Error setting close callback on stream {}: {}",
181 std::shared_ptr<http2::ServerParser> server_parser;
183 std::shared_ptr<ccf::RPCMap> rpc_map;
184 std::shared_ptr<ccf::RpcHandler> handler;
185 std::shared_ptr<ErrorReporter> error_reporter;
188 std::unordered_map<http2::StreamId, std::shared_ptr<HTTP2StreamResponder>>
189 responders_by_stream;
191 std::unordered_map<http2::StreamId, std::shared_ptr<HTTP2SessionContext>>
194 std::shared_ptr<HTTP2SessionContext> get_session_ctx(
197 auto it = session_ctxs.find(stream_id);
198 if (it == session_ctxs.end())
200 it = session_ctxs.emplace_hint(
203 std::make_shared<HTTP2SessionContext>(
210 std::shared_ptr<HTTP2StreamResponder> get_stream_responder(
213 auto it = responders_by_stream.find(stream_id);
214 if (it == responders_by_stream.end())
217 std::make_shared<HTTP2StreamResponder>(stream_id, server_parser);
218 it = responders_by_stream.emplace_hint(it, stream_id, responder);
224 void respond_with_error(
227 nlohmann::json body =
229 const std::string s = body.dump();
230 std::vector<uint8_t> v(s.begin(), s.end());
233 headers[ccf::http::headers::CONTENT_TYPE] =
234 ccf::http::headervalues::contenttype::JSON;
236 get_stream_responder(stream_id)->send_response(
237 error.status, std::move(headers), {}, std::move(v));
242 std::shared_ptr<ccf::RPCMap> rpc_map_,
246 std::unique_ptr<ccf::tls::Context> ctx,
248 const std::shared_ptr<ErrorReporter>& error_reporter_) :
251 std::make_shared<
http2::ServerParser>(*this, configuration)),
252 rpc_map(
std::move(rpc_map_)),
253 error_reporter(error_reporter_),
254 interface_id(
std::move(interface_id_))
256 server_parser->set_outgoing_data_handler(
257 [
this](std::span<const uint8_t> data) {
258 send_data(std::vector<uint8_t>(data.begin(), data.end()));
262 bool parse(std::span<const uint8_t> data)
override
266 if (!server_parser->execute(data.data(), data.size()))
278 error_reporter->report_request_payload_too_large_error(interface_id);
284 HTTP_STATUS_PAYLOAD_TOO_LARGE,
285 ccf::errors::RequestBodyTooLarge,
296 error_reporter->report_request_header_too_large_error(interface_id);
302 HTTP_STATUS_REQUEST_HEADER_FIELDS_TOO_LARGE,
303 ccf::errors::RequestHeaderTooLarge,
310 catch (
const std::exception& e)
314 error_reporter->report_parsing_error(interface_id);
330 const std::string_view& url,
332 std::vector<uint8_t>&& body,
333 int32_t stream_id)
override
336 "Processing msg({}, {} [{} bytes])",
337 llhttp_method_name(verb),
341 auto responder = get_stream_responder(stream_id);
342 auto session_ctx = get_session_ctx(stream_id);
346 std::shared_ptr<http::HttpRpcContext> rpc_ctx =
nullptr;
349 rpc_ctx = std::make_shared<HttpRpcContext>(
358 catch (std::exception& e)
361 HTTP_STATUS_INTERNAL_SERVER_ERROR,
362 ccf::errors::InternalError,
363 fmt::format(
"Error constructing RpcContext: {}", e.what())});
365 std::shared_ptr<ccf::RpcHandler> search =
366 http::fetch_rpc_handler(rpc_ctx, rpc_map);
368 search->process(rpc_ctx);
370 if (rpc_ctx->response_is_pending)
377 responder->send_response(
378 rpc_ctx->get_response_http_status(),
379 rpc_ctx->get_response_headers(),
380 rpc_ctx->get_response_trailers(),
381 std::move(rpc_ctx->take_response_body()));
383 catch (
const std::exception& e)
386 HTTP_STATUS_INTERNAL_SERVER_ERROR,
387 ccf::errors::InternalError,
388 fmt::format(
"Exception: {}", e.what())});
392 LOG_DEBUG_FMT(
"Closing connection due to exception: {}", e.what());
402 std::vector<uint8_t>&& body)
override
404 return get_stream_responder(http2::DEFAULT_STREAM_ID)
414 return get_stream_responder(http2::DEFAULT_STREAM_ID)
415 ->start_stream(status, std::move(headers));
420 return get_stream_responder(http2::DEFAULT_STREAM_ID)
421 ->stream_data(std::move(data));
426 return get_stream_responder(http2::DEFAULT_STREAM_ID)
427 ->close_stream(std::move(trailers));
433 return get_stream_responder(http2::DEFAULT_STREAM_ID)
434 ->set_on_stream_close_callback(cb);
449 std::unique_ptr<ccf::tls::Context> ctx) :
455 [
this](std::span<const uint8_t> data) {
456 send_data(std::vector<uint8_t>(data.begin(), data.end()));
460 bool parse(std::span<const uint8_t> data)
override
465 client_parser.
execute(data.data(), data.size());
469 catch (
const std::exception& e)
474 "Error occurred while parsing fragment {} byte fragment:\n{}",
477 reinterpret_cast<char const*
>(data.data()), data.size()));
487 request.get_method(),
489 request.get_headers(),
490 {request.get_content_data(),
491 request.get_content_data() + request.get_content_length()});
497 std::vector<uint8_t>&& body)
override
Definition client_session.h:11
HandleDataCallback handle_data_cb
Definition client_session.h:24
::tcp::ConnID session_id
Definition session.h:127
std::shared_ptr< ccf::TLSSession > tls_io
Definition session.h:126
void close_session() override
Definition session.h:109
void send_data(std::vector< uint8_t > &&data) override
Definition session.h:101
Definition http_responder.h:14
bool send_odata_error_response(ccf::ErrorDetails &&error)
Definition http_responder.h:24
Definition http2_parser.h:499
void send_structured_request(llhttp_method method, const std::string &route, const ccf::http::HeaderMap &headers, std::vector< uint8_t > &&body)
Definition http2_parser.h:509
void set_outgoing_data_handler(DataHandlerCB &&cb)
Definition http2_parser.h:125
bool execute(const uint8_t *data, size_t size)
Definition http2_parser.h:195
Definition http2_session.h:441
void send_request(http::Request &&request) override
Definition http2_session.h:484
HTTP2ClientSession(int64_t session_id_, ringbuffer::AbstractWriterFactory &writer_factory, std::unique_ptr< ccf::tls::Context > ctx)
Definition http2_session.h:446
void handle_response(ccf::http_status status, ccf::http::HeaderMap &&headers, std::vector< uint8_t > &&body) override
Definition http2_session.h:494
bool parse(std::span< const uint8_t > data) override
Definition http2_session.h:460
Definition http2_session.h:179
HTTP2ServerSession(std::shared_ptr< ccf::RPCMap > rpc_map_, int64_t session_id_, ccf::ListenInterfaceID interface_id_, ringbuffer::AbstractWriterFactory &writer_factory, std::unique_ptr< ccf::tls::Context > ctx, const ccf::http::ParserConfiguration &configuration, const std::shared_ptr< ErrorReporter > &error_reporter_)
Definition http2_session.h:241
bool start_stream(ccf::http_status status, ccf::http::HeaderMap &&headers)
Definition http2_session.h:412
bool parse(std::span< const uint8_t > data) override
Definition http2_session.h:262
bool close_stream(ccf::http::HeaderMap &&trailers)
Definition http2_session.h:424
bool send_response(ccf::http_status status_code, ccf::http::HeaderMap &&headers, ccf::http::HeaderMap &&trailers, std::vector< uint8_t > &&body) override
Definition http2_session.h:398
bool stream_data(std::vector< uint8_t > &&data)
Definition http2_session.h:418
void handle_request(llhttp_method verb, const std::string_view &url, ccf::http::HeaderMap &&headers, std::vector< uint8_t > &&body, int32_t stream_id) override
Definition http2_session.h:328
bool set_on_stream_close_callback(HTTP2StreamResponder::StreamOnCloseCallback cb)
Definition http2_session.h:430
Definition http2_session.h:33
http2::StreamCloseCB StreamOnCloseCallback
Definition http2_session.h:43
bool start_stream(ccf::http_status status, ccf::http::HeaderMap &&headers)
Definition http2_session.h:78
bool stream_data(std::vector< uint8_t > &&data)
Definition http2_session.h:124
bool close_stream(ccf::http::HeaderMap &&trailers)
Definition http2_session.h:101
bool send_response(ccf::http_status status_code, ccf::http::HeaderMap &&headers, ccf::http::HeaderMap &&trailers, std::vector< uint8_t > &&body) override
Definition http2_session.h:52
HTTP2StreamResponder(http2::StreamId stream_id_, const std::shared_ptr< http2::ServerParser > &server_parser_)
Definition http2_session.h:45
bool set_on_stream_close_callback(StreamOnCloseCallback cb)
Definition http2_session.h:149
Definition http_proc.h:20
Definition http_builder.h:117
Definition http_proc.h:33
Definition ring_buffer_types.h:157
#define LOG_TRACE_FMT
Definition internal_logger.h:13
#define LOG_DEBUG_FMT
Definition internal_logger.h:14
#define LOG_FAIL_FMT
Definition internal_logger.h:16
std::map< std::string, std::string, std::less<> > HeaderMap
Definition http_header_map.h:10
Definition app_interface.h:14
std::string ListenInterfaceID
Definition rpc_context.h:21
llhttp_status http_status
Definition http_status.h:9
Definition http2_callbacks.h:12
int32_t StreamId
Definition http2_types.h:20
std::function< void(void)> StreamCloseCB
Definition http2_types.h:23
Definition error_reporter.h:6
std::vector< uint8_t > error(ccf::ErrorDetails &&error)
Definition http_rpc_context.h:14
Definition odata_error.h:58
Definition odata_error.h:50
Definition odata_error.h:37
std::string code
Definition odata_error.h:38
Definition rpc_context.h:24
Definition http_configuration.h:24
Definition http2_session.h:19
HTTP2SessionContext(size_t client_session_id_, const std::vector< uint8_t > &caller_cert_, const std::optional< ccf::ListenInterfaceID > &interface_id_, http2::StreamId stream_id_)
Definition http2_session.h:22
int32_t stream_id
Definition http2_session.h:20