31#include <unordered_map>
37 static constexpr size_t max_open_sessions_soft_default = 1000;
38 static constexpr size_t max_open_sessions_hard_default = 1010;
41 class RPCSessions :
public std::enable_shared_from_this<RPCSessions>,
46 struct ListenInterface
48 size_t open_sessions = 0;
49 size_t peak_sessions = 0;
50 size_t max_open_sessions_soft = 0;
51 size_t max_open_sessions_hard = 0;
57 std::map<ListenInterfaceID, ListenInterface> listening_interfaces;
61 std::shared_ptr<RPCMap> rpc_map;
62 std::unordered_map<ListenInterfaceID, std::shared_ptr<::tls::Cert>> certs;
63 std::shared_ptr<CustomProtocolSubsystem> custom_protocol_subsystem;
68 std::pair<ListenInterfaceID, std::shared_ptr<ccf::Session>>>
70 size_t sessions_peak = 0;
74 std::atomic<ccf::tls::ConnID> next_client_session_id = -1;
76 template <
typename Base>
77 class NoMoreSessionsImpl :
public Base
80 template <
typename... Ts>
81 NoMoreSessionsImpl(Ts&&... ts) : Base(std::forward<Ts>(ts)...)
84 void handle_incoming_data_thread(std::vector<uint8_t>&& data)
override
86 Base::tls_io->recv_buffered(data.data(), data.size());
92 HTTP_STATUS_SERVICE_UNAVAILABLE,
93 ccf::errors::SessionCapExhausted,
94 "Service is currently busy and unable to serve new connections"});
97 Base::tls_io->close();
104 auto id = next_client_session_id--;
105 const auto initial = id;
107 if (next_client_session_id > 0)
109 next_client_session_id = -1;
112 while (sessions.find(
id) != sessions.end())
123 throw std::runtime_error(
124 "Exhausted all IDs for enclave client sessions");
131 ListenInterface& get_interface_from_interface_id(
134 auto it = listening_interfaces.find(
id);
135 if (it != listening_interfaces.end())
140 throw std::logic_error(
141 fmt::format(
"No RPC interface for interface ID {}",
id));
144 std::shared_ptr<ccf::Session> make_server_session(
145 const std::string& app_protocol,
148 std::unique_ptr<tls::Context>&& ctx,
151 if (app_protocol ==
"HTTP2")
153 return std::make_shared<::http::HTTP2ServerSession>(
159 parser_configuration,
162 if (app_protocol ==
"HTTP1")
164 return std::make_shared<::http::HTTPServerSession>(
170 parser_configuration,
173 if (custom_protocol_subsystem)
175 return custom_protocol_subsystem->create_session(
176 app_protocol,
id, std::move(ctx));
179 throw std::runtime_error(fmt::format(
180 "unknown protocol '{}' and custom protocol subsystem missing",
187 std::shared_ptr<RPCMap> rpc_map_) :
188 writer_factory(writer_factory),
189 rpc_map(
std::move(rpc_map_)),
190 custom_protocol_subsystem(nullptr)
196 std::shared_ptr<CustomProtocolSubsystem> cpss)
198 custom_protocol_subsystem = cpss;
203 std::lock_guard<ccf::pal::Mutex> guard(lock);
204 get_interface_from_interface_id(
id).errors.parsing++;
210 std::lock_guard<ccf::pal::Mutex> guard(lock);
211 get_interface_from_interface_id(
id).errors.request_payload_too_large++;
217 std::lock_guard<ccf::pal::Mutex> guard(lock);
218 get_interface_from_interface_id(
id).errors.request_header_too_large++;
224 std::lock_guard<ccf::pal::Mutex> guard(lock);
228 auto& li = listening_interfaces[name];
230 li.max_open_sessions_soft = interface.max_open_sessions_soft.value_or(
231 max_open_sessions_soft_default);
233 li.max_open_sessions_hard = interface.max_open_sessions_hard.value_or(
234 max_open_sessions_hard_default);
236 li.endorsement = interface.endorsement.value_or(endorsement_default);
238 li.http_configuration =
241 li.app_protocol = interface.app_protocol.value_or(
"HTTP1");
244 "Setting max open sessions on interface \"{}\" ({}) to [{}, "
245 "{}] and endorsement authority to {}",
247 interface.bind_address,
248 li.max_open_sessions_soft,
249 li.max_open_sessions_hard,
250 li.endorsement.authority);
257 std::lock_guard<ccf::pal::Mutex> guard(lock);
259 sm.
active = sessions.size();
260 sm.
peak = sessions_peak;
262 for (
const auto& [name, interface] : listening_interfaces)
265 interface.open_sessions,
266 interface.peak_sessions,
267 interface.max_open_sessions_soft,
268 interface.max_open_sessions_hard,
281 if (listening_interfaces.empty())
283 throw std::logic_error(
"No listening interface for this node");
286 return listening_interfaces.begin()->second.app_protocol;
310 auto cert = std::make_shared<::tls::Cert>(
311 nullptr, cert_, pk, std::nullopt,
false);
313 std::lock_guard<ccf::pal::Mutex> guard(lock);
315 for (
auto& [listen_interface_id, interface] : listening_interfaces)
317 if (interface.endorsement.authority == authority)
319 certs.insert_or_assign(listen_interface_id, cert);
329 std::lock_guard<ccf::pal::Mutex> guard(lock);
331 if (sessions.find(
id) != sessions.end())
333 throw std::logic_error(
334 fmt::format(
"Duplicate conn ID received inside enclave: {}",
id));
337 auto it = listening_interfaces.find(listen_interface_id);
338 if (it == listening_interfaces.end())
340 throw std::logic_error(fmt::format(
341 "Can't accept new RPC session {} - comes from unknown listening "
344 listen_interface_id));
347 auto& per_listen_interface = it->second;
351 certs.find(listen_interface_id) == certs.end())
354 "Refusing TLS session {} inside the enclave - interface {} "
355 "has no TLS certificate yet",
357 listen_interface_id);
360 ::tcp::tcp_stop, to_host,
id, std::string(
"Session refused"));
363 per_listen_interface.open_sessions >=
364 per_listen_interface.max_open_sessions_hard)
367 "Refusing TLS session {} inside the enclave - already have {} "
368 "sessions from interface {} and limit is {}",
370 per_listen_interface.open_sessions,
372 per_listen_interface.max_open_sessions_hard);
375 ::tcp::tcp_stop, to_host,
id, std::string(
"Session refused"));
378 per_listen_interface.open_sessions >=
379 per_listen_interface.max_open_sessions_soft)
382 "Soft refusing session {} (returning 503) inside the enclave - "
383 "already have {} sessions from interface {} and limit is {}",
385 per_listen_interface.open_sessions,
387 per_listen_interface.max_open_sessions_soft);
389 auto ctx = std::make_unique<::tls::Server>(certs[listen_interface_id]);
390 std::shared_ptr<Session> capped_session;
391 if (per_listen_interface.app_protocol ==
"HTTP2")
394 std::make_shared<NoMoreSessionsImpl<::http::HTTP2ServerSession>>(
400 per_listen_interface.http_configuration,
406 std::make_shared<NoMoreSessionsImpl<::http::HTTPServerSession>>(
412 per_listen_interface.http_configuration,
415 sessions.insert(std::make_pair(
416 id, std::make_pair(listen_interface_id, std::move(capped_session))));
417 per_listen_interface.open_sessions++;
418 per_listen_interface.peak_sessions = std::max(
419 per_listen_interface.peak_sessions,
420 per_listen_interface.open_sessions);
425 "Accepting a session {} inside the enclave from interface \"{}\"",
427 listen_interface_id);
432 if (per_listen_interface.app_protocol ==
"QUIC")
434 auto session = std::make_shared<QUICSessionImpl>(
435 rpc_map,
id, listen_interface_id, writer_factory);
436 sessions.insert(std::make_pair(
437 id, std::make_pair(listen_interface_id, std::move(session))));
439 else if (custom_protocol_subsystem)
445 std::make_pair(
id, std::make_pair(listen_interface_id,
nullptr)));
449 throw std::runtime_error(
450 "unknown UDP protocol and custom protocol subsystem missing");
452 per_listen_interface.open_sessions++;
453 per_listen_interface.peak_sessions = std::max(
454 per_listen_interface.peak_sessions,
455 per_listen_interface.open_sessions);
459 std::unique_ptr<tls::Context> ctx;
463 ctx = std::make_unique<nontls::PlaintextServer>();
467 ctx = std::make_unique<::tls::Server>(
468 certs[listen_interface_id],
469 per_listen_interface.app_protocol ==
"HTTP2");
472 auto session = make_server_session(
473 per_listen_interface.app_protocol,
477 per_listen_interface.http_configuration);
479 sessions.insert(std::make_pair(
480 id, std::make_pair(listen_interface_id, std::move(session))));
481 per_listen_interface.open_sessions++;
482 per_listen_interface.peak_sessions = std::max(
483 per_listen_interface.peak_sessions,
484 per_listen_interface.open_sessions);
488 sessions_peak = std::max(sessions_peak, sessions.size());
493 std::lock_guard<ccf::pal::Mutex> guard(lock);
495 auto search = sessions.find(
id);
496 if (search == sessions.end())
501 return search->second.second;
506 bool terminate_after_send,
507 std::vector<uint8_t>&& data)
override
510 if (session ==
nullptr)
512 LOG_DEBUG_FMT(
"Refusing to reply to unknown session {}",
id);
518 session->send_data(std::move(data));
520 if (terminate_after_send)
522 session->close_session();
530 std::lock_guard<ccf::pal::Mutex> guard(lock);
531 LOG_DEBUG_FMT(
"Closing a session inside the enclave: {}",
id);
532 const auto search = sessions.find(
id);
533 if (search != sessions.end())
535 auto it = listening_interfaces.find(search->second.first);
536 if (it != listening_interfaces.end())
538 it->second.open_sessions--;
540 sessions.erase(search);
551 const std::shared_ptr<::tls::Cert>& cert,
552 const std::string& app_protocol =
"HTTP1")
554 std::lock_guard<ccf::pal::Mutex> guard(lock);
555 auto ctx = std::make_unique<::tls::Client>(cert);
556 auto id = get_next_client_id();
558 LOG_DEBUG_FMT(
"Creating a new client session inside the enclave: {}",
id);
563 if (app_protocol ==
"HTTP2")
565 auto session = std::make_shared<::http::HTTP2ClientSession>(
566 id, writer_factory, std::move(ctx));
567 sessions.insert(std::make_pair(
id, std::make_pair(
"", session)));
568 sessions_peak = std::max(sessions_peak, sessions.size());
571 if (app_protocol ==
"HTTP1")
573 auto session = std::make_shared<::http::HTTPClientSession>(
574 id, writer_factory, std::move(ctx));
575 sessions.insert(std::make_pair(
id, std::make_pair(
"", session)));
576 sessions_peak = std::max(sessions_peak, sessions.size());
580 throw std::runtime_error(
"unsupported client application protocol");
585 std::lock_guard<ccf::pal::Mutex> guard(lock);
586 auto id = get_next_client_id();
587 auto session = std::make_shared<::http::UnencryptedHTTPClientSession>(
589 sessions.insert(std::make_pair(
id, std::make_pair(
"", session)));
590 sessions_peak = std::max(sessions_peak, sessions.size());
598 disp, ::tcp::tcp_start, [
this](
const uint8_t* data,
size_t size) {
599 auto [new_tls_id, listen_interface_name] =
600 ringbuffer::read_message<::tcp::tcp_start>(data, size);
601 accept(new_tls_id, listen_interface_name);
605 disp, ::tcp::tcp_inbound, [
this](
const uint8_t* data,
size_t size) {
606 auto id = serialized::peek<ccf::tls::ConnID>(data, size);
609 if (session ==
nullptr)
612 "Ignoring tls_inbound for unknown or refused session: {}",
id);
616 session->handle_incoming_data({data, size});
620 disp, ::tcp::tcp_close, [
this](
const uint8_t* data,
size_t size) {
621 auto [id] = ringbuffer::read_message<::tcp::tcp_close>(data, size);
626 disp, udp::udp_start, [
this](
const uint8_t* data,
size_t size) {
627 auto [new_id, listen_interface_name] =
628 ringbuffer::read_message<udp::udp_start>(data, size);
629 accept(new_id, listen_interface_name,
true);
633 disp, udp::udp_inbound, [
this](
const uint8_t* data,
size_t size) {
634 auto id = serialized::peek<int64_t>(data, size);
636 std::shared_ptr<Session> session;
638 std::lock_guard<ccf::pal::Mutex> guard(lock);
640 auto search = sessions.find(
id);
641 if (search == sessions.end())
644 "Ignoring udp::udp_inbound for unknown or refused session: {}",
649 if (!search->second.second && custom_protocol_subsystem)
655 const auto& conn_id = search->first;
656 const auto& interface_id = search->second.first;
658 auto iit = listening_interfaces.find(interface_id);
659 if (iit == listening_interfaces.end())
662 "Failure to create custom protocol session because of "
663 "unknown interface '{}', ignoring udp::udp_inbound for "
670 const auto&
interface = iit->second;
672 search->second.second =
673 custom_protocol_subsystem->create_session(
674 interface.app_protocol, conn_id,
nullptr);
676 if (!search->second.second)
679 "Failure to create custom protocol session, ignoring "
680 "udp::udp_inbound for session: {}",
685 catch (
const std::exception& ex)
688 "Failure to create custom protocol session: {}", ex.what());
693 session = search->second.second;
696 session->handle_incoming_data({data, size});
Definition forwarder_types.h:14
Definition rpc_sessions.h:44
RPCSessions(ringbuffer::AbstractWriterFactory &writer_factory, std::shared_ptr< RPCMap > rpc_map_)
Definition rpc_sessions.h:185
ccf::SessionMetrics get_session_metrics()
Definition rpc_sessions.h:254
void report_request_payload_too_large_error(const ccf::ListenInterfaceID &id) override
Definition rpc_sessions.h:207
void report_request_header_too_large_error(const ccf::ListenInterfaceID &id) override
Definition rpc_sessions.h:214
std::shared_ptr< ClientSession > create_client(const std::shared_ptr<::tls::Cert > &cert, const std::string &app_protocol="HTTP1")
Definition rpc_sessions.h:550
void set_node_cert(const ccf::crypto::Pem &cert_, const ccf::crypto::Pem &pk)
Definition rpc_sessions.h:289
void register_message_handlers(messaging::Dispatcher< ringbuffer::Message > &disp)
Definition rpc_sessions.h:594
void set_network_cert(const ccf::crypto::Pem &cert_, const ccf::crypto::Pem &pk)
Definition rpc_sessions.h:295
void remove_session(ccf::tls::ConnID id)
Definition rpc_sessions.h:528
void update_listening_interface_options(const ccf::NodeInfoNetwork &node_info)
Definition rpc_sessions.h:221
void set_custom_protocol_subsystem(std::shared_ptr< CustomProtocolSubsystem > cpss)
Definition rpc_sessions.h:195
bool reply_async(ccf::tls::ConnID id, bool terminate_after_send, std::vector< uint8_t > &&data) override
Definition rpc_sessions.h:504
std::shared_ptr< ClientSession > create_unencrypted_client()
Definition rpc_sessions.h:583
std::shared_ptr< Session > find_session(ccf::tls::ConnID id)
Definition rpc_sessions.h:491
void report_parsing_error(const ccf::ListenInterfaceID &id) override
Definition rpc_sessions.h:201
void set_cert(ccf::Authority authority, const ccf::crypto::Pem &cert_, const ccf::crypto::Pem &pk)
Definition rpc_sessions.h:301
ccf::ApplicationProtocol get_app_protocol_main_interface() const
Definition rpc_sessions.h:275
void accept(ccf::tls::ConnID id, const ListenInterfaceID &listen_interface_id, bool udp=false)
Definition rpc_sessions.h:324
Definition error_reporter.h:8
Definition messaging.h:38
Definition quic_session.h:409
Definition ring_buffer_types.h:157
virtual WriterPtr create_writer_to_outside()=0
#define LOG_INFO_FMT
Definition internal_logger.h:15
#define LOG_DEBUG_FMT
Definition internal_logger.h:14
#define DISPATCHER_SET_MESSAGE_HANDLER(DISP, MSG,...)
Definition messaging.h:316
std::mutex Mutex
Definition locking.h:12
int64_t ConnID
Definition custom_protocol_subsystem_interface.h:20
Definition app_interface.h:14
std::string ApplicationProtocol
Definition node_info_network.h:29
std::string ListenInterfaceID
Definition rpc_context.h:21
Authority
Definition node_info_network.h:16
@ ready
Definition tls_session.h:19
std::shared_ptr< AbstractWriter > WriterPtr
Definition ring_buffer_types.h:154
Definition msg_types.h:10
#define RINGBUFFER_WRITE_MESSAGE(MSG,...)
Definition ring_buffer_types.h:259
Definition node_info_network.h:32
Definition odata_error.h:58
RpcInterfaces rpc_interfaces
RPC interfaces.
Definition node_info_network.h:151
Definition node_info_network.h:184
Definition session_metrics.h:15
Definition session_metrics.h:13
size_t peak
Definition session_metrics.h:31
std::map< std::string, PerInterface > interfaces
Definition session_metrics.h:32
size_t active
Definition session_metrics.h:30
Definition http_configuration.h:24