CCF
Loading...
Searching...
No Matches
rpc_connections.h
Go to the documentation of this file.
1// Copyright (c) Microsoft Corporation. All rights reserved.
2// Licensed under the Apache 2.0 License.
3#pragma once
4
5#include "../tcp/msg_types.h"
6#include "../udp/msg_types.h"
7#include "tcp.h"
8#include "udp.h"
9
10#include <unordered_map>
11
12namespace // NOLINT(cert-dcl59-cpp)
13{
14 template <class T>
15 constexpr bool isTCP()
16 {
17 return std::is_same<T, asynchost::TCP>();
18 }
19
20 template <class T>
21 constexpr bool isUDP()
22 {
23 return std::is_same<T, asynchost::UDP>();
24 }
25
26 template <class T>
27 constexpr const char* getConnTypeName()
28 {
29 if constexpr (isTCP<T>())
30 {
31 return "TCP";
32 }
33 else if constexpr (isUDP<T>())
34 {
35 return "UDP";
36 }
37 else
38 {
39 throw std::runtime_error("Invalid connection type");
40 }
41 }
42}
43
44namespace asynchost
45{
51 {
52 public:
54 using ConnID = int64_t;
55 static_assert(std::is_same<::tcp::ConnID, udp::ConnID>());
56 static_assert(std::is_same<::tcp::ConnID, ConnID>());
57
58 ConnIDGenerator() : next_id(1) {}
59
60 template <class T>
61 ConnID get_next_id(T& sockets)
62 {
63 auto id = next_id++;
64 const auto initial = id;
65
66 if (next_id < 0)
67 {
68 next_id = 1;
69 }
70
71 while (sockets.find(id) != sockets.end())
72 {
73 id++;
74
75 if (id < 0)
76 {
77 id = 1;
78 }
79
80 if (id == initial)
81 {
82 throw std::runtime_error(
83 "Exhausted all IDs for host RPC connections");
84 }
85 }
86
87 return id;
88 }
89
90 private:
91 std::atomic<ConnID> next_id;
92 };
93
94 template <class ConnType>
96 {
97 using ConnID = ConnIDGenerator::ConnID;
98
99 class RPCClientBehaviour : public SocketBehaviour<ConnType>
100 {
101 public:
102 RPCConnectionsImpl& parent;
103 ConnID id;
104
105 RPCClientBehaviour(RPCConnectionsImpl& parent, ConnID id) :
106 SocketBehaviour<ConnType>("RPC Client", getConnTypeName<ConnType>()),
107 parent(parent),
108 id(id)
109 {
110 parent.mark_active(id);
111 }
112
113 void on_resolve_failed() override
114 {
115 LOG_DEBUG_FMT("rpc resolve failed {}", id);
116 cleanup();
117 }
118
119 void on_connect_failed() override
120 {
121 LOG_DEBUG_FMT("rpc connect failed {}", id);
122 cleanup();
123 }
124
125 bool on_read(size_t len, uint8_t*& data, sockaddr /*unused*/) override
126 {
127 LOG_DEBUG_FMT("rpc read {}: {}", id, len);
128
129 parent.mark_active(id);
130
132 ::tcp::tcp_inbound,
133 parent.to_enclave,
134 id,
135 serializer::ByteRange{data, len});
136
137 return true;
138 }
139
140 void on_disconnect() override
141 {
142 LOG_DEBUG_FMT("rpc disconnect {}", id);
143 cleanup();
144 }
145
146 void cleanup()
147 {
148 if constexpr (isTCP<ConnType>())
149 {
150 RINGBUFFER_WRITE_MESSAGE(::tcp::tcp_close, parent.to_enclave, id);
151 }
152 }
153 };
154
155 class RPCServerBehaviour : public SocketBehaviour<ConnType>
156 {
157 public:
158 RPCConnectionsImpl& parent;
159 ConnID id;
160
161 RPCServerBehaviour(RPCConnectionsImpl& parent, ConnID id) :
162 SocketBehaviour<ConnType>("RPC Client", getConnTypeName<ConnType>()),
163 parent(parent),
164 id(id)
165 {}
166
167 void on_accept(ConnType& peer) override
168 {
169 // UDP connections don't register peers
170 if constexpr (isUDP<ConnType>())
171 {
172 return;
173 }
174
175 auto client_id = parent.get_next_id();
176 peer->set_behaviour(
177 std::make_unique<RPCClientBehaviour>(parent, client_id));
178 parent.sockets.emplace(client_id, peer);
179
180 on_start(client_id);
181 }
182
183 void on_start(int64_t peer_id) override
184 {
185 const auto interface_name = parent.get_interface_listen_name(id);
186
188 "rpc start {} on interface \"{}\" as {}",
189 peer_id,
190 interface_name,
191 this->conn_name);
192
193 if constexpr (isTCP<ConnType>())
194 {
196 ::tcp::tcp_start, parent.to_enclave, peer_id, interface_name);
197 return;
198 }
199
200 if constexpr (isUDP<ConnType>())
201 {
203 udp::udp_start, parent.to_enclave, peer_id, interface_name);
204 return;
205 }
206 }
207
208 bool on_read(size_t len, uint8_t*& data, sockaddr addr) override
209 {
210 // UDP connections don't have clients, it's all done in the server
211 if constexpr (isUDP<ConnType>())
212 {
213 auto [addr_family, addr_data] = udp::sockaddr_encode(addr);
214
215 LOG_DEBUG_FMT("rpc udp read into ring buffer {}: {}", id, len);
217 udp::udp_inbound,
218 parent.to_enclave,
219 id,
220 addr_family,
221 addr_data,
222 serializer::ByteRange{data, len});
223 }
224
225 return true;
226 }
227
228 void cleanup()
229 {
230 parent.sockets.erase(id);
231 }
232 };
233
234 std::unordered_map<ConnID, ConnType> sockets;
235 ConnIDGenerator& idGen;
236
237 // Measured in seconds
238 std::unordered_map<ConnID, size_t> idle_times;
239
240 std::optional<std::chrono::milliseconds> client_connection_timeout =
241 std::nullopt;
242
243 std::optional<std::chrono::seconds> idle_connection_timeout = std::nullopt;
244
245 ringbuffer::WriterPtr to_enclave;
246
247 public:
249 ringbuffer::AbstractWriterFactory& writer_factory,
250 ConnIDGenerator& idGen,
251 std::optional<std::chrono::milliseconds> client_connection_timeout_ =
252 std::nullopt,
253 std::optional<std::chrono::seconds> idle_connection_timeout_ =
254 std::nullopt) :
255 idGen(idGen),
256 client_connection_timeout(client_connection_timeout_),
257 idle_connection_timeout(idle_connection_timeout_),
258 to_enclave(writer_factory.create_writer_to_inside())
259 {}
260
261 bool listen(
262 ConnID id, std::string& host, std::string& port, const std::string& name)
263 {
264 if (id == 0)
265 {
266 id = get_next_id();
267 }
268
269 if (sockets.find(id) != sockets.end())
270 {
271 LOG_FAIL_FMT("Cannot listen on id {}: already in use", id);
272 return false;
273 }
274
275 ConnType s;
276 s->set_behaviour(std::make_unique<RPCServerBehaviour>(*this, id));
277
278 if (!s->listen(host, port, name))
279 {
280 return false;
281 }
282
283 host = s->get_host();
284 port = s->get_port();
285
286 sockets.emplace(id, s);
287
288 // UDP connections don't have peers, so we need to register the main
289 // socket TCP connections started via peer, on on_accept behaviour call
290 if constexpr (isUDP<ConnType>())
291 {
292 s->start(id);
293 }
294
295 return true;
296 }
297
298 bool connect(ConnID id, const std::string& host, const std::string& port)
299 {
300 if (id == 0)
301 {
302 id = get_next_id();
303 }
304
305 if (sockets.find(id) != sockets.end())
306 {
307 LOG_FAIL_FMT("Cannot connect on id {}: already in use", id);
308 return false;
309 }
310
311 auto s = ConnType(true, client_connection_timeout);
312 s->set_behaviour(std::make_unique<RPCClientBehaviour>(*this, id));
313
314 if (!s->connect(host, port))
315 {
316 return false;
317 }
318
319 sockets.emplace(id, s);
320 return true;
321 }
322
323 bool write(ConnID id, size_t len, const uint8_t* data, sockaddr addr = {})
324 {
325 auto s = sockets.find(id);
326
327 if (s == sockets.end())
328 {
330 "Received an outbound message for id {} which is not a known "
331 "connection. Ignoring message of {} bytes",
332 id,
333 len);
334 return false;
335 }
336
337 if (s->second.is_null())
338 {
339 return false;
340 }
341
342 mark_active(id);
343
344 return s->second->write(len, data, addr);
345 }
346
347 bool stop(ConnID id)
348 {
349 // Invalidating the TCP socket will result in the handle being closed. No
350 // more messages will be read from or written to the TCP socket.
351 sockets[id] = nullptr;
352
353 RINGBUFFER_WRITE_MESSAGE(::tcp::tcp_close, to_enclave, id);
354
355 return true;
356 }
357
358 bool close(ConnID id)
359 {
360 if (sockets.erase(id) < 1)
361 {
362 LOG_DEBUG_FMT("Cannot close id {}: does not exist", id);
363 return false;
364 }
365
366 // Make sure idle_times is cleaned up here, though in practice it should
367 // have been cleared when an earlier stop() was called
368 idle_times.erase(id);
369
370 return true;
371 }
372
375 {
377 disp, ::tcp::tcp_outbound, [this](const uint8_t* data, size_t size) {
378 auto [id, body] =
379 ringbuffer::read_message<::tcp::tcp_outbound>(data, size);
380
381 auto connect_id = static_cast<ConnID>(id);
382 LOG_DEBUG_FMT("rpc write from enclave {}: {}", connect_id, body.size);
383
384 write(connect_id, body.size, body.data);
385 });
386
388 disp, ::tcp::tcp_connect, [this](const uint8_t* data, size_t size) {
389 auto [id, host, port] =
390 ringbuffer::read_message<::tcp::tcp_connect>(data, size);
391
392 LOG_DEBUG_FMT("rpc connect request from enclave {}", id);
393
394 if (check_enclave_side_id(id))
395 {
396 connect(id, host, port);
397 }
398 else
399 {
401 "rpc session id is not in dedicated from-enclave range ({})", id);
402 }
403 });
404
406 disp, ::tcp::tcp_stop, [this](const uint8_t* data, size_t size) {
407 auto [id, msg] =
408 ringbuffer::read_message<::tcp::tcp_stop>(data, size);
409
410 LOG_DEBUG_FMT("rpc stop from enclave {}, {}", id, msg);
411 stop(id);
412
413 // Immediately stop tracking idle timeout for this ID too
414 idle_times.erase(id);
415 });
416
418 disp, ::tcp::tcp_closed, [this](const uint8_t* data, size_t size) {
419 auto [id] = ringbuffer::read_message<::tcp::tcp_closed>(data, size);
420
421 LOG_DEBUG_FMT("rpc closed from enclave {}", id);
422 close(id);
423 });
424 }
425
428 {
430 disp, udp::udp_outbound, [this](const uint8_t* data, size_t size) {
431 auto [id, addr_family, addr_data, body] =
432 ringbuffer::read_message<udp::udp_outbound>(data, size);
433
434 auto connect_id = static_cast<ConnID>(id);
435 LOG_DEBUG_FMT("rpc write from enclave {}: {}", connect_id, body.size);
436
437 auto addr = udp::sockaddr_decode(addr_family, addr_data);
438 write(connect_id, body.size, body.data, addr);
439 });
440 }
441
442 void mark_active(ConnID id)
443 {
444 idle_times[id] = 0;
445 }
446
447 void on_timer()
448 {
449 if (!idle_connection_timeout.has_value())
450 {
451 return;
452 }
453
454 const size_t max_idle_time = idle_connection_timeout->count();
455
456 auto it = idle_times.begin();
457 while (it != idle_times.end())
458 {
459 auto& [id, idle_time] = *it;
460 if (idle_time > max_idle_time)
461 {
463 "Closing socket {} after {}s idle (max = {}s)",
464 id,
465 idle_time,
466 max_idle_time);
467 stop(id);
468 it = idle_times.erase(it);
469 }
470 else
471 {
472 idle_time += 1;
473 ++it;
474 }
475 }
476 }
477
478 private:
479 ConnID get_next_id()
480 {
481 return idGen.get_next_id(sockets);
482 }
483
484 bool check_enclave_side_id(ConnID id)
485 {
486 return id < 0;
487 }
488
489 std::string get_interface_listen_name(ConnID id)
490 {
491 const auto it = sockets.find(id);
492 if (it == sockets.end())
493 {
495 "Requested interface number {}, has {}", id, sockets.size());
496 throw std::logic_error(fmt::format("No socket with id {}", id));
497 }
498
499 auto listen_name = it->second->get_listen_name();
500 if (!listen_name.has_value())
501 {
502 throw std::logic_error(
503 fmt::format("Interface {} has no listen name", id));
504 }
505
506 return listen_name.value();
507 }
508 };
509
510 template <class ConnType>
512}
Definition rpc_connections.h:51
ConnID get_next_id(T &sockets)
Definition rpc_connections.h:61
ConnIDGenerator()
Definition rpc_connections.h:58
int64_t ConnID
This is the same as ccf::tls::ConnID and udp::ConnID.
Definition rpc_connections.h:54
Definition rpc_connections.h:96
bool listen(ConnID id, std::string &host, std::string &port, const std::string &name)
Definition rpc_connections.h:261
bool close(ConnID id)
Definition rpc_connections.h:358
RPCConnectionsImpl(ringbuffer::AbstractWriterFactory &writer_factory, ConnIDGenerator &idGen, std::optional< std::chrono::milliseconds > client_connection_timeout_=std::nullopt, std::optional< std::chrono::seconds > idle_connection_timeout_=std::nullopt)
Definition rpc_connections.h:248
void register_udp_message_handlers(messaging::Dispatcher< ringbuffer::Message > &disp)
Definition rpc_connections.h:426
bool write(ConnID id, size_t len, const uint8_t *data, sockaddr addr={})
Definition rpc_connections.h:323
bool stop(ConnID id)
Definition rpc_connections.h:347
void on_timer()
Definition rpc_connections.h:447
void register_message_handlers(messaging::Dispatcher< ringbuffer::Message > &disp)
Definition rpc_connections.h:373
bool connect(ConnID id, const std::string &host, const std::string &port)
Definition rpc_connections.h:298
void mark_active(ConnID id)
Definition rpc_connections.h:442
Callback service for user-specific behaviour for TCP and UDP connections.
Definition socket.h:20
const char * conn_name
Definition socket.h:23
Definition proxy.h:51
Definition messaging.h:38
Definition ring_buffer_types.h:157
#define LOG_INFO_FMT
Definition internal_logger.h:15
#define LOG_DEBUG_FMT
Definition internal_logger.h:14
#define LOG_FAIL_FMT
Definition internal_logger.h:16
#define DISPATCHER_SET_MESSAGE_HANDLER(DISP, MSG,...)
Definition messaging.h:316
Definition after_io.h:8
Definition configuration.h:14
std::shared_ptr< AbstractWriter > WriterPtr
Definition ring_buffer_types.h:154
#define RINGBUFFER_WRITE_MESSAGE(MSG,...)
Definition ring_buffer_types.h:259
Definition serializer.h:27