// Fields of the per-peer record stored in `channels` (the enclosing
// declaration is not visible in this excerpt).
// Shared handle to the channel for one peer.
20 std::shared_ptr<Channel> channel;
// Idle time accumulated so far: reset to zero when get_channel finds the
// entry, and increased by `elapsed` in tick.
21 std::chrono::milliseconds idle_time;
// Known channels, keyed by the peer's node id.
24 std::unordered_map<NodeId, ChannelInfo> channels;
// Endorsed certificate, empty by default. Presumably a member of ThisNode,
// since it is accessed as this_node->endorsed_node_cert elsewhere in the
// file -- the enclosing declaration is not visible here.
32 std::optional<ccf::crypto::Pem> endorsed_node_cert = std::nullopt;
// State for the local node. The send/recv entry points assert it is non-null
// before use; initialize asserts it is still null.
34 std::unique_ptr<ThisNode> this_node;
// Message limit handed to every Channel that get_channel constructs;
// get_channel throws while this holds no value.
// When OVERRIDE_DEFAULT_N2N_MESSAGE_LIMIT is defined it provides the initial
// value; the other branch of the conditional is not visible in this excerpt.
39 std::optional<size_t> message_limit =
40#ifdef OVERRIDE_DEFAULT_N2N_MESSAGE_LIMIT
41 OVERRIDE_DEFAULT_N2N_MESSAGE_LIMIT;
// Idle threshold consulted by tick; empty by default, in which case tick does
// not enter its channel-closing loop.
48 std::optional<std::chrono::milliseconds> idle_timeout = std::nullopt;
// Looks up the channel for peer_id. An existing entry is reused and its idle
// time is reset; otherwise a new Channel is constructed and recorded in
// `channels`.
// Throws std::runtime_error when the message limit is unset, when this_node
// is null, or when this_node has no endorsed node certificate.
// NOTE: several lines of this function (braces, some constructor arguments,
// the final return) are not visible in this excerpt.
50 std::shared_ptr<Channel> get_channel(
const NodeId& peer_id)
// Asserted condition: the requested peer must not be this node itself.
53 this_node ==
nullptr || this_node->node_id != peer_id,
54 "Requested channel with self {}",
// A channel cannot be built before a message limit has been configured.
57 if (!message_limit.has_value())
59 throw std::runtime_error(
60 "Node-to-node message limit has not yet been set");
// The map lookup and the insertion below run with `lock` held.
63 std::lock_guard<ccf::pal::Mutex> guard(lock);
65 auto search = channels.find(peer_id);
66 if (search != channels.end())
// A successful lookup counts as activity, so the idle clock restarts.
68 auto& channel_info = search->second;
69 channel_info.idle_time = std::chrono::milliseconds(0);
70 return channel_info.channel;
// NOTE(review): the condition checks this_node, but the message talks about
// the endorsed node certificate -- confirm the wording is intended.
73 if (this_node ==
nullptr)
75 throw std::runtime_error(
76 "Endorsed node certificate has not yet been set");
// This local reference shadows nothing visible here; it aliases the optional
// certificate stored on this_node.
79 auto& endorsed_node_cert = this_node->endorsed_node_cert;
80 if (!endorsed_node_cert.has_value())
82 throw std::runtime_error(
83 "Cannot create node-to-node channel without endorsed node "
// Construct the new channel; only some constructor arguments are visible.
88 auto channel = std::make_shared<Channel>(
90 this_node->service_cert,
95 message_limit.
value());
// New entries start with zero idle time. try_emplace leaves the map untouched
// if the key is already present.
96 auto info = ChannelInfo{channel, std::chrono::milliseconds(0)};
97 channels.try_emplace(peer_id, info);
// Constructor initializer list (the header line is not visible in this
// excerpt): keeps the supplied writer factory and initializes to_host with a
// writer obtained from create_writer_to_outside().
104 writer_factory(writer_factory_),
105 to_host(writer_factory_.create_writer_to_outside())
// initialize: sets up the local node state. Only the tail of the signature is
// visible in this excerpt.
112 const std::optional<ccf::crypto::Pem>& node_cert)
override
// Asserted condition: initialize must not have been called before.
115 this_node ==
nullptr,
116 "Calling initialize more than once, previous id:{}, new id:{}",
// Condition for rejecting the node certificate: one is supplied and it is
// self-signed. How the rejection is raised is not visible in this excerpt.
121 node_cert.has_value() &&
122 make_verifier(node_cert.value())->is_self_signed())
125 "Refusing to initialize node-to-node channels with "
126 "self-signed node certificate.");
// Builds the ThisNode record from the arguments; the assignment target is on
// a line not shown here (presumably this_node).
131 std::make_unique<ThisNode>(self_id, service_cert, node_kp, node_cert);
// set_endorsed_node_cert: with `lock` held, stores the endorsed certificate on
// this_node. No null check on this_node is visible in this excerpt.
137 std::lock_guard<ccf::pal::Mutex> guard(lock);
138 this_node->endorsed_node_cert = endorsed_node_cert;
// set_message_limit: records the limit that get_channel requires before it
// will construct a channel. No lock acquisition is visible in this excerpt.
143 message_limit = message_limit_;
// set_idle_timeout: records the threshold that tick compares idle times
// against. No lock acquisition is visible in this excerpt.
148 idle_timeout = idle_timeout_;
// When an idle timeout is configured, adds `elapsed` to each channel's idle
// time, then closes and removes every channel whose idle time is no longer
// below the timeout. Runs with `lock` held.
// NOTE: braces and the branch taken for still-active channels are not visible
// in this excerpt.
151 void tick(std::chrono::milliseconds elapsed)
override
153 std::lock_guard<ccf::pal::Mutex> guard(lock);
// Nothing is closed for idleness unless a timeout has been set.
155 if (idle_timeout.has_value())
158 auto it = channels.begin();
159 while (it != channels.end())
// Accumulate in place and keep the updated total for the comparison and log.
161 const auto idle_time = it->second.idle_time += elapsed;
162 if (idle_time < idle_timeout.value())
169 "Closing idle channel to node {}. Was idle for {}, threshold for "
173 idle_timeout.value());
174 it->second.channel->close_channel();
// erase returns the iterator following the removed entry, so iteration can
// continue safely.
175 it = channels.erase(it);
// associate_node_address: only the tail of the signature and one argument of
// the body are visible in this excerpt.
183 const std::string& peer_hostname,
184 const std::string& peer_service)
override
// Presumably the message type of a ring-buffer write to the host -- the
// surrounding call is not shown; TODO confirm.
187 ccf::associate_node_address,
// have_channel: with `lock` held, reports whether `channels` contains an entry
// for nid. Unlike get_channel, this lookup never creates an entry.
196 std::lock_guard<ccf::pal::Mutex> guard(lock);
197 return channels.find(nid) != channels.end();
// send_authenticated: only the tail of the signature is visible in this
// excerpt.
204 size_t size)
override
// Asserted condition: the manager must already be initialized.
207 this_node !=
nullptr,
208 "Calling send_authenticated before channel manager is initialized");
// Wraps the raw buffer in a span and forwards to the peer's channel;
// get_channel creates that channel if none exists yet.
210 return get_channel(to)->send(type, std::span<const uint8_t>(data, size));
// send_encrypted: only the tail of the signature is visible in this excerpt.
216 std::span<const uint8_t> header,
217 const std::vector<uint8_t>& data)
override
// Asserted condition: the manager must already be initialized.
220 this_node !=
nullptr,
221 "Calling send_encrypted (to {}) before channel manager is initialized",
// Forwards header and data to the peer's channel; get_channel creates that
// channel if none exists yet.
224 return get_channel(to)->send(type, header, data);
// recv_authenticated: only the tail of the signature is visible in this
// excerpt. data and size are passed by reference and handed on to the channel.
229 std::span<const uint8_t> header,
230 const uint8_t*& data,
231 size_t& size)
override
// Asserted condition: the manager must already be initialized.
234 this_node !=
nullptr,
235 "Calling recv_authenticated (from {}) before channel manager is "
// Delegates to the sender's channel and returns its result.
239 return get_channel(from)->recv_authenticated(header, data, size);
// recv_authenticated_with_load: the start of the signature is not visible in
// this excerpt. data and size are passed by reference and handed on to the
// channel.
243 const NodeId& from,
const uint8_t*& data,
size_t& size)
override
// Asserted condition: the manager must already be initialized.
246 this_node !=
nullptr,
247 "Calling recv_authenticated_with_load (from {}) before channel manager "
// Delegates to the sender's channel and returns its result.
251 return get_channel(from)->recv_authenticated_with_load(data, size);
// recv_encrypted: only part of the signature is visible in this excerpt.
256 std::span<const uint8_t> header,
258 size_t size)
override
// Asserted condition: the manager must already be initialized.
261 this_node !=
nullptr,
262 "Calling recv_encrypted (from {}) before channel manager is "
// The channel's result is checked with has_value() before it is unwrapped.
266 auto plain = get_channel(from)->recv_encrypted(header, data, size);
// Handling of the empty case is on lines not shown in this excerpt.
267 if (!plain.has_value())
272 return plain.value();
// recv_channel_message: the start of the signature is not visible in this
// excerpt.
276 const NodeId& from,
const uint8_t* data,
size_t size)
override
// Asserted condition: the manager must already be initialized.
// NOTE(review): the message text says "recv_message" -- confirm whether it
// should name this function instead.
279 this_node !=
nullptr,
280 "Calling recv_message (from {}) before channel manager is "
// Passes the raw bytes to the sender's channel as a key-exchange message.
284 return get_channel(from)->recv_key_exchange_message(data, size);
// close_channel: with `lock` held, closes and forgets the channel to peer_id
// if one exists. The lookup uses find, so no channel is created here.
295 std::lock_guard<ccf::pal::Mutex> guard(lock);
297 auto search = channels.find(peer_id);
298 if (search != channels.end())
// Close the channel before dropping the map's reference to it.
300 search->second.channel->close_channel();
301 channels.erase(search);
// channel_open: reports the channel's own open state. Because this goes
// through get_channel, a missing channel is created first, and get_channel's
// exceptions can propagate from here.
307 return get_channel(peer_id)->channel_open();
#define CCF_ASSERT_FMT(expr,...)
Definition ccf_assert.h:10
Definition node_to_node_channel_manager.h:13
std::vector< uint8_t > recv_encrypted(const NodeId &from, std::span< const uint8_t > header, const uint8_t *data, size_t size) override
Definition node_to_node_channel_manager.h:254
void set_idle_timeout(std::chrono::milliseconds idle_timeout_) override
Definition node_to_node_channel_manager.h:146
bool recv_channel_message(const NodeId &from, const uint8_t *data, size_t size) override
Definition node_to_node_channel_manager.h:275
bool have_channel(const ccf::NodeId &nid) override
Definition node_to_node_channel_manager.h:194
void initialize(const NodeId &self_id, const ccf::crypto::Pem &service_cert, ccf::crypto::ECKeyPairPtr node_kp, const std::optional< ccf::crypto::Pem > &node_cert) override
Definition node_to_node_channel_manager.h:108
void tick(std::chrono::milliseconds elapsed) override
Definition node_to_node_channel_manager.h:151
bool recv_channel_message(const NodeId &from, std::vector< uint8_t > &&body)
Definition node_to_node_channel_manager.h:288
void close_channel(const NodeId &peer_id) override
Definition node_to_node_channel_manager.h:293
bool send_encrypted(const NodeId &to, NodeMsgType type, std::span< const uint8_t > header, const std::vector< uint8_t > &data) override
Definition node_to_node_channel_manager.h:213
bool recv_authenticated(const NodeId &from, std::span< const uint8_t > header, const uint8_t *&data, size_t &size) override
Definition node_to_node_channel_manager.h:227
NodeToNodeChannelManager(ringbuffer::AbstractWriterFactory &writer_factory_)
Definition node_to_node_channel_manager.h:102
void set_message_limit(size_t message_limit_) override
Definition node_to_node_channel_manager.h:141
bool channel_open(const NodeId &peer_id)
Definition node_to_node_channel_manager.h:305
void associate_node_address(const NodeId &peer_id, const std::string &peer_hostname, const std::string &peer_service) override
Definition node_to_node_channel_manager.h:181
bool recv_authenticated_with_load(const NodeId &from, const uint8_t *&data, size_t &size) override
Definition node_to_node_channel_manager.h:242
bool send_authenticated(const NodeId &to, NodeMsgType type, const uint8_t *data, size_t size) override
Definition node_to_node_channel_manager.h:200
void set_endorsed_node_cert(const ccf::crypto::Pem &endorsed_node_cert) override
Definition node_to_node_channel_manager.h:134
Definition node_to_node.h:22
Definition node_to_node.h:17
Definition ring_buffer_types.h:157
#define LOG_INFO_FMT
Definition internal_logger.h:15
#define LOG_DEBUG_FMT
Definition internal_logger.h:14
std::shared_ptr< ECKeyPair > ECKeyPairPtr
Definition ec_key_pair.h:144
std::mutex Mutex
Definition locking.h:12
Definition app_interface.h:14
NodeMsgType
Definition node_types.h:21
std::shared_ptr< AbstractWriter > WriterPtr
Definition ring_buffer_types.h:154
#define RINGBUFFER_WRITE_MESSAGE(MSG,...)
Definition ring_buffer_types.h:259
char const * data() const
Definition entity_id.h:77
Value & value()
Definition entity_id.h:67