CCF
Loading...
Searching...
No Matches
rpc_sessions.h
Go to the documentation of this file.
1// Copyright (c) Microsoft Corporation. All rights reserved.
2// Licensed under the Apache 2.0 License.
3#pragma once
4
6#include "ccf/pal/locking.h"
9#include "ds/serialized.h"
10#include "enclave/session.h"
11#include "forwarder_types.h"
12#include "http/http2_session.h"
13#include "http/http_session.h"
16#include "rpc_handler.h"
17#include "tls/cert.h"
18#include "tls/client.h"
19#include "tls/context.h"
21#include "tls/server.h"
22#include "udp/msg_types.h"
23
24// NB: This should be HTTP3 including QUIC, but this is
25// ok for now, as we only have an echo service for now
26#include "quic/quic_session.h"
27
28#include <limits>
29#include <map>
30#include <stdexcept>
31#include <unordered_map>
32
33namespace ccf
34{
36
37 static constexpr size_t max_open_sessions_soft_default = 1000;
38 static constexpr size_t max_open_sessions_hard_default = 1010;
39 static const ccf::Endorsement endorsement_default = {ccf::Authority::SERVICE};
40
41 class RPCSessions : public std::enable_shared_from_this<RPCSessions>,
44 {
45 private:
46 struct ListenInterface
47 {
48 size_t open_sessions = 0;
49 size_t peak_sessions = 0;
50 size_t max_open_sessions_soft = 0;
51 size_t max_open_sessions_hard = 0;
52 ccf::Endorsement endorsement{};
53 http::ParserConfiguration http_configuration;
55 ccf::ApplicationProtocol app_protocol;
56 };
57 std::map<ListenInterfaceID, ListenInterface> listening_interfaces;
58
60 ringbuffer::WriterPtr to_host = nullptr;
61 std::shared_ptr<RPCMap> rpc_map;
62 std::unordered_map<ListenInterfaceID, std::shared_ptr<::tls::Cert>> certs;
63 std::shared_ptr<CustomProtocolSubsystem> custom_protocol_subsystem;
64
65 ccf::pal::Mutex lock;
66 std::unordered_map<
68 std::pair<ListenInterfaceID, std::shared_ptr<ccf::Session>>>
69 sessions;
70 size_t sessions_peak = 0;
71
72 // Negative sessions are reserved for those originating from
73 // the enclave via create_client().
74 std::atomic<ccf::tls::ConnID> next_client_session_id = -1;
75
76 template <typename Base>
77 class NoMoreSessionsImpl : public Base
78 {
79 public:
80 template <typename... Ts>
81 NoMoreSessionsImpl(Ts&&... ts) : Base(std::forward<Ts>(ts)...)
82 {}
83
84 void handle_incoming_data_thread(std::vector<uint8_t>&& data) override
85 {
86 Base::tls_io->recv_buffered(data.data(), data.size());
87
88 if (Base::tls_io->get_status() == ccf::SessionStatus::ready)
89 {
90 // Send response describing soft session limit
91 Base::send_odata_error_response(ccf::ErrorDetails{
92 HTTP_STATUS_SERVICE_UNAVAILABLE,
93 ccf::errors::SessionCapExhausted,
94 "Service is currently busy and unable to serve new connections"});
95
96 // Close connection
97 Base::tls_io->close();
98 }
99 }
100 };
101
102 ccf::tls::ConnID get_next_client_id()
103 {
104 auto id = next_client_session_id--;
105 const auto initial = id;
106
107 if (next_client_session_id > 0)
108 {
109 next_client_session_id = -1;
110 }
111
112 while (sessions.find(id) != sessions.end())
113 {
114 id--;
115
116 if (id > 0)
117 {
118 id = -1;
119 }
120
121 if (id == initial)
122 {
123 throw std::runtime_error(
124 "Exhausted all IDs for enclave client sessions");
125 }
126 }
127
128 return id;
129 }
130
131 ListenInterface& get_interface_from_interface_id(
132 const ccf::ListenInterfaceID& id)
133 {
134 auto it = listening_interfaces.find(id);
135 if (it != listening_interfaces.end())
136 {
137 return it->second;
138 }
139
140 throw std::logic_error(
141 fmt::format("No RPC interface for interface ID {}", id));
142 }
143
144 std::shared_ptr<ccf::Session> make_server_session(
145 const std::string& app_protocol,
147 const ListenInterfaceID& listen_interface_id,
148 std::unique_ptr<tls::Context>&& ctx,
149 const http::ParserConfiguration& parser_configuration)
150 {
151 if (app_protocol == "HTTP2")
152 {
153 return std::make_shared<::http::HTTP2ServerSession>(
154 rpc_map,
155 id,
156 listen_interface_id,
157 writer_factory,
158 std::move(ctx),
159 parser_configuration,
160 shared_from_this());
161 }
162 if (app_protocol == "HTTP1")
163 {
164 return std::make_shared<::http::HTTPServerSession>(
165 rpc_map,
166 id,
167 listen_interface_id,
168 writer_factory,
169 std::move(ctx),
170 parser_configuration,
171 shared_from_this());
172 }
173 if (custom_protocol_subsystem)
174 {
175 return custom_protocol_subsystem->create_session(
176 app_protocol, id, std::move(ctx));
177 }
178
179 throw std::runtime_error(fmt::format(
180 "unknown protocol '{}' and custom protocol subsystem missing",
181 app_protocol));
182 }
183
184 public:
186 ringbuffer::AbstractWriterFactory& writer_factory,
187 std::shared_ptr<RPCMap> rpc_map_) :
188 writer_factory(writer_factory),
189 rpc_map(std::move(rpc_map_)),
190 custom_protocol_subsystem(nullptr)
191 {
192 to_host = writer_factory.create_writer_to_outside();
193 }
194
196 std::shared_ptr<CustomProtocolSubsystem> cpss)
197 {
198 custom_protocol_subsystem = cpss;
199 }
200
202 {
203 std::lock_guard<ccf::pal::Mutex> guard(lock);
204 get_interface_from_interface_id(id).errors.parsing++;
205 }
206
208 const ccf::ListenInterfaceID& id) override
209 {
210 std::lock_guard<ccf::pal::Mutex> guard(lock);
211 get_interface_from_interface_id(id).errors.request_payload_too_large++;
212 }
213
215 const ccf::ListenInterfaceID& id) override
216 {
217 std::lock_guard<ccf::pal::Mutex> guard(lock);
218 get_interface_from_interface_id(id).errors.request_header_too_large++;
219 }
220
222 const ccf::NodeInfoNetwork& node_info)
223 {
224 std::lock_guard<ccf::pal::Mutex> guard(lock);
225
226 for (const auto& [name, interface] : node_info.rpc_interfaces)
227 {
228 auto& li = listening_interfaces[name];
229
230 li.max_open_sessions_soft = interface.max_open_sessions_soft.value_or(
231 max_open_sessions_soft_default);
232
233 li.max_open_sessions_hard = interface.max_open_sessions_hard.value_or(
234 max_open_sessions_hard_default);
235
236 li.endorsement = interface.endorsement.value_or(endorsement_default);
237
238 li.http_configuration =
239 interface.http_configuration.value_or(http::ParserConfiguration{});
240
241 li.app_protocol = interface.app_protocol.value_or("HTTP1");
242
244 "Setting max open sessions on interface \"{}\" ({}) to [{}, "
245 "{}] and endorsement authority to {}",
246 name,
247 interface.bind_address,
248 li.max_open_sessions_soft,
249 li.max_open_sessions_hard,
250 li.endorsement.authority);
251 }
252 }
253
255 {
257 std::lock_guard<ccf::pal::Mutex> guard(lock);
258
259 sm.active = sessions.size();
260 sm.peak = sessions_peak;
261
262 for (const auto& [name, interface] : listening_interfaces)
263 {
264 sm.interfaces[name] = {
265 interface.open_sessions,
266 interface.peak_sessions,
267 interface.max_open_sessions_soft,
268 interface.max_open_sessions_hard,
269 interface.errors};
270 }
271
272 return sm;
273 }
274
276 {
277 // Note: this is a temporary function to conveniently find out which
278 // protocol to use when creating client endpoints (e.g. for join
279 // protocol). This can be removed once the HTTP and HTTP/2 endpoints have
280 // been merged.
281 if (listening_interfaces.empty())
282 {
283 throw std::logic_error("No listening interface for this node");
284 }
285
286 return listening_interfaces.begin()->second.app_protocol;
287 }
288
290 const ccf::crypto::Pem& cert_, const ccf::crypto::Pem& pk)
291 {
292 set_cert(ccf::Authority::NODE, cert_, pk);
293 }
294
296 const ccf::crypto::Pem& cert_, const ccf::crypto::Pem& pk)
297 {
299 }
300
302 ccf::Authority authority,
303 const ccf::crypto::Pem& cert_,
304 const ccf::crypto::Pem& pk)
305 {
306 // Caller authentication is done by each frontend by looking up
307 // the caller's certificate in the relevant store table. The caller
308 // certificate does not have to be signed by a known CA (nullptr) and
309 // verification is not required here.
310 auto cert = std::make_shared<::tls::Cert>(
311 nullptr, cert_, pk, std::nullopt, /*auth_required ==*/false);
312
313 std::lock_guard<ccf::pal::Mutex> guard(lock);
314
315 for (auto& [listen_interface_id, interface] : listening_interfaces)
316 {
317 if (interface.endorsement.authority == authority)
318 {
319 certs.insert_or_assign(listen_interface_id, cert);
320 }
321 }
322 }
323
324 void accept(
326 const ListenInterfaceID& listen_interface_id,
327 bool udp = false)
328 {
329 std::lock_guard<ccf::pal::Mutex> guard(lock);
330
331 if (sessions.find(id) != sessions.end())
332 {
333 throw std::logic_error(
334 fmt::format("Duplicate conn ID received inside enclave: {}", id));
335 }
336
337 auto it = listening_interfaces.find(listen_interface_id);
338 if (it == listening_interfaces.end())
339 {
340 throw std::logic_error(fmt::format(
341 "Can't accept new RPC session {} - comes from unknown listening "
342 "interface {}",
343 id,
344 listen_interface_id));
345 }
346
347 auto& per_listen_interface = it->second;
348
349 if (
350 per_listen_interface.endorsement.authority != Authority::UNSECURED &&
351 certs.find(listen_interface_id) == certs.end())
352 {
354 "Refusing TLS session {} inside the enclave - interface {} "
355 "has no TLS certificate yet",
356 id,
357 listen_interface_id);
358
360 ::tcp::tcp_stop, to_host, id, std::string("Session refused"));
361 }
362 else if (
363 per_listen_interface.open_sessions >=
364 per_listen_interface.max_open_sessions_hard)
365 {
367 "Refusing TLS session {} inside the enclave - already have {} "
368 "sessions from interface {} and limit is {}",
369 id,
370 per_listen_interface.open_sessions,
371 listen_interface_id,
372 per_listen_interface.max_open_sessions_hard);
373
375 ::tcp::tcp_stop, to_host, id, std::string("Session refused"));
376 }
377 else if (
378 per_listen_interface.open_sessions >=
379 per_listen_interface.max_open_sessions_soft)
380 {
382 "Soft refusing session {} (returning 503) inside the enclave - "
383 "already have {} sessions from interface {} and limit is {}",
384 id,
385 per_listen_interface.open_sessions,
386 listen_interface_id,
387 per_listen_interface.max_open_sessions_soft);
388
389 auto ctx = std::make_unique<::tls::Server>(certs[listen_interface_id]);
390 std::shared_ptr<Session> capped_session;
391 if (per_listen_interface.app_protocol == "HTTP2")
392 {
393 capped_session =
394 std::make_shared<NoMoreSessionsImpl<::http::HTTP2ServerSession>>(
395 rpc_map,
396 id,
397 listen_interface_id,
398 writer_factory,
399 std::move(ctx),
400 per_listen_interface.http_configuration,
401 shared_from_this());
402 }
403 else
404 {
405 capped_session =
406 std::make_shared<NoMoreSessionsImpl<::http::HTTPServerSession>>(
407 rpc_map,
408 id,
409 listen_interface_id,
410 writer_factory,
411 std::move(ctx),
412 per_listen_interface.http_configuration,
413 shared_from_this());
414 }
415 sessions.insert(std::make_pair(
416 id, std::make_pair(listen_interface_id, std::move(capped_session))));
417 per_listen_interface.open_sessions++;
418 per_listen_interface.peak_sessions = std::max(
419 per_listen_interface.peak_sessions,
420 per_listen_interface.open_sessions);
421 }
422 else
423 {
425 "Accepting a session {} inside the enclave from interface \"{}\"",
426 id,
427 listen_interface_id);
428
429 if (udp)
430 {
431 LOG_DEBUG_FMT("New UDP endpoint at {}", id);
432 if (per_listen_interface.app_protocol == "QUIC")
433 {
434 auto session = std::make_shared<QUICSessionImpl>(
435 rpc_map, id, listen_interface_id, writer_factory);
436 sessions.insert(std::make_pair(
437 id, std::make_pair(listen_interface_id, std::move(session))));
438 }
439 else if (custom_protocol_subsystem)
440 {
441 // We know it's a custom protocol, but the session creation function
442 // hasn't been registered yet, so we keep a nullptr until the first
443 // udp::udp_inbound message.
444 sessions.insert(
445 std::make_pair(id, std::make_pair(listen_interface_id, nullptr)));
446 }
447 else
448 {
449 throw std::runtime_error(
450 "unknown UDP protocol and custom protocol subsystem missing");
451 }
452 per_listen_interface.open_sessions++;
453 per_listen_interface.peak_sessions = std::max(
454 per_listen_interface.peak_sessions,
455 per_listen_interface.open_sessions);
456 }
457 else
458 {
459 std::unique_ptr<tls::Context> ctx;
460 if (
461 per_listen_interface.endorsement.authority == Authority::UNSECURED)
462 {
463 ctx = std::make_unique<nontls::PlaintextServer>();
464 }
465 else
466 {
467 ctx = std::make_unique<::tls::Server>(
468 certs[listen_interface_id],
469 per_listen_interface.app_protocol == "HTTP2");
470 }
471
472 auto session = make_server_session(
473 per_listen_interface.app_protocol,
474 id,
475 listen_interface_id,
476 std::move(ctx),
477 per_listen_interface.http_configuration);
478
479 sessions.insert(std::make_pair(
480 id, std::make_pair(listen_interface_id, std::move(session))));
481 per_listen_interface.open_sessions++;
482 per_listen_interface.peak_sessions = std::max(
483 per_listen_interface.peak_sessions,
484 per_listen_interface.open_sessions);
485 }
486 }
487
488 sessions_peak = std::max(sessions_peak, sessions.size());
489 }
490
491 std::shared_ptr<Session> find_session(ccf::tls::ConnID id)
492 {
493 std::lock_guard<ccf::pal::Mutex> guard(lock);
494
495 auto search = sessions.find(id);
496 if (search == sessions.end())
497 {
498 return nullptr;
499 }
500
501 return search->second.second;
502 }
503
506 bool terminate_after_send,
507 std::vector<uint8_t>&& data) override
508 {
509 auto session = find_session(id);
510 if (session == nullptr)
511 {
512 LOG_DEBUG_FMT("Refusing to reply to unknown session {}", id);
513 return false;
514 }
515
516 LOG_DEBUG_FMT("Replying to session {}", id);
517
518 session->send_data(std::move(data));
519
520 if (terminate_after_send)
521 {
522 session->close_session();
523 }
524
525 return true;
526 }
527
529 {
530 std::lock_guard<ccf::pal::Mutex> guard(lock);
531 LOG_DEBUG_FMT("Closing a session inside the enclave: {}", id);
532 const auto search = sessions.find(id);
533 if (search != sessions.end())
534 {
535 auto it = listening_interfaces.find(search->second.first);
536 if (it != listening_interfaces.end())
537 {
538 it->second.open_sessions--;
539 }
540 sessions.erase(search);
541 }
542 else
543 {
544 // Enclave doesn't know this ID, but host is still talking about it.
545 // Continue with the normal closure flow
546 RINGBUFFER_WRITE_MESSAGE(::tcp::tcp_closed, to_host, id);
547 }
548 }
549
550 std::shared_ptr<ClientSession> create_client(
551 const std::shared_ptr<::tls::Cert>& cert,
552 const std::string& app_protocol = "HTTP1")
553 {
554 std::lock_guard<ccf::pal::Mutex> guard(lock);
555 auto ctx = std::make_unique<::tls::Client>(cert);
556 auto id = get_next_client_id();
557
558 LOG_DEBUG_FMT("Creating a new client session inside the enclave: {}", id);
559
560 // There are no limits on outbound client sessions (we do not check any
561 // session caps here). We expect this type of session to be rare and
562 // want it to succeed even when we are busy.
563 if (app_protocol == "HTTP2")
564 {
565 auto session = std::make_shared<::http::HTTP2ClientSession>(
566 id, writer_factory, std::move(ctx));
567 sessions.insert(std::make_pair(id, std::make_pair("", session)));
568 sessions_peak = std::max(sessions_peak, sessions.size());
569 return session;
570 }
571 if (app_protocol == "HTTP1")
572 {
573 auto session = std::make_shared<::http::HTTPClientSession>(
574 id, writer_factory, std::move(ctx));
575 sessions.insert(std::make_pair(id, std::make_pair("", session)));
576 sessions_peak = std::max(sessions_peak, sessions.size());
577 return session;
578 }
579
580 throw std::runtime_error("unsupported client application protocol");
581 }
582
583 std::shared_ptr<ClientSession> create_unencrypted_client()
584 {
585 std::lock_guard<ccf::pal::Mutex> guard(lock);
586 auto id = get_next_client_id();
587 auto session = std::make_shared<::http::UnencryptedHTTPClientSession>(
588 id, writer_factory);
589 sessions.insert(std::make_pair(id, std::make_pair("", session)));
590 sessions_peak = std::max(sessions_peak, sessions.size());
591 return session;
592 }
593
596 {
598 disp, ::tcp::tcp_start, [this](const uint8_t* data, size_t size) {
599 auto [new_tls_id, listen_interface_name] =
600 ringbuffer::read_message<::tcp::tcp_start>(data, size);
601 accept(new_tls_id, listen_interface_name);
602 });
603
605 disp, ::tcp::tcp_inbound, [this](const uint8_t* data, size_t size) {
606 auto id = serialized::peek<ccf::tls::ConnID>(data, size);
607
608 auto session = find_session(id);
609 if (session == nullptr)
610 {
612 "Ignoring tls_inbound for unknown or refused session: {}", id);
613 return;
614 }
615
616 session->handle_incoming_data({data, size});
617 });
618
620 disp, ::tcp::tcp_close, [this](const uint8_t* data, size_t size) {
621 auto [id] = ringbuffer::read_message<::tcp::tcp_close>(data, size);
622 remove_session(id);
623 });
624
626 disp, udp::udp_start, [this](const uint8_t* data, size_t size) {
627 auto [new_id, listen_interface_name] =
628 ringbuffer::read_message<udp::udp_start>(data, size);
629 accept(new_id, listen_interface_name, true);
630 });
631
633 disp, udp::udp_inbound, [this](const uint8_t* data, size_t size) {
634 auto id = serialized::peek<int64_t>(data, size);
635
636 std::shared_ptr<Session> session;
637 {
638 std::lock_guard<ccf::pal::Mutex> guard(lock);
639
640 auto search = sessions.find(id);
641 if (search == sessions.end())
642 {
644 "Ignoring udp::udp_inbound for unknown or refused session: {}",
645 id);
646 return;
647 }
648
649 if (!search->second.second && custom_protocol_subsystem)
650 {
651 LOG_DEBUG_FMT("Creating custom UDP session {}", id);
652
653 try
654 {
655 const auto& conn_id = search->first;
656 const auto& interface_id = search->second.first;
657
658 auto iit = listening_interfaces.find(interface_id);
659 if (iit == listening_interfaces.end())
660 {
662 "Failure to create custom protocol session because of "
663 "unknown interface '{}', ignoring udp::udp_inbound for "
664 "session: "
665 "{}",
666 interface_id,
667 id);
668 }
669
670 const auto& interface = iit->second;
671
672 search->second.second =
673 custom_protocol_subsystem->create_session(
674 interface.app_protocol, conn_id, nullptr);
675
676 if (!search->second.second)
677 {
679 "Failure to create custom protocol session, ignoring "
680 "udp::udp_inbound for session: {}",
681 id);
682 return;
683 }
684 }
685 catch (const std::exception& ex)
686 {
688 "Failure to create custom protocol session: {}", ex.what());
689 return;
690 }
691 }
692
693 session = search->second.second;
694 }
695
696 session->handle_incoming_data({data, size});
697 });
698 }
699 };
700}
Definition forwarder_types.h:14
Definition rpc_sessions.h:44
RPCSessions(ringbuffer::AbstractWriterFactory &writer_factory, std::shared_ptr< RPCMap > rpc_map_)
Definition rpc_sessions.h:185
ccf::SessionMetrics get_session_metrics()
Definition rpc_sessions.h:254
void report_request_payload_too_large_error(const ccf::ListenInterfaceID &id) override
Definition rpc_sessions.h:207
void report_request_header_too_large_error(const ccf::ListenInterfaceID &id) override
Definition rpc_sessions.h:214
std::shared_ptr< ClientSession > create_client(const std::shared_ptr<::tls::Cert > &cert, const std::string &app_protocol="HTTP1")
Definition rpc_sessions.h:550
void set_node_cert(const ccf::crypto::Pem &cert_, const ccf::crypto::Pem &pk)
Definition rpc_sessions.h:289
void register_message_handlers(messaging::Dispatcher< ringbuffer::Message > &disp)
Definition rpc_sessions.h:594
void set_network_cert(const ccf::crypto::Pem &cert_, const ccf::crypto::Pem &pk)
Definition rpc_sessions.h:295
void remove_session(ccf::tls::ConnID id)
Definition rpc_sessions.h:528
void update_listening_interface_options(const ccf::NodeInfoNetwork &node_info)
Definition rpc_sessions.h:221
void set_custom_protocol_subsystem(std::shared_ptr< CustomProtocolSubsystem > cpss)
Definition rpc_sessions.h:195
bool reply_async(ccf::tls::ConnID id, bool terminate_after_send, std::vector< uint8_t > &&data) override
Definition rpc_sessions.h:504
std::shared_ptr< ClientSession > create_unencrypted_client()
Definition rpc_sessions.h:583
std::shared_ptr< Session > find_session(ccf::tls::ConnID id)
Definition rpc_sessions.h:491
void report_parsing_error(const ccf::ListenInterfaceID &id) override
Definition rpc_sessions.h:201
void set_cert(ccf::Authority authority, const ccf::crypto::Pem &cert_, const ccf::crypto::Pem &pk)
Definition rpc_sessions.h:301
ccf::ApplicationProtocol get_app_protocol_main_interface() const
Definition rpc_sessions.h:275
void accept(ccf::tls::ConnID id, const ListenInterfaceID &listen_interface_id, bool udp=false)
Definition rpc_sessions.h:324
Definition pem.h:18
Definition error_reporter.h:8
Definition messaging.h:38
Definition quic_session.h:409
Definition ring_buffer_types.h:157
virtual WriterPtr create_writer_to_outside()=0
#define LOG_INFO_FMT
Definition internal_logger.h:15
#define LOG_DEBUG_FMT
Definition internal_logger.h:14
#define DISPATCHER_SET_MESSAGE_HANDLER(DISP, MSG,...)
Definition messaging.h:316
std::mutex Mutex
Definition locking.h:12
int64_t ConnID
Definition custom_protocol_subsystem_interface.h:20
Definition app_interface.h:14
std::string ApplicationProtocol
Definition node_info_network.h:29
std::string ListenInterfaceID
Definition rpc_context.h:21
Authority
Definition node_info_network.h:16
@ ready
Definition tls_session.h:19
std::shared_ptr< AbstractWriter > WriterPtr
Definition ring_buffer_types.h:154
STL namespace.
Definition msg_types.h:10
#define RINGBUFFER_WRITE_MESSAGE(MSG,...)
Definition ring_buffer_types.h:259
Definition node_info_network.h:32
Definition odata_error.h:58
RpcInterfaces rpc_interfaces
RPC interfaces.
Definition node_info_network.h:151
Definition node_info_network.h:184
Definition session_metrics.h:15
Definition session_metrics.h:13
size_t peak
Definition session_metrics.h:31
std::map< std::string, PerInterface > interfaces
Definition session_metrics.h:32
size_t active
Definition session_metrics.h:30
Definition http_configuration.h:24