5#include "../tcp/msg_types.h"
6#include "../udp/msg_types.h"
10#include <unordered_map>
15 constexpr bool isTCP()
17 return std::is_same<T, asynchost::TCP>();
21 constexpr bool isUDP()
23 return std::is_same<T, asynchost::UDP>();
27 constexpr const char* getConnTypeName()
29 if constexpr (isTCP<T>())
33 else if constexpr (isUDP<T>())
39 throw std::runtime_error(
"Invalid connection type");
55 static_assert(std::is_same<::tcp::ConnID, udp::ConnID>());
56 static_assert(std::is_same<::tcp::ConnID, ConnID>());
64 const auto initial = id;
71 while (sockets.find(
id) != sockets.end())
82 throw std::runtime_error(
83 "Exhausted all IDs for host RPC connections");
91 std::atomic<ConnID> next_id;
94 template <
class ConnType>
113 void on_resolve_failed()
override
119 void on_connect_failed()
override
125 bool on_read(
size_t len, uint8_t*& data, sockaddr )
override
140 void on_disconnect()
override
148 if constexpr (isTCP<ConnType>())
167 void on_accept(ConnType& peer)
override
170 if constexpr (isUDP<ConnType>())
175 auto client_id = parent.get_next_id();
177 std::make_unique<RPCClientBehaviour>(parent, client_id));
178 parent.sockets.emplace(client_id, peer);
183 void on_start(int64_t peer_id)
override
185 const auto interface_name = parent.get_interface_listen_name(
id);
188 "rpc start {} on interface \"{}\" as {}",
193 if constexpr (isTCP<ConnType>())
196 ::tcp::tcp_start, parent.to_enclave, peer_id, interface_name);
200 if constexpr (isUDP<ConnType>())
203 udp::udp_start, parent.to_enclave, peer_id, interface_name);
208 bool on_read(
size_t len, uint8_t*& data, sockaddr addr)
override
211 if constexpr (isUDP<ConnType>())
213 auto [addr_family, addr_data] = udp::sockaddr_encode(addr);
215 LOG_DEBUG_FMT(
"rpc udp read into ring buffer {}: {}",
id, len);
230 parent.sockets.erase(
id);
234 std::unordered_map<ConnID, ConnType> sockets;
238 std::unordered_map<ConnID, size_t> idle_times;
240 std::optional<std::chrono::milliseconds> client_connection_timeout =
243 std::optional<std::chrono::seconds> idle_connection_timeout = std::nullopt;
251 std::optional<std::chrono::milliseconds> client_connection_timeout_ =
253 std::optional<std::chrono::seconds> idle_connection_timeout_ =
256 client_connection_timeout(client_connection_timeout_),
257 idle_connection_timeout(idle_connection_timeout_),
258 to_enclave(writer_factory.create_writer_to_inside())
262 ConnID
id, std::string&
host, std::string& port,
const std::string& name)
269 if (sockets.find(
id) != sockets.end())
271 LOG_FAIL_FMT(
"Cannot listen on id {}: already in use",
id);
276 s->set_behaviour(std::make_unique<RPCServerBehaviour>(*
this,
id));
278 if (!s->listen(
host, port, name))
283 host = s->get_host();
284 port = s->get_port();
286 sockets.emplace(
id, s);
290 if constexpr (isUDP<ConnType>())
298 bool connect(ConnID
id,
const std::string&
host,
const std::string& port)
305 if (sockets.find(
id) != sockets.end())
307 LOG_FAIL_FMT(
"Cannot connect on id {}: already in use",
id);
311 auto s = ConnType(
true, client_connection_timeout);
312 s->set_behaviour(std::make_unique<RPCClientBehaviour>(*
this,
id));
314 if (!s->connect(
host, port))
319 sockets.emplace(
id, s);
323 bool write(ConnID
id,
size_t len,
const uint8_t* data, sockaddr addr = {})
325 auto s = sockets.find(
id);
327 if (s == sockets.end())
330 "Received an outbound message for id {} which is not a known "
331 "connection. Ignoring message of {} bytes",
337 if (s->second.is_null())
344 return s->second->write(len, data, addr);
351 sockets[id] =
nullptr;
360 if (sockets.erase(
id) < 1)
368 idle_times.erase(
id);
377 disp, ::tcp::tcp_outbound, [
this](
const uint8_t* data,
size_t size) {
379 ringbuffer::read_message<::tcp::tcp_outbound>(data, size);
381 auto connect_id =
static_cast<ConnID
>(id);
382 LOG_DEBUG_FMT(
"rpc write from enclave {}: {}", connect_id, body.size);
384 write(connect_id, body.size, body.data);
388 disp, ::tcp::tcp_connect, [
this](
const uint8_t* data,
size_t size) {
389 auto [id,
host, port] =
390 ringbuffer::read_message<::tcp::tcp_connect>(data, size);
394 if (check_enclave_side_id(
id))
401 "rpc session id is not in dedicated from-enclave range ({})",
id);
406 disp, ::tcp::tcp_stop, [
this](
const uint8_t* data,
size_t size) {
408 ringbuffer::read_message<::tcp::tcp_stop>(data, size);
414 idle_times.erase(
id);
418 disp, ::tcp::tcp_closed, [
this](
const uint8_t* data,
size_t size) {
419 auto [id] = ringbuffer::read_message<::tcp::tcp_closed>(data, size);
430 disp, udp::udp_outbound, [
this](
const uint8_t* data,
size_t size) {
431 auto [id, addr_family, addr_data, body] =
432 ringbuffer::read_message<udp::udp_outbound>(data, size);
434 auto connect_id =
static_cast<ConnID
>(id);
435 LOG_DEBUG_FMT(
"rpc write from enclave {}: {}", connect_id, body.size);
437 auto addr = udp::sockaddr_decode(addr_family, addr_data);
438 write(connect_id, body.size, body.data, addr);
449 if (!idle_connection_timeout.has_value())
454 const size_t max_idle_time = idle_connection_timeout->count();
456 auto it = idle_times.begin();
457 while (it != idle_times.end())
459 auto& [id, idle_time] = *it;
460 if (idle_time > max_idle_time)
463 "Closing socket {} after {}s idle (max = {}s)",
468 it = idle_times.erase(it);
484 bool check_enclave_side_id(ConnID
id)
489 std::string get_interface_listen_name(ConnID
id)
491 const auto it = sockets.find(
id);
492 if (it == sockets.end())
495 "Requested interface number {}, has {}",
id, sockets.size());
496 throw std::logic_error(fmt::format(
"No socket with id {}",
id));
499 auto listen_name = it->second->get_listen_name();
500 if (!listen_name.has_value())
502 throw std::logic_error(
503 fmt::format(
"Interface {} has no listen name",
id));
506 return listen_name.value();
510 template <
class ConnType>
Definition rpc_connections.h:51
ConnID get_next_id(T &sockets)
Definition rpc_connections.h:61
ConnIDGenerator()
Definition rpc_connections.h:58
int64_t ConnID
This is the same as ccf::tls::ConnID and udp::ConnID.
Definition rpc_connections.h:54
Definition rpc_connections.h:96
bool listen(ConnID id, std::string &host, std::string &port, const std::string &name)
Definition rpc_connections.h:261
bool close(ConnID id)
Definition rpc_connections.h:358
RPCConnectionsImpl(ringbuffer::AbstractWriterFactory &writer_factory, ConnIDGenerator &idGen, std::optional< std::chrono::milliseconds > client_connection_timeout_=std::nullopt, std::optional< std::chrono::seconds > idle_connection_timeout_=std::nullopt)
Definition rpc_connections.h:248
void register_udp_message_handlers(messaging::Dispatcher< ringbuffer::Message > &disp)
Definition rpc_connections.h:426
bool write(ConnID id, size_t len, const uint8_t *data, sockaddr addr={})
Definition rpc_connections.h:323
bool stop(ConnID id)
Definition rpc_connections.h:347
void on_timer()
Definition rpc_connections.h:447
void register_message_handlers(messaging::Dispatcher< ringbuffer::Message > &disp)
Definition rpc_connections.h:373
bool connect(ConnID id, const std::string &host, const std::string &port)
Definition rpc_connections.h:298
void mark_active(ConnID id)
Definition rpc_connections.h:442
Callback service for user-specific behaviour for TCP and UDP connections.
Definition socket.h:20
const char * conn_name
Definition socket.h:23
Definition messaging.h:38
Definition ring_buffer_types.h:157
#define LOG_INFO_FMT
Definition internal_logger.h:15
#define LOG_DEBUG_FMT
Definition internal_logger.h:14
#define LOG_FAIL_FMT
Definition internal_logger.h:16
#define DISPATCHER_SET_MESSAGE_HANDLER(DISP, MSG,...)
Definition messaging.h:316
Definition configuration.h:14
std::shared_ptr< AbstractWriter > WriterPtr
Definition ring_buffer_types.h:154
#define RINGBUFFER_WRITE_MESSAGE(MSG,...)
Definition ring_buffer_types.h:259
Definition serializer.h:27