CCF
Loading...
Searching...
No Matches
node_connections.h
Go to the documentation of this file.
1// Copyright (c) Microsoft Corporation. All rights reserved.
2// Licensed under the Apache 2.0 License.
3#pragma once
4
6#include "ledger.h"
7#include "node/node_types.h"
8#include "tcp.h"
9#include "timer.h"
10
11#include <unordered_map>
12
13namespace asynchost
14{
// Placeholder node ID substituted (via value_or) into the formatted messages
// in NodeConnectionBehaviour::on_read while a connection has no known peer.
15 static const auto UnassociatedNode = ccf::NodeId("Unknown");
16
18 {
19 private:
// Socket behaviour shared by NodeIncomingBehaviour and NodeOutgoingBehaviour.
// Accumulates received bytes and splits them into size-prefixed node-to-node
// messages, forwarding each complete message towards the enclave.
//
// NOTE(review): the embedded listing numbers skip 33, 40, 65, 82, 96, 107,
// 122 and 128 inside this class. Those lines are absent from this listing
// (presumably the base-class initialiser and the logging / ringbuffer macro
// invocations whose arguments remain below) - restore them from the upstream
// file before compiling.
20 class NodeConnectionBehaviour : public SocketBehaviour<TCP>
21 {
22 private:
23 public:
// NodeConnections instance whose connection maps and ringbuffer writer are
// used by this behaviour.
24 NodeConnections& parent;
// Peer node ID; empty until known (outgoing connections pass it to the
// constructor, otherwise it is set from the first message read in on_read).
25 std::optional<ccf::NodeId> node;
// Size of the message currently being assembled. Set once the uint32_t size
// prefix has been read, reset after the message has been forwarded.
26 std::optional<size_t> msg_size = std::nullopt;
// Bytes received so far that have not yet been consumed by on_read.
27 std::vector<uint8_t> pending;
28
29 NodeConnectionBehaviour(
30 const char* name,
31 NodeConnections& parent,
32 std::optional<ccf::NodeId> node = std::nullopt) :
// NOTE(review): embedded line 33 is missing here; `name` is otherwise unused,
// so it is presumably forwarded to the SocketBehaviour<TCP> base - confirm.
34 parent(parent),
35 node(node)
36 {}
37
// Appends `len` bytes from `incoming` to `pending`, then processes as many
// complete messages as are buffered. Each message is read as:
//   uint32_t size | NodeMsgType | NodeId::Value (sender) | payload
// where `size` counts everything after the prefix (headers + payload).
// Returns false when the type or sender cannot be parsed, or when the
// declared size is smaller than the headers; returns true otherwise,
// including when only a partial message is buffered.
38 bool on_read(size_t len, uint8_t*& incoming, sockaddr) override
39 {
// NOTE(review): embedded line 40 (macro name for this message) is missing.
41 "from node {} received {} bytes",
42 node.value_or(UnassociatedNode),
43 len);
44
45 pending.insert(pending.end(), incoming, incoming + len);
46
// `data`/`size` form a read cursor over `pending`; serialized::read advances
// them. `size_before` lets the consumed byte count be computed after the loop.
47 const uint8_t* data = pending.data();
48 size_t size = pending.size();
49 const auto size_before = size;
50
51 while (true)
52 {
53 if (!msg_size.has_value())
54 {
// Not enough bytes for the size prefix yet: wait for more data.
55 if (size < sizeof(uint32_t))
56 {
57 break;
58 }
59
60 msg_size = serialized::read<uint32_t>(data, size);
61 }
62
// Message body incomplete. The prefix bytes already read count as consumed
// and msg_size stays set, so the next call resumes at the message body.
63 if (size < msg_size.value())
64 {
// NOTE(review): embedded line 65 (macro name for this message) is missing.
66 "from node {} have {}/{} bytes",
67 node.value_or(UnassociatedNode),
68 size,
69 msg_size.value());
70 break;
71 }
72
// Remember the cursor position so the header size can be measured below.
73 const auto size_pre_headers = size;
74
75 ccf::NodeMsgType msg_type;
76 try
77 {
78 msg_type = serialized::read<ccf::NodeMsgType>(data, size);
79 }
80 catch (const std::exception& e)
81 {
// NOTE(review): embedded line 82 (macro name for this message) is missing.
83 "Received invalid node-to-node traffic. Unable to read message "
84 "type ({}). Closing connection.",
85 e.what());
// Early return: `pending` is left untouched on this path.
86 return false;
87 }
88
89 ccf::NodeId from;
90 try
91 {
92 from = serialized::read<ccf::NodeId::Value>(data, size);
93 }
94 catch (const std::exception& e)
95 {
// NOTE(review): embedded line 96 (macro name for this message) is missing.
97 "Received invalid node-to-node traffic. Unable to read sender "
98 "node ID ({}). Closing connection.",
99 e.what());
100 return false;
101 }
102
// Bytes consumed by the type and sender fields; the remainder of the
// declared message size is the payload.
103 const auto size_post_headers = size;
104 const auto header_size = size_pre_headers - size_post_headers;
105 if (header_size > msg_size.value())
106 {
// NOTE(review): embedded line 107 (macro name for this message) is missing.
108 "Received invalid node-to-node traffic. Total msg size {} "
109 "doesn't even contain headers (of size {})",
110 msg_size.value(),
111 header_size);
112 return false;
113 }
114 const size_t payload_size = msg_size.value() - header_size;
115
// First message on a connection with no known peer: let the subclass record
// the association, then remember the sender.
// NOTE(review): the sender of later messages is not compared against `node`;
// confirm whether that is intended.
116 if (!node.has_value())
117 {
118 associate_incoming(from);
119 node = from;
120 }
121
// NOTE(review): embedded line 122 (macro name for this message) is missing.
123 "node in: from node {}, size {}, type {}",
124 node.value(),
125 msg_size.value(),
126 msg_type);
127
// Forward type, sender and payload bytes via parent.to_enclave.
// NOTE(review): embedded line 128 is missing; presumably a
// RINGBUFFER_WRITE_MESSAGE( invocation - confirm against upstream.
129 ccf::node_inbound,
130 parent.to_enclave,
131 msg_type,
132 from.value(),
133 serializer::ByteRange{data, payload_size});
134
// Step over the payload and get ready for the next size prefix.
135 data += payload_size;
136 size -= payload_size;
137 msg_size.reset();
138 }
139
// Drop every byte consumed above from the front of `pending`, keeping any
// trailing partial message for the next call.
140 const auto size_after = size;
141 const auto used = size_before - size_after;
142 if (used > 0)
143 {
144 pending.erase(pending.begin(), pending.begin() + used);
145 }
146
147 return true;
148 }
149
// Hook called from on_read with the sender of the first message when `node`
// is not yet set. The default implementation does nothing.
150 virtual void associate_incoming(const ccf::NodeId&) {}
151 };
152
153 class NodeIncomingBehaviour : public NodeConnectionBehaviour
154 {
155 public:
156 size_t id;
157 std::optional<ccf::NodeId> node_id;
158
159 NodeIncomingBehaviour(NodeConnections& parent, size_t id_) :
160 NodeConnectionBehaviour("Node Incoming", parent),
161 id(id_)
162 {}
163
164 void on_disconnect() override
165 {
166 LOG_DEBUG_FMT("Disconnecting incoming connection {}", id);
167 parent.unassociated_incoming.erase(id);
168
169 if (node_id.has_value())
170 {
171 parent.remove_connection(node_id.value());
172 }
173 }
174
175 void associate_incoming(const ccf::NodeId& n) override
176 {
177 node_id = n;
178
179 const auto unassociated = parent.unassociated_incoming.find(id);
181 unassociated != parent.unassociated_incoming.end(),
182 "Associating node {} with incoming ID {}, but have already forgotten "
183 "the incoming connection",
184 n,
185 id);
186
187 // Always prefer this (probably) newer connection. Pathological case is
188 // where both nodes open outgoings to each other at the same time, both
189 // see the corresponding incoming connections and _drop_ their outgoing
190 // connections. Both have a useless incoming connection they think they
191 // can use. Assumption is that they progress at different rates, and one
192 // of them eventually spots the dead connection and opens a new one
193 // which succeeds.
194 parent.connections[n] = unassociated->second;
195 parent.unassociated_incoming.erase(unassociated);
196
198 "Node incoming connection ({}) associated with {}", id, n);
199 }
200 };
201
// Behaviour for connections opened by this node towards a known peer. Each
// failure / disconnect callback removes the peer's entry from
// parent.connections via remove_connection().
//
// NOTE(review): embedded lines 211, 219, 227 and 235 are absent from this
// listing; each is presumably the log-macro invocation that the dangling
// format-string arguments below belong to - restore from upstream.
202 class NodeOutgoingBehaviour : public NodeConnectionBehaviour
203 {
204 public:
// The peer ID is passed to the base constructor, so `node` is set from
// construction and node.value() in the callbacks below has a value.
205 NodeOutgoingBehaviour(NodeConnections& parent, const ccf::NodeId& node) :
206 NodeConnectionBehaviour("Node Outgoing", parent, node)
207 {}
208
209 void on_bind_failed() override
210 {
212 "Disconnecting outgoing connection with {}: bind failed",
213 node.value());
214 parent.remove_connection(node.value());
215 }
216
217 void on_resolve_failed() override
218 {
220 "Disconnecting outgoing connection with {}: resolve failed",
221 node.value());
222 parent.remove_connection(node.value());
223 }
224
225 void on_connect_failed() override
226 {
228 "Disconnecting outgoing connection with {}: connect failed",
229 node.value());
230 parent.remove_connection(node.value());
231 }
232
233 void on_disconnect() override
234 {
236 "Disconnecting outgoing connection with {}: disconnected",
237 node.value());
238 parent.remove_connection(node.value());
239 }
240 };
241
242 class NodeServerBehaviour : public SocketBehaviour<TCP>
243 {
244 public:
245 NodeConnections& parent;
246
247 NodeServerBehaviour(NodeConnections& parent) :
248 SocketBehaviour<TCP>("Node Server", "TCP"),
249 parent(parent)
250 {}
251
252 void on_accept(TCP& peer) override
253 {
254 auto id = parent.get_next_id();
255 peer->set_behaviour(
256 std::make_unique<NodeIncomingBehaviour>(parent, id));
257 parent.unassociated_incoming.emplace(id, peer);
258 LOG_DEBUG_FMT("Accepted new incoming node connection ({})", id);
259 }
260 };
261
// Ledger from which entries are read when sending AppendEntries messages.
262 Ledger& ledger;
// Listening socket; NodeServerBehaviour is installed on it in the constructor.
263 TCP listener;
264
// Node ID -> (hostname, port), filled in by the associate_node_address
// message handler.
265 std::unordered_map<ccf::NodeId, std::pair<std::string, std::string>>
266 node_addresses;
267
// Node ID -> connection used when sending to that node. Holds connections
// made by create_connection() and incoming connections once associated.
268 std::unordered_map<ccf::NodeId, TCP> connections;
269
// Accepted connections whose peer is not yet known, keyed by a local ID.
270 std::unordered_map<size_t, TCP> unassociated_incoming;
// Next candidate ID handed out by get_next_id().
271 size_t next_id = 1;
272
// Writer obtained from create_writer_to_inside(); used for inbound messages.
273 ringbuffer::WriterPtr to_enclave;
274
// Optional settings for outgoing connections: passed to connect() and to the
// TCP constructor respectively in create_connection().
275 std::optional<std::string> client_interface = std::nullopt;
276 std::optional<std::chrono::milliseconds> client_connection_timeout =
277 std::nullopt;
278
279 public:
// NOTE(review): the head of this constructor's signature (embedded lines
// 280-281) is absent from this listing. The generated documentation gives it
// as `NodeConnections(messaging::Dispatcher<ringbuffer::Message>& disp, ...`
// - restore from upstream.
//
// `host` and `port` are in/out parameters: they give the address to listen
// on and are overwritten with the values reported by the listener.
282 Ledger& ledger,
283 ringbuffer::AbstractWriterFactory& writer_factory,
284 std::string& host,
285 std::string& port,
286 const std::optional<std::string>& client_interface = std::nullopt,
287 std::optional<std::chrono::milliseconds> client_connection_timeout_ =
288 std::nullopt) :
289 ledger(ledger),
290 to_enclave(writer_factory.create_writer_to_inside()),
291 client_interface(client_interface),
292 client_connection_timeout(client_connection_timeout_)
293 {
// Install the accept handler, then start listening on the requested address.
// NOTE(review): the result of listen() is not checked here.
294 listener->set_behaviour(std::make_unique<NodeServerBehaviour>(*this));
295 listener->listen(host, port);
// Report the listener's host and port back to the caller.
296 host = listener->get_host();
297 port = listener->get_port();
298
// NOTE(review): embedded line 299 is missing here; presumably the call that
// registers the message handlers on the dispatcher - confirm against upstream.
300 }
301
// NOTE(review): this function's signature (embedded lines 302-303) is absent
// from this listing; the generated documentation gives it as
// `void register_message_handlers(messaging::Dispatcher<ringbuffer::Message>& disp)`.
// Embedded lines 305, 315, 325, 346, 368, 370, 386, 396 and 422 are also
// missing (macro invocations and parts of one condition) - restore from
// upstream before compiling.
//
// Registers three handlers on `disp`:
//  - associate_node_address: remember a node's hostname and port
//  - close_node_outbound:    drop the connection to a node
//  - node_outbound:          frame a message and write it to the node's
//                            connection, creating the connection if needed
304 {
// NOTE(review): embedded line 305 is missing; presumably
// DISPATCHER_SET_MESSAGE_HANDLER( - the same applies to 315 and 325 below.
306 disp,
307 ccf::associate_node_address,
308 [this](const uint8_t* data, size_t size) {
309 auto [node_id, hostname, port] =
310 ringbuffer::read_message<ccf::associate_node_address>(data, size);
311
// Overwrites any address previously recorded for this node.
312 node_addresses[node_id] = {hostname, port};
313 });
314
316 disp,
317 ccf::close_node_outbound,
318 [this](const uint8_t* data, size_t size) {
319 auto [node_id] =
320 ringbuffer::read_message<ccf::close_node_outbound>(data, size);
321
322 remove_connection(node_id);
323 });
324
326 disp, ccf::node_outbound, [this](const uint8_t* data, size_t size) {
327 // Read piece-by-piece rather than all at once
328 ccf::NodeId to = serialized::read<ccf::NodeId::Value>(data, size);
329
// Reuse the existing connection to `to`, or create one from its recorded
// address. The message is dropped if the address is unknown or the
// connection cannot be created.
330 TCP outbound_connection = nullptr;
331 {
332 const auto connection_it = connections.find(to);
333 if (connection_it == connections.end())
334 {
335 const auto address_it = node_addresses.find(to);
336 if (address_it == node_addresses.end())
337 {
338 LOG_TRACE_FMT("Ignoring node_outbound to unknown node {}", to);
339 return;
340 }
341
342 const auto& [host, port] = address_it->second;
343 outbound_connection = create_connection(to, host, port);
344 if (outbound_connection.is_null())
345 {
// NOTE(review): embedded line 346 (macro name for this message) is missing.
347 "Unable to connect to {}, dropping outbound message message",
348 to);
349 return;
350 }
351 }
352 else
353 {
354 outbound_connection = connection_it->second;
355 }
356 }
357
358 // Rather than reading and reserialising, use the msg_type and from_id
359 // that are already serialised on the ringbuffer
// Snapshot of the cursor just after the destination ID: the bytes sent on the
// wire start with msg_type and from_id.
360 auto data_to_send = data;
361 auto size_to_send = size;
362
363 // If the message is a consensus append entries message, affix the
364 // corresponding ledger entries
365 auto msg_type = serialized::read<ccf::NodeMsgType>(data, size);
366 serialized::read<ccf::NodeId::Value>(data, size); // Ignore from_id
367 if (
// NOTE(review): embedded lines 368 and 370 (the rest of this condition) are
// missing; presumably it tests msg_type for a consensus message and compares
// the Raft message type read below with append-entries - confirm.
369 (serialized::read<aft::RaftMsgType>(data, size) ==
371 {
372 // Parse the indices to be sent to the recipient.
373 const auto& ae =
374 serialized::overlay<::consensus::AppendEntriesIndex>(data, size);
375
376 // Find the total frame size, and write it along with the header.
// NOTE(review): size_to_send is narrowed to 32 bits here without a range
// check, and the return values of write() below are not inspected.
377 uint32_t frame = (uint32_t)size_to_send;
378
// A non-empty index range (prev_idx, idx] means ledger entries are appended
// after the serialised header.
379 if (ae.idx > ae.prev_idx)
380 {
381 std::optional<asynchost::LedgerReadResult> read_result =
382 ledger.read_entries(ae.prev_idx + 1, ae.idx);
383
384 if (!read_result.has_value())
385 {
// NOTE(review): embedded line 386 (macro name for this message) is missing.
387 "Unable to send AppendEntries ({}, {}]: Ledger read failed",
388 ae.prev_idx,
389 ae.idx);
390 return;
391 }
392 else if (ae.idx != read_result->end_idx)
393 {
394 // NB: This should never happen since we do not pass a max_size
395 // to read_entries
// NOTE(review): embedded line 396 (macro name for this message) is missing.
397 "Unable to send AppendEntries ({}, {}]: Ledger read returned "
398 "entries to {}",
399 ae.prev_idx,
400 ae.idx,
401 read_result->end_idx);
402 return;
403 }
404 else
405 {
// Wire layout: size prefix (raw bytes of `frame`), the serialised message,
// then the ledger entries. `frame` covers the message plus the entries.
406 const auto& framed_entries = read_result->data;
407 frame += (uint32_t)framed_entries.size();
408 outbound_connection->write(sizeof(uint32_t), (uint8_t*)&frame);
409 outbound_connection->write(size_to_send, data_to_send);
410
411 outbound_connection->write(
412 framed_entries.size(), framed_entries.data());
413 }
414 }
415 else
416 {
417 // Header-only AE
418 outbound_connection->write(sizeof(uint32_t), (uint8_t*)&frame);
419 outbound_connection->write(size_to_send, data_to_send);
420 }
421
// NOTE(review): embedded line 422 (macro name for this message) is missing.
423 "send AE to node {} [{}]: {}, {}",
424 to,
425 frame,
426 ae.idx,
427 ae.prev_idx);
428 }
429 else
430 {
431 // Write as framed data to the recipient.
432 uint32_t frame = (uint32_t)size_to_send;
433
434 LOG_DEBUG_FMT("node send to {} [{}]", to, frame);
435
// Size prefix followed by the message bytes exactly as read from the
// ringbuffer.
436 outbound_connection->write(sizeof(uint32_t), (uint8_t*)&frame);
437 outbound_connection->write(size_to_send, data_to_send);
438 }
439 });
440 }
441
442 private:
// Opens an outgoing connection to `node_id` at host:port and records it in
// `connections`. Returns the new connection, or nullptr if connect() reports
// failure (nothing is recorded in that case).
443 TCP create_connection(
444 const ccf::NodeId& node_id,
445 const std::string& host,
446 const std::string& port)
447 {
// NOTE(review): the meaning of the leading `true` argument is not visible
// here - check the TCP constructor.
448 auto s = TCP(true, client_connection_timeout);
449 s->set_behaviour(std::make_unique<NodeOutgoingBehaviour>(*this, node_id));
450
451 if (!s->connect(host, port, client_interface))
452 {
453 LOG_FAIL_FMT("Failed to connect to {} on {}:{}", node_id, host, port);
454 return nullptr;
455 }
456
// emplace() leaves an existing entry for node_id untouched; the only caller
// visible in this file invokes this after failing to find one.
457 connections.emplace(node_id, s);
// NOTE(review): embedded line 458 is missing; presumably the log-macro
// invocation that the arguments on the next line belong to.
459 "Added node connection with {} ({}:{})", node_id, host, port);
460
461 return s;
462 }
463
464 bool remove_connection(const ccf::NodeId& node)
465 {
466 if (connections.erase(node) < 1)
467 {
468 LOG_DEBUG_FMT("Cannot remove node connection {}: does not exist", node);
469 return false;
470 }
471
472 LOG_DEBUG_FMT("Removed node connection with {}", node);
473 return true;
474 }
475
476 size_t get_next_id()
477 {
478 auto id = next_id++;
479
480 while (unassociated_incoming.find(id) != unassociated_incoming.end())
481 {
482 id = next_id++;
483 }
484
485 return id;
486 }
487 };
488}
#define CCF_ASSERT_FMT(expr,...)
Definition ccf_assert.h:10
Definition ledger.h:707
std::optional< LedgerReadResult > read_entries(size_t from, size_t to, std::optional< size_t > max_entries_size=std::nullopt)
Definition ledger.h:1326
Definition node_connections.h:18
NodeConnections(messaging::Dispatcher< ringbuffer::Message > &disp, Ledger &ledger, ringbuffer::AbstractWriterFactory &writer_factory, std::string &host, std::string &port, const std::optional< std::string > &client_interface=std::nullopt, std::optional< std::chrono::milliseconds > client_connection_timeout_=std::nullopt)
Definition node_connections.h:280
void register_message_handlers(messaging::Dispatcher< ringbuffer::Message > &disp)
Definition node_connections.h:302
Callback service for user-specific behaviour for TCP and UDP connections.
Definition socket.h:20
const char * name
Definition socket.h:22
bool write(size_t len, const uint8_t *data, sockaddr addr={})
Definition tcp.h:329
bool listen(const std::string &host_, const std::string &port_, const std::optional< std::string > &name=std::nullopt)
Definition tcp.h:318
std::string get_port() const
Definition tcp.h:144
std::string get_host() const
Definition tcp.h:139
void set_behaviour(std::unique_ptr< SocketBehaviour< TCP > > b)
Definition tcp.h:134
bool is_null()
Definition proxy.h:74
Definition messaging.h:38
Definition ring_buffer_types.h:153
#define LOG_TRACE_FMT
Definition logger.h:356
#define LOG_DEBUG_FMT
Definition logger.h:357
#define LOG_FAIL_FMT
Definition logger.h:363
#define DISPATCHER_SET_MESSAGE_HANDLER(DISP, MSG,...)
Definition messaging.h:316
@ raft_append_entries
Definition raft_types.h:99
Definition after_io.h:8
proxy_ptr< TCPImpl > TCP
Definition tcp.h:19
EntityId< NodeIdFormatter > NodeId
Definition entity_id.h:155
NodeMsgType
Definition node_types.h:19
@ consensus_msg
Definition node_types.h:21
Definition configuration.h:14
std::shared_ptr< AbstractWriter > WriterPtr
Definition ring_buffer_types.h:150
#define RINGBUFFER_WRITE_MESSAGE(MSG,...)
Definition ring_buffer_types.h:255
Value & value()
Definition entity_id.h:60
Definition serializer.h:27