// ID of the node at the other end of this connection, if one is known.
// on_read() logs UnassociatedNode in its place while this is empty, and
// calls associate_incoming() when a message arrives and this has no value.
25 std::optional<ccf::NodeId> node;
// Size of the message currently being assembled, taken from the uint32_t
// size prefix read in on_read(). Empty until a prefix has been read.
26 std::optional<size_t> msg_size = std::nullopt;
// Received bytes that have not yet been consumed: on_read() appends every
// incoming chunk here and erases the bytes it has used from the front.
27 std::vector<uint8_t> pending;
// Constructor. Only part of the parameter list is visible in this listing;
// the visible trailing parameter is the optional peer node ID, defaulting
// to "not known".
29 NodeConnectionBehaviour(
32 std::optional<ccf::NodeId> node = std::nullopt) :
// Called with `len` bytes at `incoming` that were read from the connection.
// Appends them to `pending`, then parses size-prefixed node-to-node messages
// out of the buffered bytes and finally drops the consumed bytes.
// NOTE(review): this listing omits several lines (braces, any loop or
// return statements, the forwarding of the payload), so the exact control
// flow between the visible statements cannot be confirmed from here.
38 bool on_read(
size_t len, uint8_t*& incoming, sockaddr)
override
41 "from node {} received {} bytes",
42 node.value_or(UnassociatedNode),
// Accumulate the new bytes behind whatever was left over from earlier reads.
45 pending.insert(pending.end(), incoming, incoming + len);
// (data, size) is a read cursor over the buffered bytes. The size deltas
// computed further down rely on serialized::read shrinking `size` as it
// consumes bytes - presumably it advances `data` as well; confirm in
// serialized.
47 const uint8_t* data = pending.data();
48 size_t size = pending.size();
49 const auto size_before = size;
// No message size known yet: a uint32_t prefix must be buffered before it
// can be read. What happens when it is not is on lines not shown here.
53 if (!msg_size.has_value())
55 if (size <
sizeof(uint32_t))
60 msg_size = serialized::read<uint32_t>(data, size);
// Fewer bytes buffered than the prefix announced; the log text below reports
// have/expected byte counts.
63 if (size < msg_size.value())
66 "from node {} have {}/{} bytes",
67 node.value_or(UnassociatedNode),
// Remember the remaining size so the header length can be derived afterwards.
73 const auto size_pre_headers = size;
// First header field: the message type. A failed read is treated as invalid
// traffic (see the log message).
78 msg_type = serialized::read<ccf::NodeMsgType>(data, size);
80 catch (
const std::exception& e)
83 "Received invalid node-to-node traffic. Unable to read message "
84 "type ({}). Closing connection.",
// Second header field: the sender's node ID.
92 from = serialized::read<ccf::NodeId::Value>(data, size);
94 catch (
const std::exception& e)
97 "Received invalid node-to-node traffic. Unable to read sender "
98 "node ID ({}). Closing connection.",
// header_size = number of bytes the two header reads consumed.
103 const auto size_post_headers = size;
104 const auto header_size = size_pre_headers - size_post_headers;
// A declared message size smaller than the headers is invalid. This check
// also guards the unsigned subtraction that computes payload_size below.
105 if (header_size > msg_size.value())
108 "Received invalid node-to-node traffic. Total msg size {} "
109 "doesn't even contain headers (of size {})",
114 const size_t payload_size = msg_size.value() - header_size;
// Peer not yet known: hand the sender ID from this message to the
// associate_incoming() hook so a subclass can record the association.
116 if (!node.has_value())
118 associate_incoming(from);
123 "node in: from node {}, size {}, type {}",
// Step the cursor past the payload of this message.
135 data += payload_size;
136 size -= payload_size;
// Everything the cursor moved over has been consumed; remove it from the
// front of `pending` so only unprocessed bytes remain.
140 const auto size_after = size;
141 const auto used = size_before - size_after;
144 pending.erase(pending.begin(), pending.begin() + used);
// Hook invoked by on_read() with the sender's node ID when `node` has no
// value. The base implementation intentionally does nothing; subclasses
// may override it.
150 virtual void associate_incoming(
const ccf::NodeId&) {}
// Connection behaviour for connections accepted from other nodes (created
// in on_accept with the parent and a locally generated ID).
// NOTE(review): several lines of this class (access specifiers, the `id`
// member, the constructor signature, braces) are missing from this listing.
153 class NodeIncomingBehaviour :
public NodeConnectionBehaviour
// Node ID to remove from the parent's connections on disconnect, if set.
// Where it is assigned is not visible in this listing.
157 std::optional<ccf::NodeId> node_id;
160 NodeConnectionBehaviour(
"Node Incoming", parent),
// On disconnect, forget this connection in the parent: drop it from the
// not-yet-associated map (keyed by `id`) and, if a node ID is recorded,
// remove that node's connection too.
164 void on_disconnect()
override
167 parent.unassociated_incoming.erase(
id);
169 if (node_id.has_value())
171 parent.remove_connection(node_id.value());
// Records that this incoming connection belongs to node `n`: looks the
// connection up by `id` in parent.unassociated_incoming, stores it under
// `n` in parent.connections, and erases the unassociated entry.
175 void associate_incoming(
const ccf::NodeId& n)
override
179 const auto unassociated = parent.unassociated_incoming.find(
id);
// The lookup is expected to succeed; the enclosing check (macro name not
// visible here) reports the message below when it does not.
181 unassociated != parent.unassociated_incoming.end(),
182 "Associating node {} with incoming ID {}, but have already forgotten "
183 "the incoming connection",
// operator[] replaces any connection already stored for `n`.
194 parent.connections[n] = unassociated->second;
195 parent.unassociated_incoming.erase(unassociated);
198 "Node incoming connection ({}) associated with {}",
id, n);
// Connection behaviour for connections this host opens to another node
// (installed by create_connection with the target node ID).
// Every failure/disconnect callback below does the same thing: log the
// reason and ask the parent to remove the connection for `node`.
// NOTE(review): node.value() throws std::bad_optional_access if `node` is
// empty; this relies on the node ID passed to the base constructor being set.
202 class NodeOutgoingBehaviour :
public NodeConnectionBehaviour
206 NodeConnectionBehaviour(
"Node Outgoing", parent, node)
// Binding failed.
209 void on_bind_failed()
override
212 "Disconnecting outgoing connection with {}: bind failed",
214 parent.remove_connection(node.
value());
// Resolving the address failed.
217 void on_resolve_failed()
override
220 "Disconnecting outgoing connection with {}: resolve failed",
222 parent.remove_connection(node.
value());
// Connecting failed.
225 void on_connect_failed()
override
228 "Disconnecting outgoing connection with {}: connect failed",
230 parent.remove_connection(node.
value());
// The established connection was closed.
233 void on_disconnect()
override
236 "Disconnecting outgoing connection with {}: disconnected",
238 parent.remove_connection(node.
value());
// Called when a new incoming connection `peer` has been accepted.
// Allocates a fresh local ID from the parent, creates a
// NodeIncomingBehaviour for (parent, id) - the call that receives it is on
// a line not shown here, presumably peer->set_behaviour - and stores the
// connection in parent.unassociated_incoming under that ID.
252 void on_accept(
TCP& peer)
override
254 auto id = parent.get_next_id();
256 std::make_unique<NodeIncomingBehaviour>(parent,
id));
257 parent.unassociated_incoming.emplace(
id, peer);
258 LOG_DEBUG_FMT(
"Accepted new incoming node connection ({})",
id);
// Map from node ID to a pair of strings. The declarator name is on a line
// not shown in this listing - presumably node_addresses, which the
// associate_node_address handler fills with {hostname, port}.
265 std::unordered_map<ccf::NodeId, std::pair<std::string, std::string>>
// Connections keyed by the node ID of the peer.
268 std::unordered_map<ccf::NodeId, TCP> connections;
// Accepted incoming connections keyed by a locally generated ID, held until
// associate_incoming() moves them into `connections`.
270 std::unordered_map<size_t, TCP> unassociated_incoming;
// Optional value passed as the third argument of connect() for outgoing
// connections in create_connection.
275 std::optional<std::string> client_interface = std::nullopt;
// Optional timeout passed to the TCP constructor for outgoing connections
// in create_connection.
276 std::optional<std::chrono::milliseconds> client_connection_timeout =
// Constructor (leading parameters and initializers are partly missing from
// this listing). Stores the optional client interface and timeout, creates
// the writer to the enclave, and installs NodeServerBehaviour on the listener.
286 const std::optional<std::string>& client_interface = std::nullopt,
287 std::optional<std::chrono::milliseconds> client_connection_timeout_ =
290 to_enclave(writer_factory.create_writer_to_inside()),
291 client_interface(client_interface),
292 client_connection_timeout(client_connection_timeout_)
294 listener->
set_behaviour(std::make_unique<NodeServerBehaviour>(*
this));
// Handler for ccf::associate_node_address messages: decodes
// (node_id, hostname, port) from the ringbuffer message and records the
// address for that node, overwriting any earlier entry.
307 ccf::associate_node_address,
308 [
this](
const uint8_t* data,
size_t size) {
309 auto [node_id, hostname, port] =
310 ringbuffer::read_message<ccf::associate_node_address>(data, size);
312 node_addresses[node_id] = {hostname, port};
// Handler for ccf::close_node_outbound messages: decodes the message (the
// binding that names node_id is on a line not shown here) and removes the
// connection for that node.
317 ccf::close_node_outbound,
318 [
this](
const uint8_t* data,
size_t size) {
320 ringbuffer::read_message<ccf::close_node_outbound>(data, size);
322 remove_connection(node_id);
// Handler for ccf::node_outbound messages: sends a message to another node,
// creating the outgoing connection first if none exists yet. What is written
// is a uint32_t size prefix followed by the message bytes, plus the ledger
// entries read for the message when it is an AppendEntries carrying entries.
// NOTE(review): this listing omits several lines (braces, returns, part of
// the AppendEntries condition and the ledger read call), so only the visible
// statements are described.
326 disp, ccf::node_outbound, [
this](
const uint8_t* data,
size_t size) {
// Destination node ID, consumed from the front of the message.
328 ccf::NodeId to = serialized::read<ccf::NodeId::Value>(data, size);
330 TCP outbound_connection =
nullptr;
// Reuse an existing connection if there is one; otherwise look up the
// node's address and try to connect.
332 const auto connection_it = connections.find(to);
333 if (connection_it == connections.end())
335 const auto address_it = node_addresses.find(to);
336 if (address_it == node_addresses.end())
338 LOG_TRACE_FMT(
"Ignoring node_outbound to unknown node {}", to);
342 const auto& [
host, port] = address_it->second;
343 outbound_connection = create_connection(to,
host, port);
344 if (outbound_connection.
is_null())
347 "Unable to connect to {}, dropping outbound message",
354 outbound_connection = connection_it->second;
// Snapshot the cursor before the headers are parsed below: the bytes that
// get sent start at the headers, not after them.
360 auto data_to_send = data;
361 auto size_to_send = size;
// Parse the message type and skip over the sender node ID.
365 auto msg_type = serialized::read<ccf::NodeMsgType>(data, size);
366 serialized::read<ccf::NodeId::Value>(data, size);
// AppendEntries case (the first half of this condition is not visible here).
369 (serialized::read<aft::RaftMsgType>(data, size) ==
374 serialized::overlay<::consensus::AppendEntriesIndex>(data, size);
// Size prefix starts as the size of the message itself.
// NOTE(review): the cast truncates if size_to_send exceeds 32 bits.
377 uint32_t frame = (uint32_t)size_to_send;
// Entries in (prev_idx, idx] are read from the ledger (the call itself is on
// a line not shown) and appended after the message.
379 if (ae.idx > ae.prev_idx)
381 std::optional<asynchost::LedgerReadResult> read_result =
384 if (!read_result.has_value())
387 "Unable to send AppendEntries ({}, {}]: Ledger read failed",
// The read must end exactly at ae.idx; anything else is reported as a failure.
392 else if (ae.idx != read_result->end_idx)
397 "Unable to send AppendEntries ({}, {}]: Ledger read returned "
401 read_result->end_idx);
// Write the size prefix (now covering message + entries) as the raw bytes
// of `frame`, then the message, then the entries.
406 const auto& framed_entries = read_result->data;
407 frame += (uint32_t)framed_entries.size();
408 outbound_connection->
write(
sizeof(uint32_t), (uint8_t*)&frame);
409 outbound_connection->
write(size_to_send, data_to_send);
411 outbound_connection->
write(
412 framed_entries.size(), framed_entries.data());
// AppendEntries without ledger entries: size prefix and message only.
418 outbound_connection->
write(
sizeof(uint32_t), (uint8_t*)&frame);
419 outbound_connection->
write(size_to_send, data_to_send);
423 "send AE to node {} [{}]: {}, {}",
// Any other message: size prefix followed by the message.
432 uint32_t frame = (uint32_t)size_to_send;
436 outbound_connection->
write(
sizeof(uint32_t), (uint8_t*)&frame);
437 outbound_connection->
write(size_to_send, data_to_send);
// Opens an outgoing connection to a node at host:port (the node_id
// parameter is on a line not shown in this listing).
// Creates a TCP object with the configured client connection timeout,
// installs a NodeOutgoingBehaviour for node_id on it, and connects using
// the optional client_interface. On success the connection is stored in
// `connections` under node_id. The failure branch is not visible here;
// the node_outbound handler tests the result with is_null(), so presumably
// a null TCP is returned.
443 TCP create_connection(
445 const std::string&
host,
446 const std::string& port)
448 auto s =
TCP(
true, client_connection_timeout);
449 s->set_behaviour(std::make_unique<NodeOutgoingBehaviour>(*
this, node_id));
451 if (!s->connect(
host, port, client_interface))
// emplace leaves an existing entry for node_id untouched; the node_outbound
// handler only calls this function after failing to find such an entry.
457 connections.emplace(node_id, s);
459 "Added node connection with {} ({}:{})", node_id,
host, port);
// Body fragment (the function header is on a line not shown in this
// listing). erase() returns the number of entries removed, so a result
// below 1 means no connection was stored for `node`; that case is only
// logged.
466 if (connections.erase(node) < 1)
468 LOG_DEBUG_FMT(
"Cannot remove node connection {}: does not exist", node);
// Loop condition only (the enclosing function and loop body are not shown
// in this listing - presumably get_next_id, which on_accept calls): keeps
// looping while `id` is already a key in unassociated_incoming.
480 while (unassociated_incoming.find(
id) != unassociated_incoming.end())
NodeConnections(messaging::Dispatcher< ringbuffer::Message > &disp, Ledger &ledger, ringbuffer::AbstractWriterFactory &writer_factory, std::string &host, std::string &port, const std::optional< std::string > &client_interface=std::nullopt, std::optional< std::chrono::milliseconds > client_connection_timeout_=std::nullopt)
Definition node_connections.h:280