5#include "../tcp/msg_types.h"
6#include "../udp/msg_types.h"
10#include <unordered_map>
// Compile-time predicate: yields whether T is exactly asynchost::TCP.
// NOTE(review): the template header and braces are absent from this listing.
15 constexpr bool isTCP()
17 return std::is_same<T, asynchost::TCP>();
// Compile-time predicate: yields whether T is exactly asynchost::UDP.
// NOTE(review): the template header and braces are absent from this listing.
21 constexpr bool isUDP()
23 return std::is_same<T, asynchost::UDP>();
// Picks a C-string name for connection type T at compile time by testing
// isTCP<T>() first and isUDP<T>() second. The value returned by each branch is
// not visible in this listing.
27 constexpr const char* getConnTypeName()
29 if constexpr (isTCP<T>())
33 else if constexpr (isUDP<T>())
// Throws std::runtime_error with the message below. Presumably this is the
// remaining branch for a T that is neither TCP nor UDP - confirm against the
// full source.
39 throw std::runtime_error(
"Invalid connection type");
// Compile-time guarantees that ::tcp::ConnID, udp::ConnID and the local ConnID
// alias all name the same type.
55 static_assert(std::is_same<::tcp::ConnID, udp::ConnID>());
56 static_assert(std::is_same<::tcp::ConnID, ConnID>());
// Fragment of get_next_id(T& sockets); the signature and the lines that
// produce `id` are not part of this listing.
// Remember the starting candidate before searching.
64 const auto initial = id;
// Keep looping while the candidate `id` is already a key in `sockets`. The
// loop body is not visible here.
69 while (sockets.find(
id) != sockets.end())
// Error raised when the ID space is used up. Presumably this is reached when
// the search comes back around to `initial` - confirm against the full source.
78 throw std::runtime_error(
79 "Exhausted all IDs for host RPC connections");
// Atomic ConnID counter member.
// NOTE(review): presumably the state advanced by get_next_id; its use is not
// visible in this listing.
87 std::atomic<ConnID> next_id;
90 template <
class ConnType>
109 void on_resolve_failed()
override
115 void on_connect_failed()
override
121 bool on_read(
size_t len, uint8_t*& data, sockaddr)
override
136 void on_disconnect()
override
144 if constexpr (isTCP<ConnType>())
// Accept callback: invoked with the newly accepted peer connection.
163 void on_accept(ConnType& peer)
override
// UDP-specific branch; its contents are not visible in this listing.
166 if constexpr (isUDP<ConnType>())
// Ask the parent for a fresh connection id for this peer.
171 auto client_id = parent.get_next_id();
// Tail of a call that receives a new RPCClientBehaviour bound to the parent and
// the new id (presumably a set_behaviour call on `peer` - the start of the
// statement is missing from this listing).
173 std::make_unique<RPCClientBehaviour>(parent, client_id));
// Record the accepted peer in the parent's socket map under the new id.
174 parent.sockets.emplace(client_id, peer);
// Start callback for a peer: resolves the listen-interface name of this
// behaviour's own `id`, logs it, and emits a start message carrying `peer_id`
// and the interface name through parent.to_enclave.
179 void on_start(int64_t peer_id)
override
// May throw std::logic_error if the id is unknown or has no listen name
// (see get_interface_listen_name).
181 const auto interface_name = parent.get_interface_listen_name(
id);
// Format string of a log statement; the logging macro line and its arguments
// are missing from this listing.
184 "rpc start {} on interface \"{}\" as {}",
// TCP connections announce themselves with ::tcp::tcp_start ...
189 if constexpr (isTCP<ConnType>())
192 ::tcp::tcp_start, parent.to_enclave, peer_id, interface_name);
// ... and UDP connections with udp::udp_start. The name of the call taking
// these arguments is not visible (presumably RINGBUFFER_WRITE_MESSAGE).
196 if constexpr (isUDP<ConnType>())
199 udp::udp_start, parent.to_enclave, peer_id, interface_name);
// Read callback: receives `len` bytes at `data` together with the sender
// address `addr`. The return statement is not visible in this listing.
204 bool on_read(
size_t len, uint8_t*& data, sockaddr addr)
override
// UDP path only (the non-UDP path is not shown).
207 if constexpr (isUDP<ConnType>())
// Split the socket address into a (family, data) pair via udp::sockaddr_encode.
209 auto [addr_family, addr_data] = udp::sockaddr_encode(addr);
// Debug trace of this behaviour's connection id and the byte count.
211 LOG_DEBUG_FMT(
"rpc udp read into ring buffer {}: {}",
id, len);
226 parent.sockets.erase(
id);
// Connection table: maps a connection id to its socket object.
230 std::unordered_map<ConnID, ConnType> sockets;
// Per-connection idle counter; on_timer compares each value against
// idle_connection_timeout->count().
234 std::unordered_map<ConnID, size_t> idle_times;
// Optional timeout handed to ConnType when connect() creates a client socket.
// The initializer continues on a line that is missing from this listing.
236 std::optional<std::chrono::milliseconds> client_connection_timeout =
// Optional idle limit in seconds; when empty, on_timer skips its idle scan.
239 std::optional<std::chrono::seconds> idle_connection_timeout = std::nullopt;
// Fragment of the RPCConnectionsImpl constructor: the two optional timeout
// parameters (their default values continue on lines missing from this
// listing) ...
247 std::optional<std::chrono::milliseconds> client_connection_timeout_ =
249 std::optional<std::chrono::seconds> idle_connection_timeout_ =
// ... followed by the member initializers: both timeouts are copied into the
// corresponding members, and the to_enclave writer is obtained from the writer
// factory.
252 client_connection_timeout(client_connection_timeout_),
253 idle_connection_timeout(idle_connection_timeout_),
254 to_enclave(writer_factory.create_writer_to_inside())
// Parameter list and body fragments of listen(). `host` and `port` are taken by
// non-const reference because they are overwritten further down with the
// values reported by the socket.
258 ConnID
id, std::string&
host, std::string& port,
const std::string& name)
// Refuse to reuse an id that is already present in the socket table. The
// statement following the log line (presumably `return false`) is not visible.
265 if (sockets.find(
id) != sockets.end())
267 LOG_FAIL_FMT(
"Cannot listen on id {}: already in use",
id);
// Attach server-side behaviour bound to this object and the id. The creation
// of `s` is missing from this listing.
272 s->set_behaviour(std::make_unique<RPCServerBehaviour>(*
this,
id));
// Failure branch of the listen attempt; its body is not visible.
274 if (!s->listen(
host, port, name))
// Report the host and port the socket returns back to the caller through the
// reference parameters.
279 host = s->get_host();
280 port = s->get_port();
// Register the listening socket under its id.
282 sockets.emplace(
id, s);
// UDP-specific follow-up; contents not visible in this listing.
286 if constexpr (isUDP<ConnType>())
// Creates an outbound connection registered under `id` to host:port.
// The return statements are not visible in this listing.
294 bool connect(ConnID
id,
const std::string&
host,
const std::string& port)
// Refuse to reuse an id that is already present in the socket table.
301 if (sockets.find(
id) != sockets.end())
303 LOG_FAIL_FMT(
"Cannot connect on id {}: already in use",
id);
// Construct the socket with `true` as first argument and the configured client
// connection timeout. NOTE(review): the meaning of the boolean is defined by
// ConnType, which is not shown here - confirm.
307 auto s = ConnType(
true, client_connection_timeout);
// Attach client-side behaviour bound to this object and the id.
308 s->set_behaviour(std::make_unique<RPCClientBehaviour>(*
this,
id));
// Failure branch of the connect attempt; its body is not visible.
310 if (!s->connect(
host, port))
// Register the socket under its id.
315 sockets.emplace(
id, s);
// Sends `len` bytes from `data` on connection `id`. `addr` defaults to an empty
// sockaddr; the UDP outbound handler passes a decoded address, the TCP one
// passes none.
319 bool write(ConnID
id,
size_t len,
const uint8_t* data, sockaddr addr = {})
321 auto s = sockets.find(
id);
// Unknown id: the message text below says the data is ignored. The logging
// macro line and the statement that follows are not visible in this listing.
323 if (s == sockets.end())
326 "Received an outbound message for id {} which is not a known "
327 "connection. Ignoring message of {} bytes",
// The entry exists but holds a null socket (stop() assigns nullptr to entries).
// The body of this branch is not visible.
333 if (s->second.is_null())
// Normal path: delegate to the socket and return its result.
340 return s->second->write(len, data, addr);
// Fragment of stop(ConnID id): the table entry is set to null rather than
// erased, so the id stays present in `sockets`; write() checks is_null() on
// the entry.
// NOTE(review): operator[] inserts a null entry when `id` is absent; any guard
// before this line is not visible in this listing - confirm.
347 sockets[id] =
nullptr;
// Fragment of close(ConnID id): erase() returns the number of removed entries,
// so a result below 1 means the id was not in the table. The body of that
// branch is not visible in this listing.
356 if (sockets.erase(
id) < 1)
// Drop the idle counter kept for this id as well.
364 idle_times.erase(
id);
// Fragments of register_message_handlers(): four handler registrations on
// `disp`, one per TCP message type. Each fragment starts at the argument list;
// the line naming the registration call is missing from this listing
// (presumably DISPATCHER_SET_MESSAGE_HANDLER).
//
// tcp_outbound: decode the message and forward its body to write().
373 disp, ::tcp::tcp_outbound, [
this](
const uint8_t* data,
size_t size) {
// The left-hand side receiving the decoded fields (`id`, `body`) is not
// visible in this listing.
375 ringbuffer::read_message<::tcp::tcp_outbound>(data, size);
// NOTE(review): C-style cast; static_cast<ConnID>(id) would be the idiomatic
// form.
377 ConnID connect_id = (ConnID)
id;
378 LOG_DEBUG_FMT(
"rpc write from enclave {}: {}", connect_id, body.size);
380 write(connect_id, body.size, body.data);
// tcp_connect: decode id/host/port of a connection requested by the enclave.
384 disp, ::tcp::tcp_connect, [
this](
const uint8_t* data,
size_t size) {
385 auto [id,
host, port] =
386 ringbuffer::read_message<::tcp::tcp_connect>(data, size);
// Only ids accepted by check_enclave_side_id() are handled; the body of the
// accepting branch (presumably a connect() call) is not visible.
390 if (check_enclave_side_id(
id))
// Message logged for an id outside the accepted range.
397 "rpc session id is not in dedicated from-enclave range ({})",
id);
// tcp_stop: decode the message and forget the idle counter of the connection.
// Any other action taken here is not visible in this listing.
402 disp, ::tcp::tcp_stop, [
this](
const uint8_t* data,
size_t size) {
404 ringbuffer::read_message<::tcp::tcp_stop>(data, size);
410 idle_times.erase(
id);
// tcp_closed: decode the id; what is done with it is not visible.
414 disp, ::tcp::tcp_closed, [
this](
const uint8_t* data,
size_t size) {
415 auto [id] = ringbuffer::read_message<::tcp::tcp_closed>(data, size);
// Fragment of register_udp_message_handlers(): handler for udp::udp_outbound.
// The line naming the registration call is missing from this listing.
426 disp, udp::udp_outbound, [
this](
const uint8_t* data,
size_t size) {
// Decode connection id, encoded destination address and payload.
427 auto [id, addr_family, addr_data, body] =
428 ringbuffer::read_message<udp::udp_outbound>(data, size);
// NOTE(review): C-style cast; static_cast<ConnID>(id) would be the idiomatic
// form.
430 ConnID connect_id = (ConnID)
id;
431 LOG_DEBUG_FMT(
"rpc write from enclave {}: {}", connect_id, body.size);
// Rebuild the sockaddr from its encoded parts and send the payload to it.
433 auto addr = udp::sockaddr_decode(addr_family, addr_data);
434 write(connect_id, body.size, body.data, addr);
// Fragments of on_timer(): scan for connections idle longer than the
// configured limit.
// No idle timeout configured: the body of this branch is not visible
// (presumably an early return).
445 if (!idle_connection_timeout.has_value())
// Limit in seconds. NOTE(review): count() returns the signed representation
// type of std::chrono::seconds and is converted to size_t here.
450 const size_t max_idle_time = idle_connection_timeout->count();
// Explicit iterator loop because entries are erased while traversing.
452 auto it = idle_times.begin();
453 while (it != idle_times.end())
455 auto& [id, idle_time] = *it;
456 if (idle_time > max_idle_time)
// Message logged for a connection over the limit; the logging macro line and
// its arguments are not visible in this listing.
459 "Closing socket {} after {}s idle (max = {}s)",
// erase() returns the iterator following the removed entry. `id` and
// `idle_time` refer into that entry and must not be used after this line.
464 it = idle_times.erase(it);
480 bool check_enclave_side_id(ConnID
id)
// Returns the listen name of the socket registered under `id`.
// Throws std::logic_error when no socket has that id, or when the socket
// reports no listen name.
485 std::string get_interface_listen_name(ConnID
id)
487 const auto it = sockets.find(
id);
488 if (it == sockets.end())
// Log the requested id and the current table size before throwing. The
// logging macro line is missing from this listing.
491 "Requested interface number {}, has {}",
id, sockets.size());
492 throw std::logic_error(fmt::format(
"No socket with id {}",
id));
// get_listen_name() yields an optional-like value; an empty one is treated as
// an error.
495 auto listen_name = it->second->get_listen_name();
496 if (!listen_name.has_value())
498 throw std::logic_error(
499 fmt::format(
"Interface {} has no listen name",
id));
502 return listen_name.value();
506 template <
class ConnType>
Definition rpc_connections.h:51
ConnID get_next_id(T &sockets)
Definition rpc_connections.h:61
ConnIDGenerator()
Definition rpc_connections.h:58
int64_t ConnID
This is the same as ccf::tls::ConnID and udp::ConnID.
Definition rpc_connections.h:54
Definition rpc_connections.h:92
bool listen(ConnID id, std::string &host, std::string &port, const std::string &name)
Definition rpc_connections.h:257
bool close(ConnID id)
Definition rpc_connections.h:354
RPCConnectionsImpl(ringbuffer::AbstractWriterFactory &writer_factory, ConnIDGenerator &idGen, std::optional< std::chrono::milliseconds > client_connection_timeout_=std::nullopt, std::optional< std::chrono::seconds > idle_connection_timeout_=std::nullopt)
Definition rpc_connections.h:244
void register_udp_message_handlers(messaging::Dispatcher< ringbuffer::Message > &disp)
Definition rpc_connections.h:422
bool write(ConnID id, size_t len, const uint8_t *data, sockaddr addr={})
Definition rpc_connections.h:319
bool stop(ConnID id)
Definition rpc_connections.h:343
void on_timer()
Definition rpc_connections.h:443
void register_message_handlers(messaging::Dispatcher< ringbuffer::Message > &disp)
Definition rpc_connections.h:369
bool connect(ConnID id, const std::string &host, const std::string &port)
Definition rpc_connections.h:294
void mark_active(ConnID id)
Definition rpc_connections.h:438
Callback service for user-specific behaviour for TCP and UDP connections.
Definition socket.h:20
const char * conn_name
Definition socket.h:23
Definition messaging.h:38
Definition ring_buffer_types.h:153
#define LOG_INFO_FMT
Definition logger.h:362
#define LOG_DEBUG_FMT
Definition logger.h:357
#define LOG_FAIL_FMT
Definition logger.h:363
#define DISPATCHER_SET_MESSAGE_HANDLER(DISP, MSG,...)
Definition messaging.h:316
Definition configuration.h:14
std::shared_ptr< AbstractWriter > WriterPtr
Definition ring_buffer_types.h:150
#define RINGBUFFER_WRITE_MESSAGE(MSG,...)
Definition ring_buffer_types.h:255
Definition serializer.h:27