CCF
Loading...
Searching...
No Matches
node_connections.h
Go to the documentation of this file.
1// Copyright (c) Microsoft Corporation. All rights reserved.
2// Licensed under the Apache 2.0 License.
3#pragma once
4
6#include "ledger.h"
7#include "node/node_types.h"
8#include "tcp.h"
9#include "timer.h"
10
11#include <unordered_map>
12
13namespace asynchost
14{
15 static const auto UnassociatedNode = ccf::NodeId("Unknown");
16
18 {
19 private:
20 class NodeConnectionBehaviour : public SocketBehaviour<TCP>
21 {
22 private:
23 public:
24 NodeConnections& parent;
25 std::optional<ccf::NodeId> node;
26 std::optional<size_t> msg_size = std::nullopt;
27 std::vector<uint8_t> pending;
28
29 NodeConnectionBehaviour(
30 const char* name,
31 NodeConnections& parent,
32 std::optional<ccf::NodeId> node = std::nullopt) :
34 parent(parent),
35 node(std::move(node))
36 {}
37
38 bool on_read(size_t len, uint8_t*& incoming, sockaddr /*unused*/) override
39 {
41 "from node {} received {} bytes",
42 node.value_or(UnassociatedNode),
43 len);
44
45 pending.insert(pending.end(), incoming, incoming + len);
46
47 const uint8_t* data = pending.data();
48 size_t size = pending.size();
49 const auto size_before = size;
50
51 while (true)
52 {
53 if (!msg_size.has_value())
54 {
55 if (size < sizeof(uint32_t))
56 {
57 break;
58 }
59
60 msg_size = serialized::read<uint32_t>(data, size);
61 }
62
63 if (size < msg_size.value())
64 {
66 "from node {} have {}/{} bytes",
67 node.value_or(UnassociatedNode),
68 size,
69 msg_size.value());
70 break;
71 }
72
73 const auto size_pre_headers = size;
74
75 ccf::NodeMsgType msg_type{};
76 try
77 {
78 msg_type = serialized::read<ccf::NodeMsgType>(data, size);
79 }
80 catch (const std::exception& e)
81 {
83 "Received invalid node-to-node traffic. Unable to read message "
84 "type ({}). Closing connection.",
85 e.what());
86 return false;
87 }
88
89 ccf::NodeId from;
90 try
91 {
92 from = serialized::read<ccf::NodeId::Value>(data, size);
93 }
94 catch (const std::exception& e)
95 {
97 "Received invalid node-to-node traffic. Unable to read sender "
98 "node ID ({}). Closing connection.",
99 e.what());
100 return false;
101 }
102
103 const auto size_post_headers = size;
104 const auto header_size = size_pre_headers - size_post_headers;
105 if (header_size > msg_size.value())
106 {
108 "Received invalid node-to-node traffic. Total msg size {} "
109 "doesn't even contain headers (of size {})",
110 msg_size.value(),
111 header_size);
112 return false;
113 }
114 const size_t payload_size = msg_size.value() - header_size;
115
116 if (!node.has_value())
117 {
118 associate_incoming(from);
119 node = from;
120 }
121
123 "node in: from node {}, size {}, type {}",
124 node.value(),
125 msg_size.value(),
126 msg_type);
127
129 ccf::node_inbound,
130 parent.to_enclave,
131 msg_type,
132 from.value(),
133 serializer::ByteRange{data, payload_size});
134
135 data += payload_size;
136 size -= payload_size;
137 msg_size.reset();
138 }
139
140 const auto size_after = size;
141 const auto used = size_before - size_after;
142 if (used > 0)
143 {
144 pending.erase(pending.begin(), pending.begin() + used);
145 }
146
147 return true;
148 }
149
150 virtual void associate_incoming(const ccf::NodeId& /*unused*/) {}
151 };
152
153 class NodeIncomingBehaviour : public NodeConnectionBehaviour
154 {
155 public:
156 size_t id;
157 std::optional<ccf::NodeId> node_id;
158
159 NodeIncomingBehaviour(NodeConnections& parent, size_t id_) :
160 NodeConnectionBehaviour("Node Incoming", parent),
161 id(id_)
162 {}
163
164 void on_disconnect() override
165 {
166 LOG_DEBUG_FMT("Disconnecting incoming connection {}", id);
167 parent.unassociated_incoming.erase(id);
168
169 if (node_id.has_value())
170 {
171 parent.remove_connection(node_id.value());
172 }
173 }
174
175 void associate_incoming(const ccf::NodeId& n) override
176 {
177 node_id = n;
178
179 const auto unassociated = parent.unassociated_incoming.find(id);
181 unassociated != parent.unassociated_incoming.end(),
182 "Associating node {} with incoming ID {}, but have already forgotten "
183 "the incoming connection",
184 n,
185 id);
186
187 // Always prefer this (probably) newer connection. Pathological case is
188 // where both nodes open outgoings to each other at the same time, both
189 // see the corresponding incoming connections and _drop_ their outgoing
190 // connections. Both have a useless incoming connection they think they
191 // can use. Assumption is that they progress at different rates, and one
192 // of them eventually spots the dead connection and opens a new one
193 // which succeeds.
194 parent.connections[n] = unassociated->second;
195 parent.unassociated_incoming.erase(unassociated);
196
198 "Node incoming connection ({}) associated with {}", id, n);
199 }
200 };
201
202 class NodeOutgoingBehaviour : public NodeConnectionBehaviour
203 {
204 public:
205 NodeOutgoingBehaviour(NodeConnections& parent, const ccf::NodeId& node) :
206 NodeConnectionBehaviour("Node Outgoing", parent, node)
207 {}
208
209 void on_bind_failed() override
210 {
212 "Disconnecting outgoing connection with {}: bind failed",
213 *node); // NOLINT(bugprone-unchecked-optional-access)
214 parent.remove_connection(
215 *node); // NOLINT(bugprone-unchecked-optional-access)
216 }
217
218 void on_resolve_failed() override
219 {
221 "Disconnecting outgoing connection with {}: resolve failed",
222 *node); // NOLINT(bugprone-unchecked-optional-access)
223 parent.remove_connection(
224 *node); // NOLINT(bugprone-unchecked-optional-access)
225 }
226
227 void on_connect_failed() override
228 {
230 "Disconnecting outgoing connection with {}: connect failed",
231 *node); // NOLINT(bugprone-unchecked-optional-access)
232 parent.remove_connection(
233 *node); // NOLINT(bugprone-unchecked-optional-access)
234 }
235
236 void on_disconnect() override
237 {
239 "Disconnecting outgoing connection with {}: disconnected",
240 *node); // NOLINT(bugprone-unchecked-optional-access)
241 parent.remove_connection(
242 *node); // NOLINT(bugprone-unchecked-optional-access)
243 }
244 };
245
246 class NodeServerBehaviour : public SocketBehaviour<TCP>
247 {
248 public:
249 NodeConnections& parent;
250
251 NodeServerBehaviour(NodeConnections& parent) :
252 SocketBehaviour<TCP>("Node Server", "TCP"),
253 parent(parent)
254 {}
255
256 void on_accept(TCP& peer) override
257 {
258 auto id = parent.get_next_id();
259 peer->set_behaviour(
260 std::make_unique<NodeIncomingBehaviour>(parent, id));
261 parent.unassociated_incoming.emplace(id, peer);
262 LOG_DEBUG_FMT("Accepted new incoming node connection ({})", id);
263 }
264 };
265
266 Ledger& ledger;
267 TCP listener;
268
269 std::unordered_map<ccf::NodeId, std::pair<std::string, std::string>>
270 node_addresses;
271
272 std::unordered_map<ccf::NodeId, TCP> connections;
273
274 std::unordered_map<size_t, TCP> unassociated_incoming;
275 size_t next_id = 1;
276
277 ringbuffer::WriterPtr to_enclave;
278
279 std::optional<std::string> client_interface = std::nullopt;
280 std::optional<std::chrono::milliseconds> client_connection_timeout =
281 std::nullopt;
282
283 public:
286 Ledger& ledger,
287 ringbuffer::AbstractWriterFactory& writer_factory,
288 std::string& host,
289 std::string& port,
290 const std::optional<std::string>& client_interface = std::nullopt,
291 std::optional<std::chrono::milliseconds> client_connection_timeout_ =
292 std::nullopt) :
293 ledger(ledger),
294 to_enclave(writer_factory.create_writer_to_inside()),
295 client_interface(client_interface),
296 client_connection_timeout(client_connection_timeout_)
297 {
298 listener->set_behaviour(std::make_unique<NodeServerBehaviour>(*this));
299 listener->listen(host, port);
300 host = listener->get_host();
301 port = listener->get_port();
302
304 }
305
308 {
310 disp,
311 ccf::associate_node_address,
312 [this](const uint8_t* data, size_t size) {
313 auto [node_id, hostname, port] =
314 ringbuffer::read_message<ccf::associate_node_address>(data, size);
315
316 node_addresses[node_id] = {hostname, port};
317 });
318
320 disp,
321 ccf::close_node_outbound,
322 [this](const uint8_t* data, size_t size) {
323 auto [node_id] =
324 ringbuffer::read_message<ccf::close_node_outbound>(data, size);
325
326 remove_connection(node_id);
327 });
328
330 disp, ccf::node_outbound, [this](const uint8_t* data, size_t size) {
331 // Read piece-by-piece rather than all at once
332 ccf::NodeId to = serialized::read<ccf::NodeId::Value>(data, size);
333
334 TCP outbound_connection = nullptr;
335 {
336 const auto connection_it = connections.find(to);
337 if (connection_it == connections.end())
338 {
339 const auto address_it = node_addresses.find(to);
340 if (address_it == node_addresses.end())
341 {
342 LOG_TRACE_FMT("Ignoring node_outbound to unknown node {}", to);
343 return;
344 }
345
346 const auto& [host, port] = address_it->second;
347 outbound_connection = create_connection(to, host, port);
348 if (outbound_connection.is_null())
349 {
351 "Unable to connect to {}, dropping outbound message message",
352 to);
353 return;
354 }
355 }
356 else
357 {
358 outbound_connection = connection_it->second;
359 }
360 }
361
362 // Rather than reading and reserialising, use the msg_type and from_id
363 // that are already serialised on the ringbuffer
364 auto data_to_send = data;
365 auto size_to_send = size;
366
367 // If the message is a consensus append entries message, affix the
368 // corresponding ledger entries
369 auto msg_type = serialized::read<ccf::NodeMsgType>(data, size);
370 serialized::read<ccf::NodeId::Value>(data, size); // Ignore from_id
371 if (
373 (serialized::read<aft::RaftMsgType>(data, size) ==
375 {
376 // Parse the indices to be sent to the recipient.
377 const auto& ae =
378 serialized::overlay<::consensus::AppendEntriesIndex>(data, size);
379
380 // Find the total frame size, and write it along with the header.
381 auto frame = static_cast<uint32_t>(size_to_send);
382
383 if (ae.idx > ae.prev_idx)
384 {
385 std::optional<asynchost::LedgerReadResult> read_result =
386 ledger.read_entries(ae.prev_idx + 1, ae.idx);
387
388 if (!read_result.has_value())
389 {
391 "Unable to send AppendEntries ({}, {}]: Ledger read failed",
392 ae.prev_idx,
393 ae.idx);
394 return;
395 }
396
397 if (ae.idx != read_result->end_idx)
398 {
399 // NB: This should never happen since we do not pass a max_size
400 // to read_entries
402 "Unable to send AppendEntries ({}, {}]: Ledger read returned "
403 "entries to {}",
404 ae.prev_idx,
405 ae.idx,
406 read_result->end_idx);
407 return;
408 }
409
410 const auto& framed_entries = read_result->data;
411 frame += static_cast<uint32_t>(framed_entries.size());
412 outbound_connection->write(
413 sizeof(uint32_t), reinterpret_cast<uint8_t*>(&frame));
414 outbound_connection->write(size_to_send, data_to_send);
415
416 outbound_connection->write(
417 framed_entries.size(), framed_entries.data());
418 }
419 else
420 {
421 // Header-only AE
422 outbound_connection->write(
423 sizeof(uint32_t), reinterpret_cast<uint8_t*>(&frame));
424 outbound_connection->write(size_to_send, data_to_send);
425 }
426
428 "send AE to node {} [{}]: {}, {}",
429 to,
430 frame,
431 ae.idx,
432 ae.prev_idx);
433 }
434 else
435 {
436 // Write as framed data to the recipient.
437 auto frame = static_cast<uint32_t>(size_to_send);
438
439 LOG_DEBUG_FMT("node send to {} [{}]", to, frame);
440
441 outbound_connection->write(
442 sizeof(uint32_t), reinterpret_cast<uint8_t*>(&frame));
443 outbound_connection->write(size_to_send, data_to_send);
444 }
445 });
446 }
447
448 private:
449 TCP create_connection(
450 const ccf::NodeId& node_id,
451 const std::string& host,
452 const std::string& port)
453 {
454 auto s = TCP(true, client_connection_timeout);
455 s->set_behaviour(std::make_unique<NodeOutgoingBehaviour>(*this, node_id));
456
457 if (!s->connect(host, port, client_interface))
458 {
459 LOG_FAIL_FMT("Failed to connect to {} on {}:{}", node_id, host, port);
460 return nullptr;
461 }
462
463 connections.emplace(node_id, s);
465 "Added node connection with {} ({}:{})", node_id, host, port);
466
467 return s;
468 }
469
470 bool remove_connection(const ccf::NodeId& node)
471 {
472 if (connections.erase(node) < 1)
473 {
474 LOG_DEBUG_FMT("Cannot remove node connection {}: does not exist", node);
475 return false;
476 }
477
478 LOG_DEBUG_FMT("Removed node connection with {}", node);
479 return true;
480 }
481
482 size_t get_next_id()
483 {
484 auto id = next_id++;
485
486 while (unassociated_incoming.find(id) != unassociated_incoming.end())
487 {
488 id = next_id++;
489 }
490
491 return id;
492 }
493 };
494}
#define CCF_ASSERT_FMT(expr,...)
Definition ccf_assert.h:10
Definition ledger.h:716
std::optional< LedgerReadResult > read_entries(size_t from, size_t to, std::optional< size_t > max_entries_size=std::nullopt)
Definition ledger.h:1332
Definition node_connections.h:18
NodeConnections(messaging::Dispatcher< ringbuffer::Message > &disp, Ledger &ledger, ringbuffer::AbstractWriterFactory &writer_factory, std::string &host, std::string &port, const std::optional< std::string > &client_interface=std::nullopt, std::optional< std::chrono::milliseconds > client_connection_timeout_=std::nullopt)
Definition node_connections.h:284
void register_message_handlers(messaging::Dispatcher< ringbuffer::Message > &disp)
Definition node_connections.h:306
Callback service for user-specific behaviour for TCP and UDP connections.
Definition socket.h:20
const char * name
Definition socket.h:22
bool listen(const std::string &host_, const std::string &port_, const std::optional< std::string > &name=std::nullopt)
Definition tcp.h:320
std::string get_port() const
Definition tcp.h:142
std::string get_host() const
Definition tcp.h:137
void set_behaviour(std::unique_ptr< SocketBehaviour< TCP > > b)
Definition tcp.h:132
bool write(size_t len, const uint8_t *data, sockaddr={})
Definition tcp.h:331
bool is_null()
Definition proxy.h:75
Definition messaging.h:38
Definition ring_buffer_types.h:157
#define LOG_TRACE_FMT
Definition internal_logger.h:13
#define LOG_DEBUG_FMT
Definition internal_logger.h:14
#define LOG_FAIL_FMT
Definition internal_logger.h:16
#define DISPATCHER_SET_MESSAGE_HANDLER(DISP, MSG,...)
Definition messaging.h:316
@ raft_append_entries
Definition raft_types.h:92
Definition after_io.h:8
proxy_ptr< TCPImpl > TCP
Definition tcp.h:20
EntityId< NodeIdFormatter > NodeId
Definition entity_id.h:164
NodeMsgType
Definition node_types.h:21
@ consensus_msg
Definition node_types.h:23
Definition configuration.h:14
std::shared_ptr< AbstractWriter > WriterPtr
Definition ring_buffer_types.h:154
#define RINGBUFFER_WRITE_MESSAGE(MSG,...)
Definition ring_buffer_types.h:259
Value & value()
Definition entity_id.h:67
Definition serializer.h:27