CCF
Loading...
Searching...
No Matches
rpc_sessions.h
Go to the documentation of this file.
1// Copyright (c) Microsoft Corporation. All rights reserved.
2// Licensed under the Apache 2.0 License.
3#pragma once
4
5#include "ccf/ds/logger.h"
7#include "ccf/pal/locking.h"
9#include "ds/serialized.h"
10#include "enclave/session.h"
11#include "forwarder_types.h"
12#include "http/http2_session.h"
13#include "http/http_session.h"
15// NB: This should be HTTP3 including QUIC, but this is
16// ok for now, as we only have an echo service for now
19#include "quic/quic_session.h"
20#include "rpc_handler.h"
21#include "tls/cert.h"
22#include "tls/client.h"
23#include "tls/context.h"
25#include "tls/server.h"
26#include "udp/msg_types.h"
27
28#include <limits>
29#include <map>
30#include <stdexcept>
31#include <unordered_map>
32
33namespace ccf
34{
36
// Fallback values used by update_listening_interface_options() for any
// listening interface whose configuration leaves the corresponding option
// unset: the soft and hard caps on open sessions, and the endorsement.
37 static constexpr size_t max_open_sessions_soft_default = 1000;
38 static constexpr size_t max_open_sessions_hard_default = 1010;
39 static const ccf::Endorsement endorsement_default = {
40 ccf::Authority::SERVICE, std::nullopt};
41
42 class RPCSessions : public std::enable_shared_from_this<RPCSessions>,
46 {
47 private:
// Per-interface session accounting and configuration. One entry is kept in
// listening_interfaces for every interface name seen by
// update_listening_interface_options().
// NOTE(review): other members of this class access an `errors` field on this
// struct; its declaration is not visible in this listing.
48 struct ListenInterface
49 {
// Sessions currently open on this interface (incremented in accept(),
// decremented in remove_session()).
50 size_t open_sessions;
// Highest value open_sessions has reached.
51 size_t peak_sessions;
// At or above this count, new sessions are answered with a 503 and closed.
52 size_t max_open_sessions_soft;
// At or above this count, new sessions are refused outright.
53 size_t max_open_sessions_hard;
// Endorsement configured for this interface; its authority selects which
// certificate set_cert() installs for it.
54 ccf::Endorsement endorsement;
// Parser configuration handed to HTTP sessions created on this interface.
55 http::ParserConfiguration http_configuration;
// Application protocol name, compared against "HTTP1", "HTTP2" and "QUIC".
57 ccf::ApplicationProtocol app_protocol;
58 };
// Accounting and configuration for every known listening interface.
59 std::map<ListenInterfaceID, ListenInterface> listening_interfaces;
60
// Writer obtained from writer_factory.create_writer_to_outside(); used for
// messages addressed to the host.
62 ringbuffer::WriterPtr to_host = nullptr;
// Passed through to every server session that is created.
63 std::shared_ptr<RPCMap> rpc_map;
// Certificate currently installed for each interface (see set_cert()).
64 std::unordered_map<ListenInterfaceID, std::shared_ptr<::tls::Cert>> certs;
// Optional factory for sessions speaking a non-built-in protocol.
65 std::shared_ptr<CustomProtocolSubsystem> custom_protocol_subsystem;
66
// Guards sessions, certs and listening_interfaces in the methods that take
// it.
67 ccf::pal::Mutex lock;
// Every tracked session, looked up by connection ID. The mapped value holds
// the owning interface ID (empty for enclave-created client sessions) and
// the session pointer, which is nullptr for a custom UDP session that has
// not been created yet.
// NOTE(review): the key-type line of this declaration is missing from this
// listing.
68 std::unordered_map<
70 std::pair<ListenInterfaceID, std::shared_ptr<ccf::Session>>>
71 sessions;
// Largest size `sessions` has reached.
72 size_t sessions_peak = 0;
73
74 // Negative sessions are reserved for those originating from
75 // the enclave via create_client().
76 std::atomic<ccf::tls::ConnID> next_client_session_id = -1;
77
78 template <typename Base>
79 class NoMoreSessionsImpl : public Base
80 {
81 public:
82 template <typename... Ts>
83 NoMoreSessionsImpl(Ts&&... ts) : Base(std::forward<Ts>(ts)...)
84 {}
85
86 void handle_incoming_data_thread(std::vector<uint8_t>&& data) override
87 {
88 Base::tls_io->recv_buffered(data.data(), data.size());
89
90 if (Base::tls_io->get_status() == ccf::SessionStatus::ready)
91 {
92 // Send response describing soft session limit
93 Base::send_odata_error_response(ccf::ErrorDetails{
94 HTTP_STATUS_SERVICE_UNAVAILABLE,
95 ccf::errors::SessionCapExhausted,
96 "Service is currently busy and unable to serve new connections"});
97
98 // Close connection
99 Base::tls_io->close();
100 }
101 }
102 };
103
104 ccf::tls::ConnID get_next_client_id()
105 {
106 auto id = next_client_session_id--;
107 const auto initial = id;
108
109 if (next_client_session_id > 0)
110 next_client_session_id = -1;
111
112 while (sessions.find(id) != sessions.end())
113 {
114 id--;
115
116 if (id > 0)
117 id = -1;
118
119 if (id == initial)
120 {
121 throw std::runtime_error(
122 "Exhausted all IDs for enclave client sessions");
123 }
124 }
125
126 return id;
127 }
128
129 ListenInterface& get_interface_from_interface_id(
130 const ccf::ListenInterfaceID& id)
131 {
132 auto it = listening_interfaces.find(id);
133 if (it != listening_interfaces.end())
134 {
135 return it->second;
136 }
137
138 throw std::logic_error(
139 fmt::format("No RPC interface for interface ID {}", id));
140 }
141
// Builds the server-side session object for a newly accepted connection.
//
// "HTTP2" and "HTTP1" select the built-in HTTP session types. Any other
// protocol name is delegated to custom_protocol_subsystem when one is set;
// otherwise std::runtime_error is thrown.
//
// NOTE(review): `id` is used below, but the parameter line declaring it is
// not visible in this listing — confirm against the original header.
142 std::shared_ptr<ccf::Session> make_server_session(
143 const std::string& app_protocol,
145 const ListenInterfaceID& listen_interface_id,
146 std::unique_ptr<tls::Context>&& ctx,
147 const http::ParserConfiguration& parser_configuration)
148 {
149 if (app_protocol == "HTTP2")
150 {
// The HTTP/2 session receives this object twice: as a shared pointer and
// by reference (*this).
151 return std::make_shared<::http::HTTP2ServerSession>(
152 rpc_map,
153 id,
154 listen_interface_id,
155 writer_factory,
156 std::move(ctx),
157 parser_configuration,
158 shared_from_this(),
159 *this);
160 }
161 else if (app_protocol == "HTTP1")
162 {
163 return std::make_shared<::http::HTTPServerSession>(
164 rpc_map,
165 id,
166 listen_interface_id,
167 writer_factory,
168 std::move(ctx),
169 parser_configuration,
170 shared_from_this());
171 }
172 else if (custom_protocol_subsystem)
173 {
// Only the protocol name, connection ID and context are forwarded; the
// interface ID and parser configuration are not passed on.
174 return custom_protocol_subsystem->create_session(
175 app_protocol, id, std::move(ctx));
176 }
177 else
178 {
179 throw std::runtime_error(fmt::format(
180 "unknown protocol '{}' and custom protocol subsystem missing",
181 app_protocol));
182 }
183 }
184
185 public:
// Constructor: keeps the writer factory and RPC map, starts with no custom
// protocol subsystem, and creates the writer used for messages to the host.
// NOTE(review): the line carrying the constructor name is missing from this
// listing.
187 ringbuffer::AbstractWriterFactory& writer_factory,
188 std::shared_ptr<RPCMap> rpc_map_) :
189 writer_factory(writer_factory),
190 rpc_map(rpc_map_),
191 custom_protocol_subsystem(nullptr)
192 {
193 to_host = writer_factory.create_writer_to_outside();
194 }
195
// Installs (or, when given nullptr, clears) the subsystem used to create
// sessions for protocols that are not built in.
// NOTE(review): this assignment is made without taking `lock`, while the
// udp_inbound handler reads custom_protocol_subsystem under `lock` —
// confirm callers cannot race with message handling.
197 std::shared_ptr<CustomProtocolSubsystem> cpss)
198 {
199 custom_protocol_subsystem = cpss;
200 }
201
// Increments the parsing-error counter of the interface `id`, under `lock`.
// get_interface_from_interface_id() throws std::logic_error if the
// interface is unknown.
203 {
204 std::lock_guard<ccf::pal::Mutex> guard(lock);
205 get_interface_from_interface_id(id).errors.parsing++;
206 }
207
// Increments the request-payload-too-large counter of the interface `id`,
// under `lock`. Throws std::logic_error (from
// get_interface_from_interface_id()) if the interface is unknown.
209 const ccf::ListenInterfaceID& id) override
210 {
211 std::lock_guard<ccf::pal::Mutex> guard(lock);
212 get_interface_from_interface_id(id).errors.request_payload_too_large++;
213 }
214
// Increments the request-header-too-large counter of the interface `id`,
// under `lock`. Throws std::logic_error (from
// get_interface_from_interface_id()) if the interface is unknown.
216 const ccf::ListenInterfaceID& id) override
217 {
218 std::lock_guard<ccf::pal::Mutex> guard(lock);
219 get_interface_from_interface_id(id).errors.request_header_too_large++;
220 }
221
// Creates or refreshes the listening_interfaces entry of every RPC interface
// named in `node_info`, under `lock`. Options left unset in the
// configuration fall back to the defaults declared at namespace scope
// (protocol defaults to "HTTP1"). Entries for interfaces not present in
// `node_info` are left untouched.
223 const ccf::NodeInfoNetwork& node_info)
224 {
225 std::lock_guard<ccf::pal::Mutex> guard(lock);
226
227 for (const auto& [name, interface] : node_info.rpc_interfaces)
228 {
// operator[] creates the entry on first sight of `name`; an existing entry
// keeps its session counters.
229 auto& li = listening_interfaces[name];
230
231 li.max_open_sessions_soft = interface.max_open_sessions_soft.value_or(
232 max_open_sessions_soft_default);
233
234 li.max_open_sessions_hard = interface.max_open_sessions_hard.value_or(
235 max_open_sessions_hard_default);
236
237 li.endorsement = interface.endorsement.value_or(endorsement_default);
238
239 li.http_configuration =
240 interface.http_configuration.value_or(http::ParserConfiguration{});
241
242 li.app_protocol = interface.app_protocol.value_or("HTTP1");
243
// NOTE(review): the line opening this log statement is missing from this
// listing.
245 "Setting max open sessions on interface \"{}\" ({}) to [{}, "
246 "{}] and endorsement authority to {}",
247 name,
248 interface.bind_address,
249 li.max_open_sessions_soft,
250 li.max_open_sessions_hard,
251 li.endorsement.authority);
252 }
253 }
254
// Takes a snapshot, under `lock`, of the overall active/peak session counts
// and of the per-interface counters, caps and error tallies.
// NOTE(review): the declaration of `sm` is on a line missing from this
// listing.
256 {
258 std::lock_guard<ccf::pal::Mutex> guard(lock);
259
// `active` counts every tracked session, including enclave-created client
// sessions that belong to no interface.
260 sm.active = sessions.size();
261 sm.peak = sessions_peak;
262
263 for (const auto& [name, interface] : listening_interfaces)
264 {
265 sm.interfaces[name] = {
266 interface.open_sessions,
267 interface.peak_sessions,
268 interface.max_open_sessions_soft,
269 interface.max_open_sessions_hard,
270 interface.errors};
271 }
272
273 return sm;
274 }
275
// Returns the application protocol of the first entry in
// listening_interfaces (the smallest interface ID, as the container is an
// ordered map). Throws std::logic_error when no interface is registered.
// NOTE(review): unlike the other accessors, this reads listening_interfaces
// without taking `lock` — confirm it cannot run concurrently with
// update_listening_interface_options().
277 {
278 // Note: this is a temporary function to conveniently find out which
279 // protocol to use when creating client endpoints (e.g. for join
280 // protocol). This can be removed once the HTTP and HTTP/2 endpoints have
281 // been merged.
282 if (listening_interfaces.empty())
283 {
284 throw std::logic_error("No listening interface for this node");
285 }
286
287 return listening_interfaces.begin()->second.app_protocol;
288 }
289
// Installs the given certificate and private key on every interface whose
// endorsement authority is NODE, by forwarding to set_cert().
291 const ccf::crypto::Pem& cert_, const ccf::crypto::Pem& pk)
292 {
293 set_cert(ccf::Authority::NODE, cert_, pk);
294 }
295
// NOTE(review): the body line of this function is missing from this listing.
// Presumably it forwards to set_cert() in the same way as set_node_cert(),
// with a different authority — confirm against the original header.
297 const ccf::crypto::Pem& cert_, const ccf::crypto::Pem& pk)
298 {
300 }
301
// Builds one certificate object from `cert_` and `pk` and installs it, under
// `lock`, on every listening interface whose endorsement authority equals
// `authority`. For ACME interfaces the interface's configured
// acme_configuration must additionally be set and equal to
// `acme_configuration`. An already installed certificate is replaced.
303 ccf::Authority authority,
304 const ccf::crypto::Pem& cert_,
305 const ccf::crypto::Pem& pk,
306 const std::string& acme_configuration = "")
307 {
308 // Caller authentication is done by each frontend by looking up
309 // the caller's certificate in the relevant store table. The caller
310 // certificate does not have to be signed by a known CA (nullptr) and
311 // verification is not required here.
// The certificate is constructed before the lock is taken.
312 auto cert = std::make_shared<::tls::Cert>(
313 nullptr, cert_, pk, std::nullopt, /*auth_required ==*/false);
314
315 std::lock_guard<ccf::pal::Mutex> guard(lock);
316
317 for (auto& [listen_interface_id, interface] : listening_interfaces)
318 {
319 if (interface.endorsement.authority == authority)
320 {
// Non-ACME authorities match unconditionally; ACME also needs a matching
// configuration name.
321 if (
322 interface.endorsement.authority != Authority::ACME ||
323 (interface.endorsement.acme_configuration &&
324 *interface.endorsement.acme_configuration == acme_configuration))
325 {
// All matching interfaces share the same certificate object.
326 certs.insert_or_assign(listen_interface_id, cert);
327 }
328 }
329 }
330 }
331
// Handles a new inbound connection `id` that arrived on
// `listen_interface_id`, under `lock`. Exactly one of four outcomes applies,
// checked in this order:
//   1. The interface is secured but has no certificate yet: refused.
//   2. The interface is at or above its hard cap: refused.
//   3. The interface is at or above its soft cap: a NoMoreSessionsImpl
//      session is stored, which answers with a 503 and closes.
//   4. Otherwise a regular session is stored (UDP or TCP flavour).
// Outcomes 3 and 4 bump the interface's open/peak counters; sessions_peak is
// refreshed on every path.
//
// Throws std::logic_error for a duplicate `id` or an unknown interface, and
// std::runtime_error for a UDP connection that is neither QUIC nor covered
// by a custom protocol subsystem.
//
// NOTE(review): the parameter line declaring `id` and the opening lines of
// several log / ring-buffer statements are missing from this listing.
332 void accept(
334 const ListenInterfaceID& listen_interface_id,
335 bool udp = false)
336 {
337 std::lock_guard<ccf::pal::Mutex> guard(lock);
338
339 if (sessions.find(id) != sessions.end())
340 {
341 throw std::logic_error(
342 fmt::format("Duplicate conn ID received inside enclave: {}", id));
343 }
344
345 auto it = listening_interfaces.find(listen_interface_id);
346 if (it == listening_interfaces.end())
347 {
348 throw std::logic_error(fmt::format(
349 "Can't accept new RPC session {} - comes from unknown listening "
350 "interface {}",
351 id,
352 listen_interface_id));
353 }
354
355 auto& per_listen_interface = it->second;
356
// Outcome 1: a secured interface cannot serve sessions before set_cert()
// has installed its certificate.
357 if (
358 per_listen_interface.endorsement.authority != Authority::UNSECURED &&
359 certs.find(listen_interface_id) == certs.end())
360 {
362 "Refusing TLS session {} inside the enclave - interface {} "
363 "has no TLS certificate yet",
364 id,
365 listen_interface_id);
366
// A tcp_stop message for this ID is addressed to the host; no session is
// stored.
368 ::tcp::tcp_stop, to_host, id, std::string("Session refused"));
369 }
// Outcome 2: hard cap reached.
370 else if (
371 per_listen_interface.open_sessions >=
372 per_listen_interface.max_open_sessions_hard)
373 {
375 "Refusing TLS session {} inside the enclave - already have {} "
376 "sessions from interface {} and limit is {}",
377 id,
378 per_listen_interface.open_sessions,
379 listen_interface_id,
380 per_listen_interface.max_open_sessions_hard);
381
383 ::tcp::tcp_stop, to_host, id, std::string("Session refused"));
384 }
// Outcome 3: soft cap reached.
385 else if (
386 per_listen_interface.open_sessions >=
387 per_listen_interface.max_open_sessions_soft)
388 {
390 "Soft refusing session {} (returning 503) inside the enclave - "
391 "already have {} sessions from interface {} and limit is {}",
392 id,
393 per_listen_interface.open_sessions,
394 listen_interface_id,
395 per_listen_interface.max_open_sessions_soft);
396
// NOTE(review): for an UNSECURED interface no certificate need exist, so
// certs[...] default-inserts a null entry here and a TLS server context is
// built from it; the regular path below uses PlaintextServer for that case
// instead. The `udp` flag is also not consulted in this branch. Confirm
// both are intended.
397 auto ctx = std::make_unique<::tls::Server>(certs[listen_interface_id]);
398 std::shared_ptr<Session> capped_session;
399 if (per_listen_interface.app_protocol == "HTTP2")
400 {
401 capped_session =
402 std::make_shared<NoMoreSessionsImpl<::http::HTTP2ServerSession>>(
403 rpc_map,
404 id,
405 listen_interface_id,
406 writer_factory,
407 std::move(ctx),
408 per_listen_interface.http_configuration,
409 shared_from_this(),
410 *this);
411 }
412 else
413 {
// Every protocol other than "HTTP2" gets the HTTP/1 flavour of the capped
// session.
414 capped_session =
415 std::make_shared<NoMoreSessionsImpl<::http::HTTPServerSession>>(
416 rpc_map,
417 id,
418 listen_interface_id,
419 writer_factory,
420 std::move(ctx),
421 per_listen_interface.http_configuration,
422 shared_from_this());
423 }
// The capped session still counts towards the interface's open sessions,
// which is what lets the hard cap be reached.
424 sessions.insert(std::make_pair(
425 id, std::make_pair(listen_interface_id, std::move(capped_session))));
426 per_listen_interface.open_sessions++;
427 per_listen_interface.peak_sessions = std::max(
428 per_listen_interface.peak_sessions,
429 per_listen_interface.open_sessions);
430 }
// Outcome 4: below both caps.
431 else
432 {
434 "Accepting a session {} inside the enclave from interface \"{}\"",
435 id,
436 listen_interface_id);
437
438 if (udp)
439 {
440 LOG_DEBUG_FMT("New UDP endpoint at {}", id);
441 if (per_listen_interface.app_protocol == "QUIC")
442 {
443 auto session = std::make_shared<QUICSessionImpl>(
444 rpc_map, id, listen_interface_id, writer_factory);
445 sessions.insert(std::make_pair(
446 id, std::make_pair(listen_interface_id, std::move(session))));
447 }
448 else if (custom_protocol_subsystem)
449 {
450 // We know it's a custom protocol, but the session creation function
451 // hasn't been registered yet, so we keep a nullptr until the first
452 // udp::udp_inbound message.
453 sessions.insert(
454 std::make_pair(id, std::make_pair(listen_interface_id, nullptr)));
455 }
456 else
457 {
458 throw std::runtime_error(
459 "unknown UDP protocol and custom protocol subsystem missing");
460 }
461 per_listen_interface.open_sessions++;
462 per_listen_interface.peak_sessions = std::max(
463 per_listen_interface.peak_sessions,
464 per_listen_interface.open_sessions);
465 }
466 else
467 {
// TCP: pick a plaintext or TLS context according to the interface's
// endorsement authority, then build the protocol-specific session.
468 std::unique_ptr<tls::Context> ctx;
469 if (
470 per_listen_interface.endorsement.authority == Authority::UNSECURED)
471 {
472 ctx = std::make_unique<nontls::PlaintextServer>();
473 }
474 else
475 {
// The second argument is true only for HTTP/2 interfaces.
476 ctx = std::make_unique<::tls::Server>(
477 certs[listen_interface_id],
478 per_listen_interface.app_protocol == "HTTP2");
479 }
480
481 auto session = make_server_session(
482 per_listen_interface.app_protocol,
483 id,
484 listen_interface_id,
485 std::move(ctx),
486 per_listen_interface.http_configuration);
487
488 sessions.insert(std::make_pair(
489 id, std::make_pair(listen_interface_id, std::move(session))));
490 per_listen_interface.open_sessions++;
491 per_listen_interface.peak_sessions = std::max(
492 per_listen_interface.peak_sessions,
493 per_listen_interface.open_sessions);
494 }
495 }
496
497 sessions_peak = std::max(sessions_peak, sessions.size());
498 }
499
500 std::shared_ptr<Session> find_session(ccf::tls::ConnID id)
501 {
502 std::lock_guard<ccf::pal::Mutex> guard(lock);
503
504 auto search = sessions.find(id);
505 if (search == sessions.end())
506 {
507 return nullptr;
508 }
509
510 return search->second.second;
511 }
512
// Sends `data` on the session identified by `id` and, when
// `terminate_after_send` is set, closes that session afterwards.
// Returns false, without sending anything, if find_session() yields no
// session for `id`; returns true otherwise.
// NOTE(review): the lines carrying the function name and the `id` parameter
// are missing from this listing.
515 bool terminate_after_send,
516 std::vector<uint8_t>&& data) override
517 {
// find_session() takes and releases `lock`; the send below runs without it.
518 auto session = find_session(id);
519 if (session == nullptr)
520 {
521 LOG_DEBUG_FMT("Refusing to reply to unknown session {}", id);
522 return false;
523 }
524
525 LOG_DEBUG_FMT("Replying to session {}", id);
526
527 session->send_data(data);
528
529 if (terminate_after_send)
530 {
531 session->close_session();
532 }
533
534 return true;
535 }
536
// Stops tracking the session `id`, under `lock`.
// A known session is erased, and the open-session counter of its interface
// is decremented when that interface is still registered (enclave-created
// client sessions are stored with an empty interface ID). For an unknown ID
// a tcp_closed message is written to the host instead.
538 {
539 std::lock_guard<ccf::pal::Mutex> guard(lock);
540 LOG_DEBUG_FMT("Closing a session inside the enclave: {}", id);
541 const auto search = sessions.find(id);
542 if (search != sessions.end())
543 {
544 auto it = listening_interfaces.find(search->second.first);
545 if (it != listening_interfaces.end())
546 {
547 it->second.open_sessions--;
548 }
549 sessions.erase(search);
550 }
551 else
552 {
553 // Enclave doesn't know this ID, but host is still talking about it.
554 // Continue with the normal closure flow
555 RINGBUFFER_WRITE_MESSAGE(::tcp::tcp_closed, to_host, id);
556 }
557 }
558
559 std::shared_ptr<ClientSession> create_client(
560 const std::shared_ptr<::tls::Cert>& cert,
561 const std::string& app_protocol = "HTTP1")
562 {
563 std::lock_guard<ccf::pal::Mutex> guard(lock);
564 auto ctx = std::make_unique<::tls::Client>(cert);
565 auto id = get_next_client_id();
566
567 LOG_DEBUG_FMT("Creating a new client session inside the enclave: {}", id);
568
569 // There are no limits on outbound client sessions (we do not check any
570 // session caps here). We expect this type of session to be rare and
571 // want it to succeed even when we are busy.
572 if (app_protocol == "HTTP2")
573 {
574 auto session = std::make_shared<::http::HTTP2ClientSession>(
575 id, writer_factory, std::move(ctx));
576 sessions.insert(std::make_pair(id, std::make_pair("", session)));
577 sessions_peak = std::max(sessions_peak, sessions.size());
578 return session;
579 }
580 else if (app_protocol == "HTTP1")
581 {
582 auto session = std::make_shared<::http::HTTPClientSession>(
583 id, writer_factory, std::move(ctx));
584 sessions.insert(std::make_pair(id, std::make_pair("", session)));
585 sessions_peak = std::max(sessions_peak, sessions.size());
586 return session;
587 }
588 else
589 {
590 throw std::runtime_error("unsupported client application protocol");
591 }
592 }
593
594 std::shared_ptr<ClientSession> create_unencrypted_client()
595 {
596 std::lock_guard<ccf::pal::Mutex> guard(lock);
597 auto id = get_next_client_id();
598 auto session = std::make_shared<::http::UnencryptedHTTPClientSession>(
599 id, writer_factory);
600 sessions.insert(std::make_pair(id, std::make_pair("", session)));
601 sessions_peak = std::max(sessions_peak, sessions.size());
602 return session;
603 }
604
// Registers, on the dispatcher `disp`, the handlers for the host messages
// that drive session lifetime and data delivery:
//   tcp_start   -> accept()
//   tcp_inbound -> hand the message to the matching session
//   tcp_close   -> remove_session()
//   udp_start   -> accept() with udp = true
//   udp_inbound -> hand the message to the matching session, creating a
//                  custom-protocol session first when needed
// Every handler captures `this`.
// NOTE(review): the signature line and the opening lines of each handler
// registration and of several log statements are missing from this listing.
607 {
609 disp, ::tcp::tcp_start, [this](const uint8_t* data, size_t size) {
610 auto [new_tls_id, listen_interface_name] =
611 ringbuffer::read_message<::tcp::tcp_start>(data, size);
612 accept(new_tls_id, listen_interface_name);
613 });
614
616 disp, ::tcp::tcp_inbound, [this](const uint8_t* data, size_t size) {
// The ID is only peeked; the session receives the complete message.
617 auto id = serialized::peek<ccf::tls::ConnID>(data, size);
618
619 auto session = find_session(id);
620 if (session == nullptr)
621 {
623 "Ignoring tls_inbound for unknown or refused session: {}", id);
624 return;
625 }
626
627 session->handle_incoming_data({data, size});
628 });
629
631 disp, ::tcp::tcp_close, [this](const uint8_t* data, size_t size) {
632 auto [id] = ringbuffer::read_message<::tcp::tcp_close>(data, size);
633 remove_session(id);
634 });
635
637 disp, udp::udp_start, [this](const uint8_t* data, size_t size) {
638 auto [new_id, listen_interface_name] =
639 ringbuffer::read_message<udp::udp_start>(data, size);
640 accept(new_id, listen_interface_name, true);
641 });
642
644 disp, udp::udp_inbound, [this](const uint8_t* data, size_t size) {
645 auto id = serialized::peek<int64_t>(data, size);
646
647 std::shared_ptr<Session> session;
// The lookup, and any creation of the session, happen under `lock`; the
// data is delivered after the lock has been released.
648 {
649 std::lock_guard<ccf::pal::Mutex> guard(lock);
650
651 auto search = sessions.find(id);
652 if (search == sessions.end())
653 {
655 "Ignoring udp::udp_inbound for unknown or refused session: {}",
656 id);
657 return;
658 }
// accept() stored a nullptr placeholder for custom-protocol UDP sessions;
// the real session is created on the first inbound message.
659 else if (!search->second.second && custom_protocol_subsystem)
660 {
661 LOG_DEBUG_FMT("Creating custom UDP session {}", id);
662
663 try
664 {
665 const auto& conn_id = search->first;
666 const auto& interface_id = search->second.first;
667
668 auto iit = listening_interfaces.find(interface_id);
669 if (iit == listening_interfaces.end())
670 {
// FIXME(review): this branch only logs. There is no `return`, so execution
// falls through to `iit->second` below with iit == end(), which is
// undefined behaviour. The message says the inbound data is ignored, so a
// `return;` is almost certainly missing here.
672 "Failure to create custom protocol session because of "
673 "unknown interface '{}', ignoring udp::udp_inbound for "
674 "session: "
675 "{}",
676 interface_id,
677 id);
678 }
679
680 const auto& interface = iit->second;
681
// No context is passed (nullptr) when creating the custom UDP session.
682 search->second.second =
683 custom_protocol_subsystem->create_session(
684 interface.app_protocol, conn_id, nullptr);
685
686 if (!search->second.second)
687 {
689 "Failure to create custom protocol session, ignoring "
690 "udp::udp_inbound for session: {}",
691 id);
692 return;
693 }
694 }
695 catch (const std::exception& ex)
696 {
698 "Failure to create custom protocol session: {}", ex.what());
699 return;
700 }
701 }
702
703 session = search->second.second;
704 }
705
// NOTE(review): `session` is still null here if the stored placeholder was
// nullptr and custom_protocol_subsystem is unset (the branch above is then
// skipped) — confirm that cannot happen, or guard this call.
706 session->handle_incoming_data({data, size});
707 });
708 }
709 };
710}
Definition forwarder_types.h:14
Definition rpc_sessions.h:46
RPCSessions(ringbuffer::AbstractWriterFactory &writer_factory, std::shared_ptr< RPCMap > rpc_map_)
Definition rpc_sessions.h:186
ccf::SessionMetrics get_session_metrics()
Definition rpc_sessions.h:255
void report_request_payload_too_large_error(const ccf::ListenInterfaceID &id) override
Definition rpc_sessions.h:208
void report_request_header_too_large_error(const ccf::ListenInterfaceID &id) override
Definition rpc_sessions.h:215
std::shared_ptr< ClientSession > create_client(const std::shared_ptr<::tls::Cert > &cert, const std::string &app_protocol="HTTP1")
Definition rpc_sessions.h:559
void set_cert(ccf::Authority authority, const ccf::crypto::Pem &cert_, const ccf::crypto::Pem &pk, const std::string &acme_configuration="")
Definition rpc_sessions.h:302
void set_node_cert(const ccf::crypto::Pem &cert_, const ccf::crypto::Pem &pk)
Definition rpc_sessions.h:290
void register_message_handlers(messaging::Dispatcher< ringbuffer::Message > &disp)
Definition rpc_sessions.h:605
void set_network_cert(const ccf::crypto::Pem &cert_, const ccf::crypto::Pem &pk)
Definition rpc_sessions.h:296
void remove_session(ccf::tls::ConnID id)
Definition rpc_sessions.h:537
void update_listening_interface_options(const ccf::NodeInfoNetwork &node_info)
Definition rpc_sessions.h:222
void set_custom_protocol_subsystem(std::shared_ptr< CustomProtocolSubsystem > cpss)
Definition rpc_sessions.h:196
bool reply_async(ccf::tls::ConnID id, bool terminate_after_send, std::vector< uint8_t > &&data) override
Definition rpc_sessions.h:513
std::shared_ptr< ClientSession > create_unencrypted_client()
Definition rpc_sessions.h:594
std::shared_ptr< Session > find_session(ccf::tls::ConnID id)
Definition rpc_sessions.h:500
void report_parsing_error(const ccf::ListenInterfaceID &id) override
Definition rpc_sessions.h:202
ccf::ApplicationProtocol get_app_protocol_main_interface() const
Definition rpc_sessions.h:276
void accept(ccf::tls::ConnID id, const ListenInterfaceID &listen_interface_id, bool udp=false)
Definition rpc_sessions.h:332
Definition pem.h:18
Definition error_reporter.h:8
Definition responder_lookup.h:14
Definition messaging.h:38
Definition quic_session.h:388
Definition ring_buffer_types.h:153
virtual WriterPtr create_writer_to_outside()=0
#define LOG_INFO_FMT
Definition logger.h:362
#define LOG_DEBUG_FMT
Definition logger.h:357
#define DISPATCHER_SET_MESSAGE_HANDLER(DISP, MSG,...)
Definition messaging.h:316
std::mutex Mutex
Definition locking.h:12
int64_t ConnID
Definition custom_protocol_subsystem_interface.h:20
Definition app_interface.h:14
std::string ApplicationProtocol
Definition node_info_network.h:29
@ ready
Definition tls_session.h:20
std::string ListenInterfaceID
Definition rpc_context.h:21
Authority
Definition node_info_network.h:16
std::shared_ptr< AbstractWriter > WriterPtr
Definition ring_buffer_types.h:150
Definition msg_types.h:10
#define RINGBUFFER_WRITE_MESSAGE(MSG,...)
Definition ring_buffer_types.h:255
Definition node_info_network.h:32
Definition odata_error.h:58
RpcInterfaces rpc_interfaces
RPC interfaces.
Definition node_info_network.h:150
Definition node_info_network.h:196
Definition session_metrics.h:15
Definition session_metrics.h:13
size_t peak
Definition session_metrics.h:31
std::map< std::string, PerInterface > interfaces
Definition session_metrics.h:32
size_t active
Definition session_metrics.h:30
Definition http_configuration.h:24