// Node this connection is associated with, if known. While it is unset,
// on_read logs UnassociatedNode in its place and asks associate_incoming to
// bind the connection to the sender named in the message header.
25 std::optional<ccf::NodeId> node;
// Size of the message currently being received. Empty until a uint32_t size
// prefix has been read from the buffered bytes.
26 std::optional<size_t> msg_size = std::nullopt;
// Bytes received so far that on_read has not yet consumed.
27 std::vector<uint8_t> pending;
// Constructor. The associated node is optional and defaults to std::nullopt.
29 NodeConnectionBehaviour(
32 std::optional<ccf::NodeId> node = std::nullopt) :
// Handles `len` bytes read from the connection, starting at `incoming`.
// The bytes are appended to `pending`, and a framed message is then parsed
// from the front of that buffer: a uint32_t size prefix, then a
// ccf::NodeMsgType and the sender's ccf::NodeId::Value (the headers), then
// the payload. The size prefix counts headers plus payload. Bytes consumed
// during the call are erased from the front of `pending` at the end.
38 bool on_read(
size_t len, uint8_t*& incoming, sockaddr )
override
41 "from node {} received {} bytes",
42 node.value_or(UnassociatedNode),
// Buffer the new bytes behind anything left over from an earlier read.
45 pending.insert(pending.end(), incoming, incoming + len);
// Read cursor over the buffered bytes. size_before is kept so the number of
// bytes consumed can be computed afterwards.
47 const uint8_t* data = pending.data();
48 size_t size = pending.size();
49 const auto size_before = size;
// No message in progress: a new size prefix has to be read first.
53 if (!msg_size.has_value())
// Fewer bytes buffered than the size prefix itself occupies.
55 if (size <
sizeof(uint32_t))
60 msg_size = serialized::read<uint32_t>(data, size);
// The whole message has not arrived yet.
63 if (size < msg_size.value())
66 "from node {} have {}/{} bytes",
67 node.value_or(UnassociatedNode),
// Remember the remaining size so the header length can be measured below.
73 const auto size_pre_headers = size;
// First header field: the message type. A std::exception raised by the read
// is caught and reported as invalid traffic.
78 msg_type = serialized::read<ccf::NodeMsgType>(data, size);
80 catch (
const std::exception& e)
83 "Received invalid node-to-node traffic. Unable to read message "
84 "type ({}). Closing connection.",
// Second header field: the sender's node ID, guarded in the same way.
92 from = serialized::read<ccf::NodeId::Value>(data, size);
94 catch (
const std::exception& e)
97 "Received invalid node-to-node traffic. Unable to read sender "
98 "node ID ({}). Closing connection.",
// The declared message size must be at least as large as the headers that
// were just read; otherwise the traffic is reported as invalid.
103 const auto size_post_headers = size;
104 const auto header_size = size_pre_headers - size_post_headers;
105 if (header_size > msg_size.value())
108 "Received invalid node-to-node traffic. Total msg size {} "
109 "doesn't even contain headers (of size {})",
// Whatever remains of the message after the headers is the payload.
114 const size_t payload_size = msg_size.value() - header_size;
// If this connection is not yet associated with a node, associate it with
// the sender named in the header.
116 if (!node.has_value())
118 associate_incoming(from);
123 "node in: from node {}, size {}, type {}",
// Step the cursor past the payload.
135 data += payload_size;
136 size -= payload_size;
// Drop everything consumed during this call from the front of the buffer.
140 const auto size_after = size;
141 const auto used = size_before - size_after;
144 pending.erase(pending.begin(), pending.begin() + used);
// Hook called by on_read with the sender's node ID when this connection has
// no associated node yet. The base implementation does nothing; subclasses
// may override it.
150 virtual void associate_incoming(
const ccf::NodeId& ) {}
// Behaviour for a connection accepted from a peer. It is constructed with the
// name "Node Incoming" and tracks the connection in the parent's maps.
153 class NodeIncomingBehaviour :
public NodeConnectionBehaviour
// Node recorded for this incoming connection, if any. on_disconnect checks
// it to decide whether the parent's connection entry must be removed too.
157 std::optional<ccf::NodeId> node_id;
160 NodeConnectionBehaviour(
"Node Incoming", parent),
// On disconnect, forget this connection's entry in the parent's
// unassociated_incoming map and, if a node was recorded, remove that node's
// connection from the parent as well.
164 void on_disconnect()
override
167 parent.unassociated_incoming.erase(
id);
169 if (node_id.has_value())
171 parent.remove_connection(node_id.value());
// Moves this connection's entry from parent.unassociated_incoming (keyed by
// `id`) into parent.connections (keyed by node `n`).
175 void associate_incoming(
const ccf::NodeId& n)
override
179 const auto unassociated = parent.unassociated_incoming.find(
id);
// The entry is expected to still be present; the message below describes
// the case where it is not.
// NOTE(review): presumably the arguments of an assertion macro - confirm.
181 unassociated != parent.unassociated_incoming.end(),
182 "Associating node {} with incoming ID {}, but have already forgotten "
183 "the incoming connection",
// Re-key the connection by node ID, then drop the unassociated entry.
194 parent.connections[n] = unassociated->second;
195 parent.unassociated_incoming.erase(unassociated);
198 "Node incoming connection ({}) associated with {}",
id, n);
// Behaviour for a connection opened towards a given node. It is constructed
// with the name "Node Outgoing" and the target node. Each of the four
// callbacks below reports a "Disconnecting outgoing connection" message
// naming its cause and then calls parent.remove_connection.
202 class NodeOutgoingBehaviour :
public NodeConnectionBehaviour
206 NodeConnectionBehaviour(
"Node Outgoing", parent, node)
// Cause: bind failed.
209 void on_bind_failed()
override
212 "Disconnecting outgoing connection with {}: bind failed",
214 parent.remove_connection(
// Cause: resolve failed.
218 void on_resolve_failed()
override
221 "Disconnecting outgoing connection with {}: resolve failed",
223 parent.remove_connection(
// Cause: connect failed.
227 void on_connect_failed()
override
230 "Disconnecting outgoing connection with {}: connect failed",
232 parent.remove_connection(
// Cause: the connection was disconnected.
236 void on_disconnect()
override
239 "Disconnecting outgoing connection with {}: disconnected",
241 parent.remove_connection(
// Called when a new peer connection is accepted. Takes a fresh ID from the
// parent, creates a NodeIncomingBehaviour for that ID, and records the peer
// in parent.unassociated_incoming under the same ID.
256 void on_accept(
TCP& peer)
override
258 auto id = parent.get_next_id();
260 std::make_unique<NodeIncomingBehaviour>(parent,
id));
261 parent.unassociated_incoming.emplace(
id, peer);
262 LOG_DEBUG_FMT(
"Accepted new incoming node connection ({})",
id);
// Per-node pair of strings.
// NOTE(review): presumably this declares node_addresses, which the
// associate_node_address handler fills with {hostname, port} - confirm.
269 std::unordered_map<ccf::NodeId, std::pair<std::string, std::string>>
// Connections keyed by node ID.
272 std::unordered_map<ccf::NodeId, TCP> connections;
// Accepted connections keyed by the ID from get_next_id, held here until
// associate_incoming moves them into `connections`.
274 std::unordered_map<size_t, TCP> unassociated_incoming;
// Passed to connect() when an outgoing connection is created.
279 std::optional<std::string> client_interface = std::nullopt;
// Passed to the TCP constructor when an outgoing connection is created.
280 std::optional<std::chrono::milliseconds> client_connection_timeout =
// Constructor (tail of the parameter list and the initialisers).
// client_interface is optional and defaults to std::nullopt; it and
// client_connection_timeout_ are stored in the members of the same names.
290 const std::optional<std::string>& client_interface = std::nullopt,
291 std::optional<std::chrono::milliseconds> client_connection_timeout_ =
// Writer obtained from the factory's create_writer_to_inside().
294 to_enclave(writer_factory.create_writer_to_inside()),
295 client_interface(client_interface),
296 client_connection_timeout(client_connection_timeout_)
// Install the server behaviour on the listener, passing this object to it.
298 listener->
set_behaviour(std::make_unique<NodeServerBehaviour>(*
this));
// Handler for ccf::associate_node_address messages: decodes a node ID,
// hostname and port from the message and records the address for that node,
// overwriting any previous entry.
311 ccf::associate_node_address,
312 [
this](
const uint8_t* data,
size_t size) {
313 auto [node_id, hostname, port] =
314 ringbuffer::read_message<ccf::associate_node_address>(data, size);
316 node_addresses[node_id] = {hostname, port};
// Handler for ccf::close_node_outbound messages: decodes the message and
// removes the connection for the node it names.
321 ccf::close_node_outbound,
322 [
this](
const uint8_t* data,
size_t size) {
324 ringbuffer::read_message<ccf::close_node_outbound>(data, size);
326 remove_connection(node_id);
// Handler for ccf::node_outbound messages: forwards a message to another
// node over TCP. Each message is written with a uint32_t length prefix
// (`frame`) in front of it. For an append-entries message whose idx is
// greater than prev_idx, data obtained from a ledger read is appended after
// the message and counted in the length prefix.
330 disp, ccf::node_outbound, [
this](
const uint8_t* data,
size_t size) {
// Destination node ID comes first in the message.
332 ccf::NodeId to = serialized::read<ccf::NodeId::Value>(data, size);
334 TCP outbound_connection =
nullptr;
// Reuse an existing connection to the destination if there is one.
// Otherwise look up its address and try to create a connection.
336 const auto connection_it = connections.find(to);
337 if (connection_it == connections.end())
339 const auto address_it = node_addresses.find(to);
340 if (address_it == node_addresses.end())
342 LOG_TRACE_FMT(
"Ignoring node_outbound to unknown node {}", to);
346 const auto& [
host, port] = address_it->second;
347 outbound_connection = create_connection(to,
host, port);
348 if (outbound_connection.
is_null())
// NOTE(review): the log text below repeats the word "message".
351 "Unable to connect to {}, dropping outbound message message",
358 outbound_connection = connection_it->second;
// Remember where the message to forward starts, before the reads below
// advance `data` and `size`.
364 auto data_to_send = data;
365 auto size_to_send = size;
// Read the message type, then read past a node ID whose value is not kept.
369 auto msg_type = serialized::read<ccf::NodeMsgType>(data, size);
370 serialized::read<ccf::NodeId::Value>(data, size);
373 (serialized::read<aft::RaftMsgType>(data, size) ==
378 serialized::overlay<::consensus::AppendEntriesIndex>(data, size);
// The length prefix starts as the size of the message itself.
381 auto frame =
static_cast<uint32_t
>(size_to_send);
// The message covers at least one entry: the data comes from a ledger read.
383 if (ae.idx > ae.prev_idx)
385 std::optional<asynchost::LedgerReadResult> read_result =
388 if (!read_result.has_value())
391 "Unable to send AppendEntries ({}, {}]: Ledger read failed",
// The ledger read must end exactly at the requested index.
397 if (ae.idx != read_result->end_idx)
402 "Unable to send AppendEntries ({}, {}]: Ledger read returned "
406 read_result->end_idx);
// Count the ledger data in the length prefix, then write the prefix, the
// message, and the ledger data, in that order.
410 const auto& framed_entries = read_result->data;
411 frame +=
static_cast<uint32_t
>(framed_entries.size());
412 outbound_connection->
write(
413 sizeof(uint32_t),
reinterpret_cast<uint8_t*
>(&frame));
414 outbound_connection->
write(size_to_send, data_to_send);
416 outbound_connection->
write(
417 framed_entries.size(), framed_entries.data());
// Path with no ledger data: write only the prefix and the message.
422 outbound_connection->
write(
423 sizeof(uint32_t),
reinterpret_cast<uint8_t*
>(&frame));
424 outbound_connection->
write(size_to_send, data_to_send);
428 "send AE to node {} [{}]: {}, {}",
// Remaining path: write the length prefix followed by the message unchanged.
437 auto frame =
static_cast<uint32_t
>(size_to_send);
441 outbound_connection->
write(
442 sizeof(uint32_t),
reinterpret_cast<uint8_t*
>(&frame));
443 outbound_connection->
write(size_to_send, data_to_send);
// Creates an outgoing connection to the given node at host:port. On a
// successful connect() the connection is stored in `connections` under
// node_id and a log message is written.
// NOTE(review): the failure branch is not shown; the caller checks the
// result with is_null(), so presumably a null TCP is returned - confirm.
449 TCP create_connection(
451 const std::string&
host,
452 const std::string& port)
// New TCP object built with the configured client connection timeout.
454 auto s =
TCP(
true, client_connection_timeout);
455 s->set_behaviour(std::make_unique<NodeOutgoingBehaviour>(*
this, node_id));
// Connect using the configured client interface, if any.
457 if (!s->connect(
host, port, client_interface))
463 connections.emplace(node_id, s);
465 "Added node connection with {} ({}:{})", node_id,
host, port);
// Erase the entry for `node` from `connections`; when nothing was erased,
// log that the connection does not exist.
// NOTE(review): presumably the body of remove_connection - confirm.
472 if (connections.erase(node) < 1)
474 LOG_DEBUG_FMT(
"Cannot remove node connection {}: does not exist", node);
// Loops for as long as `id` is already a key in unassociated_incoming.
// NOTE(review): presumably part of get_next_id, advancing `id` until it is
// unused - the loop body is not shown, confirm.
486 while (unassociated_incoming.find(
id) != unassociated_incoming.end())
NodeConnections(messaging::Dispatcher< ringbuffer::Message > &disp, Ledger &ledger, ringbuffer::AbstractWriterFactory &writer_factory, std::string &host, std::string &port, const std::optional< std::string > &client_interface=std::nullopt, std::optional< std::chrono::milliseconds > client_connection_timeout_=std::nullopt)
Definition node_connections.h:284