CCF
Loading...
Searching...
No Matches
rpc_connections.h
Go to the documentation of this file.
1// Copyright (c) Microsoft Corporation. All rights reserved.
2// Licensed under the Apache 2.0 License.
3#pragma once
4
#include "../tcp/msg_types.h"
#include "../udp/msg_types.h"
#include "tcp.h"
#include "udp.h"

#include <limits>
#include <unordered_map>
11
12namespace
13{
14 template <class T>
15 constexpr bool isTCP()
16 {
17 return std::is_same<T, asynchost::TCP>();
18 }
19
20 template <class T>
21 constexpr bool isUDP()
22 {
23 return std::is_same<T, asynchost::UDP>();
24 }
25
26 template <class T>
27 constexpr const char* getConnTypeName()
28 {
29 if constexpr (isTCP<T>())
30 {
31 return "TCP";
32 }
33 else if constexpr (isUDP<T>())
34 {
35 return "UDP";
36 }
37 else
38 {
39 throw std::runtime_error("Invalid connection type");
40 }
41 }
42}
43
44namespace asynchost
45{
51 {
52 public:
54 using ConnID = int64_t;
55 static_assert(std::is_same<::tcp::ConnID, udp::ConnID>());
56 static_assert(std::is_same<::tcp::ConnID, ConnID>());
57
58 ConnIDGenerator() : next_id(1) {}
59
60 template <class T>
61 ConnID get_next_id(T& sockets)
62 {
63 auto id = next_id++;
64 const auto initial = id;
65
66 if (next_id < 0)
67 next_id = 1;
68
69 while (sockets.find(id) != sockets.end())
70 {
71 id++;
72
73 if (id < 0)
74 id = 1;
75
76 if (id == initial)
77 {
78 throw std::runtime_error(
79 "Exhausted all IDs for host RPC connections");
80 }
81 }
82
83 return id;
84 }
85
86 private:
87 std::atomic<ConnID> next_id;
88 };
89
90 template <class ConnType>
92 {
93 using ConnID = ConnIDGenerator::ConnID;
94
95 class RPCClientBehaviour : public SocketBehaviour<ConnType>
96 {
97 public:
98 RPCConnectionsImpl& parent;
99 ConnID id;
100
101 RPCClientBehaviour(RPCConnectionsImpl& parent, ConnID id) :
102 SocketBehaviour<ConnType>("RPC Client", getConnTypeName<ConnType>()),
103 parent(parent),
104 id(id)
105 {
106 parent.mark_active(id);
107 }
108
109 void on_resolve_failed() override
110 {
111 LOG_DEBUG_FMT("rpc resolve failed {}", id);
112 cleanup();
113 }
114
115 void on_connect_failed() override
116 {
117 LOG_DEBUG_FMT("rpc connect failed {}", id);
118 cleanup();
119 }
120
121 bool on_read(size_t len, uint8_t*& data, sockaddr) override
122 {
123 LOG_DEBUG_FMT("rpc read {}: {}", id, len);
124
125 parent.mark_active(id);
126
128 ::tcp::tcp_inbound,
129 parent.to_enclave,
130 id,
131 serializer::ByteRange{data, len});
132
133 return true;
134 }
135
136 void on_disconnect() override
137 {
138 LOG_DEBUG_FMT("rpc disconnect {}", id);
139 cleanup();
140 }
141
142 void cleanup()
143 {
144 if constexpr (isTCP<ConnType>())
145 {
146 RINGBUFFER_WRITE_MESSAGE(::tcp::tcp_close, parent.to_enclave, id);
147 }
148 }
149 };
150
151 class RPCServerBehaviour : public SocketBehaviour<ConnType>
152 {
153 public:
154 RPCConnectionsImpl& parent;
155 ConnID id;
156
157 RPCServerBehaviour(RPCConnectionsImpl& parent, ConnID id) :
158 SocketBehaviour<ConnType>("RPC Client", getConnTypeName<ConnType>()),
159 parent(parent),
160 id(id)
161 {}
162
163 void on_accept(ConnType& peer) override
164 {
165 // UDP connections don't register peers
166 if constexpr (isUDP<ConnType>())
167 {
168 return;
169 }
170
171 auto client_id = parent.get_next_id();
172 peer->set_behaviour(
173 std::make_unique<RPCClientBehaviour>(parent, client_id));
174 parent.sockets.emplace(client_id, peer);
175
176 on_start(client_id);
177 }
178
179 void on_start(int64_t peer_id) override
180 {
181 const auto interface_name = parent.get_interface_listen_name(id);
182
184 "rpc start {} on interface \"{}\" as {}",
185 peer_id,
186 interface_name,
187 this->conn_name);
188
189 if constexpr (isTCP<ConnType>())
190 {
192 ::tcp::tcp_start, parent.to_enclave, peer_id, interface_name);
193 return;
194 }
195
196 if constexpr (isUDP<ConnType>())
197 {
199 udp::udp_start, parent.to_enclave, peer_id, interface_name);
200 return;
201 }
202 }
203
204 bool on_read(size_t len, uint8_t*& data, sockaddr addr) override
205 {
206 // UDP connections don't have clients, it's all done in the server
207 if constexpr (isUDP<ConnType>())
208 {
209 auto [addr_family, addr_data] = udp::sockaddr_encode(addr);
210
211 LOG_DEBUG_FMT("rpc udp read into ring buffer {}: {}", id, len);
213 udp::udp_inbound,
214 parent.to_enclave,
215 id,
216 addr_family,
217 addr_data,
218 serializer::ByteRange{data, len});
219 }
220
221 return true;
222 }
223
224 void cleanup()
225 {
226 parent.sockets.erase(id);
227 }
228 };
229
230 std::unordered_map<ConnID, ConnType> sockets;
231 ConnIDGenerator& idGen;
232
233 // Measured in seconds
234 std::unordered_map<ConnID, size_t> idle_times;
235
236 std::optional<std::chrono::milliseconds> client_connection_timeout =
237 std::nullopt;
238
239 std::optional<std::chrono::seconds> idle_connection_timeout = std::nullopt;
240
241 ringbuffer::WriterPtr to_enclave;
242
243 public:
245 ringbuffer::AbstractWriterFactory& writer_factory,
246 ConnIDGenerator& idGen,
247 std::optional<std::chrono::milliseconds> client_connection_timeout_ =
248 std::nullopt,
249 std::optional<std::chrono::seconds> idle_connection_timeout_ =
250 std::nullopt) :
251 idGen(idGen),
252 client_connection_timeout(client_connection_timeout_),
253 idle_connection_timeout(idle_connection_timeout_),
254 to_enclave(writer_factory.create_writer_to_inside())
255 {}
256
257 bool listen(
258 ConnID id, std::string& host, std::string& port, const std::string& name)
259 {
260 if (id == 0)
261 {
262 id = get_next_id();
263 }
264
265 if (sockets.find(id) != sockets.end())
266 {
267 LOG_FAIL_FMT("Cannot listen on id {}: already in use", id);
268 return false;
269 }
270
271 ConnType s;
272 s->set_behaviour(std::make_unique<RPCServerBehaviour>(*this, id));
273
274 if (!s->listen(host, port, name))
275 {
276 return false;
277 }
278
279 host = s->get_host();
280 port = s->get_port();
281
282 sockets.emplace(id, s);
283
284 // UDP connections don't have peers, so we need to register the main
285 // socket TCP connections started via peer, on on_accept behaviour call
286 if constexpr (isUDP<ConnType>())
287 {
288 s->start(id);
289 }
290
291 return true;
292 }
293
294 bool connect(ConnID id, const std::string& host, const std::string& port)
295 {
296 if (id == 0)
297 {
298 id = get_next_id();
299 }
300
301 if (sockets.find(id) != sockets.end())
302 {
303 LOG_FAIL_FMT("Cannot connect on id {}: already in use", id);
304 return false;
305 }
306
307 auto s = ConnType(true, client_connection_timeout);
308 s->set_behaviour(std::make_unique<RPCClientBehaviour>(*this, id));
309
310 if (!s->connect(host, port))
311 {
312 return false;
313 }
314
315 sockets.emplace(id, s);
316 return true;
317 }
318
319 bool write(ConnID id, size_t len, const uint8_t* data, sockaddr addr = {})
320 {
321 auto s = sockets.find(id);
322
323 if (s == sockets.end())
324 {
326 "Received an outbound message for id {} which is not a known "
327 "connection. Ignoring message of {} bytes",
328 id,
329 len);
330 return false;
331 }
332
333 if (s->second.is_null())
334 {
335 return false;
336 }
337
338 mark_active(id);
339
340 return s->second->write(len, data, addr);
341 }
342
343 bool stop(ConnID id)
344 {
345 // Invalidating the TCP socket will result in the handle being closed. No
346 // more messages will be read from or written to the TCP socket.
347 sockets[id] = nullptr;
348
349 RINGBUFFER_WRITE_MESSAGE(::tcp::tcp_close, to_enclave, id);
350
351 return true;
352 }
353
354 bool close(ConnID id)
355 {
356 if (sockets.erase(id) < 1)
357 {
358 LOG_DEBUG_FMT("Cannot close id {}: does not exist", id);
359 return false;
360 }
361
362 // Make sure idle_times is cleaned up here, though in practice it should
363 // have been cleared when an earlier stop() was called
364 idle_times.erase(id);
365
366 return true;
367 }
368
371 {
373 disp, ::tcp::tcp_outbound, [this](const uint8_t* data, size_t size) {
374 auto [id, body] =
375 ringbuffer::read_message<::tcp::tcp_outbound>(data, size);
376
377 ConnID connect_id = (ConnID)id;
378 LOG_DEBUG_FMT("rpc write from enclave {}: {}", connect_id, body.size);
379
380 write(connect_id, body.size, body.data);
381 });
382
384 disp, ::tcp::tcp_connect, [this](const uint8_t* data, size_t size) {
385 auto [id, host, port] =
386 ringbuffer::read_message<::tcp::tcp_connect>(data, size);
387
388 LOG_DEBUG_FMT("rpc connect request from enclave {}", id);
389
390 if (check_enclave_side_id(id))
391 {
392 connect(id, host, port);
393 }
394 else
395 {
397 "rpc session id is not in dedicated from-enclave range ({})", id);
398 }
399 });
400
402 disp, ::tcp::tcp_stop, [this](const uint8_t* data, size_t size) {
403 auto [id, msg] =
404 ringbuffer::read_message<::tcp::tcp_stop>(data, size);
405
406 LOG_DEBUG_FMT("rpc stop from enclave {}, {}", id, msg);
407 stop(id);
408
409 // Immediately stop tracking idle timeout for this ID too
410 idle_times.erase(id);
411 });
412
414 disp, ::tcp::tcp_closed, [this](const uint8_t* data, size_t size) {
415 auto [id] = ringbuffer::read_message<::tcp::tcp_closed>(data, size);
416
417 LOG_DEBUG_FMT("rpc closed from enclave {}", id);
418 close(id);
419 });
420 }
421
424 {
426 disp, udp::udp_outbound, [this](const uint8_t* data, size_t size) {
427 auto [id, addr_family, addr_data, body] =
428 ringbuffer::read_message<udp::udp_outbound>(data, size);
429
430 ConnID connect_id = (ConnID)id;
431 LOG_DEBUG_FMT("rpc write from enclave {}: {}", connect_id, body.size);
432
433 auto addr = udp::sockaddr_decode(addr_family, addr_data);
434 write(connect_id, body.size, body.data, addr);
435 });
436 }
437
438 void mark_active(ConnID id)
439 {
440 idle_times[id] = 0;
441 }
442
443 void on_timer()
444 {
445 if (!idle_connection_timeout.has_value())
446 {
447 return;
448 }
449
450 const size_t max_idle_time = idle_connection_timeout->count();
451
452 auto it = idle_times.begin();
453 while (it != idle_times.end())
454 {
455 auto& [id, idle_time] = *it;
456 if (idle_time > max_idle_time)
457 {
459 "Closing socket {} after {}s idle (max = {}s)",
460 id,
461 idle_time,
462 max_idle_time);
463 stop(id);
464 it = idle_times.erase(it);
465 }
466 else
467 {
468 idle_time += 1;
469 ++it;
470 }
471 }
472 }
473
474 private:
475 ConnID get_next_id()
476 {
477 return idGen.get_next_id(sockets);
478 }
479
480 bool check_enclave_side_id(ConnID id)
481 {
482 return id < 0;
483 }
484
485 std::string get_interface_listen_name(ConnID id)
486 {
487 const auto it = sockets.find(id);
488 if (it == sockets.end())
489 {
491 "Requested interface number {}, has {}", id, sockets.size());
492 throw std::logic_error(fmt::format("No socket with id {}", id));
493 }
494
495 auto listen_name = it->second->get_listen_name();
496 if (!listen_name.has_value())
497 {
498 throw std::logic_error(
499 fmt::format("Interface {} has no listen name", id));
500 }
501
502 return listen_name.value();
503 }
504 };
505
506 template <class ConnType>
508}
Definition rpc_connections.h:51
ConnID get_next_id(T &sockets)
Definition rpc_connections.h:61
ConnIDGenerator()
Definition rpc_connections.h:58
int64_t ConnID
This is the same as ccf::tls::ConnID and udp::ConnID.
Definition rpc_connections.h:54
Definition rpc_connections.h:92
bool listen(ConnID id, std::string &host, std::string &port, const std::string &name)
Definition rpc_connections.h:257
bool close(ConnID id)
Definition rpc_connections.h:354
RPCConnectionsImpl(ringbuffer::AbstractWriterFactory &writer_factory, ConnIDGenerator &idGen, std::optional< std::chrono::milliseconds > client_connection_timeout_=std::nullopt, std::optional< std::chrono::seconds > idle_connection_timeout_=std::nullopt)
Definition rpc_connections.h:244
void register_udp_message_handlers(messaging::Dispatcher< ringbuffer::Message > &disp)
Definition rpc_connections.h:422
bool write(ConnID id, size_t len, const uint8_t *data, sockaddr addr={})
Definition rpc_connections.h:319
bool stop(ConnID id)
Definition rpc_connections.h:343
void on_timer()
Definition rpc_connections.h:443
void register_message_handlers(messaging::Dispatcher< ringbuffer::Message > &disp)
Definition rpc_connections.h:369
bool connect(ConnID id, const std::string &host, const std::string &port)
Definition rpc_connections.h:294
void mark_active(ConnID id)
Definition rpc_connections.h:438
Callback service for user-specific behaviour for TCP and UDP connections.
Definition socket.h:20
const char * conn_name
Definition socket.h:23
Definition proxy.h:51
Definition messaging.h:38
Definition ring_buffer_types.h:153
#define LOG_INFO_FMT
Definition logger.h:362
#define LOG_DEBUG_FMT
Definition logger.h:357
#define LOG_FAIL_FMT
Definition logger.h:363
#define DISPATCHER_SET_MESSAGE_HANDLER(DISP, MSG,...)
Definition messaging.h:316
Definition after_io.h:8
Definition configuration.h:14
std::shared_ptr< AbstractWriter > WriterPtr
Definition ring_buffer_types.h:150
#define RINGBUFFER_WRITE_MESSAGE(MSG,...)
Definition ring_buffer_types.h:255
Definition serializer.h:27