31#include <unordered_map>
37 static constexpr size_t max_open_sessions_soft_default = 1000;
38 static constexpr size_t max_open_sessions_hard_default = 1010;
42 class RPCSessions :
public std::enable_shared_from_this<RPCSessions>,
48 struct ListenInterface
52 size_t max_open_sessions_soft;
53 size_t max_open_sessions_hard;
59 std::map<ListenInterfaceID, ListenInterface> listening_interfaces;
63 std::shared_ptr<RPCMap> rpc_map;
64 std::unordered_map<ListenInterfaceID, std::shared_ptr<::tls::Cert>> certs;
65 std::shared_ptr<CustomProtocolSubsystem> custom_protocol_subsystem;
70 std::pair<ListenInterfaceID, std::shared_ptr<ccf::Session>>>
72 size_t sessions_peak = 0;
76 std::atomic<ccf::tls::ConnID> next_client_session_id = -1;
78 template <
typename Base>
79 class NoMoreSessionsImpl :
public Base
82 template <
typename... Ts>
83 NoMoreSessionsImpl(Ts&&... ts) : Base(std::forward<Ts>(ts)...)
86 void handle_incoming_data_thread(std::vector<uint8_t>&& data)
override
88 Base::tls_io->recv_buffered(data.data(), data.size());
94 HTTP_STATUS_SERVICE_UNAVAILABLE,
95 ccf::errors::SessionCapExhausted,
96 "Service is currently busy and unable to serve new connections"});
99 Base::tls_io->close();
106 auto id = next_client_session_id--;
107 const auto initial = id;
109 if (next_client_session_id > 0)
110 next_client_session_id = -1;
112 while (sessions.find(
id) != sessions.end())
121 throw std::runtime_error(
122 "Exhausted all IDs for enclave client sessions");
129 ListenInterface& get_interface_from_interface_id(
132 auto it = listening_interfaces.find(
id);
133 if (it != listening_interfaces.end())
138 throw std::logic_error(
139 fmt::format(
"No RPC interface for interface ID {}",
id));
142 std::shared_ptr<ccf::Session> make_server_session(
143 const std::string& app_protocol,
146 std::unique_ptr<tls::Context>&& ctx,
149 if (app_protocol ==
"HTTP2")
151 return std::make_shared<::http::HTTP2ServerSession>(
157 parser_configuration,
161 else if (app_protocol ==
"HTTP1")
163 return std::make_shared<::http::HTTPServerSession>(
169 parser_configuration,
172 else if (custom_protocol_subsystem)
174 return custom_protocol_subsystem->create_session(
175 app_protocol,
id, std::move(ctx));
179 throw std::runtime_error(fmt::format(
180 "unknown protocol '{}' and custom protocol subsystem missing",
188 std::shared_ptr<RPCMap> rpc_map_) :
189 writer_factory(writer_factory),
191 custom_protocol_subsystem(nullptr)
197 std::shared_ptr<CustomProtocolSubsystem> cpss)
199 custom_protocol_subsystem = cpss;
204 std::lock_guard<ccf::pal::Mutex> guard(lock);
205 get_interface_from_interface_id(
id).errors.parsing++;
211 std::lock_guard<ccf::pal::Mutex> guard(lock);
212 get_interface_from_interface_id(
id).errors.request_payload_too_large++;
218 std::lock_guard<ccf::pal::Mutex> guard(lock);
219 get_interface_from_interface_id(
id).errors.request_header_too_large++;
225 std::lock_guard<ccf::pal::Mutex> guard(lock);
229 auto& li = listening_interfaces[name];
231 li.max_open_sessions_soft = interface.max_open_sessions_soft.value_or(
232 max_open_sessions_soft_default);
234 li.max_open_sessions_hard = interface.max_open_sessions_hard.value_or(
235 max_open_sessions_hard_default);
237 li.endorsement = interface.endorsement.value_or(endorsement_default);
239 li.http_configuration =
242 li.app_protocol = interface.app_protocol.value_or(
"HTTP1");
245 "Setting max open sessions on interface \"{}\" ({}) to [{}, "
246 "{}] and endorsement authority to {}",
248 interface.bind_address,
249 li.max_open_sessions_soft,
250 li.max_open_sessions_hard,
251 li.endorsement.authority);
258 std::lock_guard<ccf::pal::Mutex> guard(lock);
260 sm.
active = sessions.size();
261 sm.
peak = sessions_peak;
263 for (
const auto& [name, interface] : listening_interfaces)
266 interface.open_sessions,
267 interface.peak_sessions,
268 interface.max_open_sessions_soft,
269 interface.max_open_sessions_hard,
282 if (listening_interfaces.empty())
284 throw std::logic_error(
"No listening interface for this node");
287 return listening_interfaces.begin()->second.app_protocol;
306 const std::string& acme_configuration =
"")
312 auto cert = std::make_shared<::tls::Cert>(
313 nullptr, cert_, pk, std::nullopt,
false);
315 std::lock_guard<ccf::pal::Mutex> guard(lock);
317 for (
auto& [listen_interface_id, interface] : listening_interfaces)
319 if (interface.endorsement.authority == authority)
323 (interface.endorsement.acme_configuration &&
324 *interface.endorsement.acme_configuration == acme_configuration))
326 certs.insert_or_assign(listen_interface_id, cert);
337 std::lock_guard<ccf::pal::Mutex> guard(lock);
339 if (sessions.find(
id) != sessions.end())
341 throw std::logic_error(
342 fmt::format(
"Duplicate conn ID received inside enclave: {}",
id));
345 auto it = listening_interfaces.find(listen_interface_id);
346 if (it == listening_interfaces.end())
348 throw std::logic_error(fmt::format(
349 "Can't accept new RPC session {} - comes from unknown listening "
352 listen_interface_id));
355 auto& per_listen_interface = it->second;
359 certs.find(listen_interface_id) == certs.end())
362 "Refusing TLS session {} inside the enclave - interface {} "
363 "has no TLS certificate yet",
365 listen_interface_id);
368 ::tcp::tcp_stop, to_host,
id, std::string(
"Session refused"));
371 per_listen_interface.open_sessions >=
372 per_listen_interface.max_open_sessions_hard)
375 "Refusing TLS session {} inside the enclave - already have {} "
376 "sessions from interface {} and limit is {}",
378 per_listen_interface.open_sessions,
380 per_listen_interface.max_open_sessions_hard);
383 ::tcp::tcp_stop, to_host,
id, std::string(
"Session refused"));
386 per_listen_interface.open_sessions >=
387 per_listen_interface.max_open_sessions_soft)
390 "Soft refusing session {} (returning 503) inside the enclave - "
391 "already have {} sessions from interface {} and limit is {}",
393 per_listen_interface.open_sessions,
395 per_listen_interface.max_open_sessions_soft);
397 auto ctx = std::make_unique<::tls::Server>(certs[listen_interface_id]);
398 std::shared_ptr<Session> capped_session;
399 if (per_listen_interface.app_protocol ==
"HTTP2")
402 std::make_shared<NoMoreSessionsImpl<::http::HTTP2ServerSession>>(
408 per_listen_interface.http_configuration,
415 std::make_shared<NoMoreSessionsImpl<::http::HTTPServerSession>>(
421 per_listen_interface.http_configuration,
424 sessions.insert(std::make_pair(
425 id, std::make_pair(listen_interface_id, std::move(capped_session))));
426 per_listen_interface.open_sessions++;
427 per_listen_interface.peak_sessions = std::max(
428 per_listen_interface.peak_sessions,
429 per_listen_interface.open_sessions);
434 "Accepting a session {} inside the enclave from interface \"{}\"",
436 listen_interface_id);
441 if (per_listen_interface.app_protocol ==
"QUIC")
443 auto session = std::make_shared<QUICSessionImpl>(
444 rpc_map,
id, listen_interface_id, writer_factory);
445 sessions.insert(std::make_pair(
446 id, std::make_pair(listen_interface_id, std::move(session))));
448 else if (custom_protocol_subsystem)
454 std::make_pair(
id, std::make_pair(listen_interface_id,
nullptr)));
458 throw std::runtime_error(
459 "unknown UDP protocol and custom protocol subsystem missing");
461 per_listen_interface.open_sessions++;
462 per_listen_interface.peak_sessions = std::max(
463 per_listen_interface.peak_sessions,
464 per_listen_interface.open_sessions);
468 std::unique_ptr<tls::Context> ctx;
472 ctx = std::make_unique<nontls::PlaintextServer>();
476 ctx = std::make_unique<::tls::Server>(
477 certs[listen_interface_id],
478 per_listen_interface.app_protocol ==
"HTTP2");
481 auto session = make_server_session(
482 per_listen_interface.app_protocol,
486 per_listen_interface.http_configuration);
488 sessions.insert(std::make_pair(
489 id, std::make_pair(listen_interface_id, std::move(session))));
490 per_listen_interface.open_sessions++;
491 per_listen_interface.peak_sessions = std::max(
492 per_listen_interface.peak_sessions,
493 per_listen_interface.open_sessions);
497 sessions_peak = std::max(sessions_peak, sessions.size());
502 std::lock_guard<ccf::pal::Mutex> guard(lock);
504 auto search = sessions.find(
id);
505 if (search == sessions.end())
510 return search->second.second;
515 bool terminate_after_send,
516 std::vector<uint8_t>&& data)
override
519 if (session ==
nullptr)
521 LOG_DEBUG_FMT(
"Refusing to reply to unknown session {}",
id);
527 session->send_data(data);
529 if (terminate_after_send)
531 session->close_session();
539 std::lock_guard<ccf::pal::Mutex> guard(lock);
540 LOG_DEBUG_FMT(
"Closing a session inside the enclave: {}",
id);
541 const auto search = sessions.find(
id);
542 if (search != sessions.end())
544 auto it = listening_interfaces.find(search->second.first);
545 if (it != listening_interfaces.end())
547 it->second.open_sessions--;
549 sessions.erase(search);
560 const std::shared_ptr<::tls::Cert>& cert,
561 const std::string& app_protocol =
"HTTP1")
563 std::lock_guard<ccf::pal::Mutex> guard(lock);
564 auto ctx = std::make_unique<::tls::Client>(cert);
565 auto id = get_next_client_id();
567 LOG_DEBUG_FMT(
"Creating a new client session inside the enclave: {}",
id);
572 if (app_protocol ==
"HTTP2")
574 auto session = std::make_shared<::http::HTTP2ClientSession>(
575 id, writer_factory, std::move(ctx));
576 sessions.insert(std::make_pair(
id, std::make_pair(
"", session)));
577 sessions_peak = std::max(sessions_peak, sessions.size());
580 else if (app_protocol ==
"HTTP1")
582 auto session = std::make_shared<::http::HTTPClientSession>(
583 id, writer_factory, std::move(ctx));
584 sessions.insert(std::make_pair(
id, std::make_pair(
"", session)));
585 sessions_peak = std::max(sessions_peak, sessions.size());
590 throw std::runtime_error(
"unsupported client application protocol");
596 std::lock_guard<ccf::pal::Mutex> guard(lock);
597 auto id = get_next_client_id();
598 auto session = std::make_shared<::http::UnencryptedHTTPClientSession>(
600 sessions.insert(std::make_pair(
id, std::make_pair(
"", session)));
601 sessions_peak = std::max(sessions_peak, sessions.size());
609 disp, ::tcp::tcp_start, [
this](
const uint8_t* data,
size_t size) {
610 auto [new_tls_id, listen_interface_name] =
611 ringbuffer::read_message<::tcp::tcp_start>(data, size);
612 accept(new_tls_id, listen_interface_name);
616 disp, ::tcp::tcp_inbound, [
this](
const uint8_t* data,
size_t size) {
617 auto id = serialized::peek<ccf::tls::ConnID>(data, size);
620 if (session ==
nullptr)
623 "Ignoring tls_inbound for unknown or refused session: {}",
id);
627 session->handle_incoming_data({data, size});
631 disp, ::tcp::tcp_close, [
this](
const uint8_t* data,
size_t size) {
632 auto [id] = ringbuffer::read_message<::tcp::tcp_close>(data, size);
637 disp, udp::udp_start, [
this](
const uint8_t* data,
size_t size) {
638 auto [new_id, listen_interface_name] =
639 ringbuffer::read_message<udp::udp_start>(data, size);
640 accept(new_id, listen_interface_name,
true);
644 disp, udp::udp_inbound, [
this](
const uint8_t* data,
size_t size) {
645 auto id = serialized::peek<int64_t>(data, size);
647 std::shared_ptr<Session> session;
649 std::lock_guard<ccf::pal::Mutex> guard(lock);
651 auto search = sessions.find(
id);
652 if (search == sessions.end())
655 "Ignoring udp::udp_inbound for unknown or refused session: {}",
659 else if (!search->second.second && custom_protocol_subsystem)
665 const auto& conn_id = search->first;
666 const auto& interface_id = search->second.first;
668 auto iit = listening_interfaces.find(interface_id);
669 if (iit == listening_interfaces.end())
672 "Failure to create custom protocol session because of "
673 "unknown interface '{}', ignoring udp::udp_inbound for "
680 const auto&
interface = iit->second;
682 search->second.second =
683 custom_protocol_subsystem->create_session(
684 interface.app_protocol, conn_id,
nullptr);
686 if (!search->second.second)
689 "Failure to create custom protocol session, ignoring "
690 "udp::udp_inbound for session: {}",
695 catch (
const std::exception& ex)
698 "Failure to create custom protocol session: {}", ex.what());
703 session = search->second.second;
706 session->handle_incoming_data({data, size});
Definition forwarder_types.h:14
Definition rpc_sessions.h:46
RPCSessions(ringbuffer::AbstractWriterFactory &writer_factory, std::shared_ptr< RPCMap > rpc_map_)
Definition rpc_sessions.h:186
ccf::SessionMetrics get_session_metrics()
Definition rpc_sessions.h:255
void report_request_payload_too_large_error(const ccf::ListenInterfaceID &id) override
Definition rpc_sessions.h:208
void report_request_header_too_large_error(const ccf::ListenInterfaceID &id) override
Definition rpc_sessions.h:215
std::shared_ptr< ClientSession > create_client(const std::shared_ptr<::tls::Cert > &cert, const std::string &app_protocol="HTTP1")
Definition rpc_sessions.h:559
void set_cert(ccf::Authority authority, const ccf::crypto::Pem &cert_, const ccf::crypto::Pem &pk, const std::string &acme_configuration="")
Definition rpc_sessions.h:302
void set_node_cert(const ccf::crypto::Pem &cert_, const ccf::crypto::Pem &pk)
Definition rpc_sessions.h:290
void register_message_handlers(messaging::Dispatcher< ringbuffer::Message > &disp)
Definition rpc_sessions.h:605
void set_network_cert(const ccf::crypto::Pem &cert_, const ccf::crypto::Pem &pk)
Definition rpc_sessions.h:296
void remove_session(ccf::tls::ConnID id)
Definition rpc_sessions.h:537
void update_listening_interface_options(const ccf::NodeInfoNetwork &node_info)
Definition rpc_sessions.h:222
void set_custom_protocol_subsystem(std::shared_ptr< CustomProtocolSubsystem > cpss)
Definition rpc_sessions.h:196
bool reply_async(ccf::tls::ConnID id, bool terminate_after_send, std::vector< uint8_t > &&data) override
Definition rpc_sessions.h:513
std::shared_ptr< ClientSession > create_unencrypted_client()
Definition rpc_sessions.h:594
std::shared_ptr< Session > find_session(ccf::tls::ConnID id)
Definition rpc_sessions.h:500
void report_parsing_error(const ccf::ListenInterfaceID &id) override
Definition rpc_sessions.h:202
ccf::ApplicationProtocol get_app_protocol_main_interface() const
Definition rpc_sessions.h:276
void accept(ccf::tls::ConnID id, const ListenInterfaceID &listen_interface_id, bool udp=false)
Definition rpc_sessions.h:332
Definition error_reporter.h:8
Definition responder_lookup.h:14
Definition messaging.h:38
Definition quic_session.h:388
Definition ring_buffer_types.h:153
virtual WriterPtr create_writer_to_outside()=0
#define LOG_INFO_FMT
Definition logger.h:362
#define LOG_DEBUG_FMT
Definition logger.h:357
#define DISPATCHER_SET_MESSAGE_HANDLER(DISP, MSG,...)
Definition messaging.h:316
std::mutex Mutex
Definition locking.h:12
int64_t ConnID
Definition custom_protocol_subsystem_interface.h:20
Definition app_interface.h:14
std::string ApplicationProtocol
Definition node_info_network.h:29
@ ready
Definition tls_session.h:20
std::string ListenInterfaceID
Definition rpc_context.h:21
Authority
Definition node_info_network.h:16
std::shared_ptr< AbstractWriter > WriterPtr
Definition ring_buffer_types.h:150
Definition msg_types.h:10
#define RINGBUFFER_WRITE_MESSAGE(MSG,...)
Definition ring_buffer_types.h:255
Definition node_info_network.h:32
Definition odata_error.h:58
RpcInterfaces rpc_interfaces
RPC interfaces.
Definition node_info_network.h:150
Definition node_info_network.h:196
Definition session_metrics.h:15
Definition session_metrics.h:13
size_t peak
Definition session_metrics.h:31
std::map< std::string, PerInterface > interfaces
Definition session_metrics.h:32
size_t active
Definition session_metrics.h:30
Definition http_configuration.h:24