19 std::shared_ptr<Channel> channel;
20 std::chrono::milliseconds idle_time;
23 std::unordered_map<NodeId, ChannelInfo> channels;
31 std::optional<ccf::crypto::Pem> endorsed_node_cert = std::nullopt;
33 std::unique_ptr<ThisNode> this_node;
38 std::optional<size_t> message_limit =
39#ifdef OVERRIDE_DEFAULT_N2N_MESSAGE_LIMIT
40 OVERRIDE_DEFAULT_N2N_MESSAGE_LIMIT;
47 std::optional<std::chrono::milliseconds> idle_timeout = std::nullopt;
49 std::shared_ptr<Channel> get_channel(
const NodeId& peer_id)
52 this_node ==
nullptr || this_node->node_id != peer_id,
53 "Requested channel with self {}",
57 message_limit.has_value(),
58 "Node-to-node message limit has not yet been set");
60 std::lock_guard<ccf::pal::Mutex> guard(lock);
62 this_node !=
nullptr && this_node->endorsed_node_cert.has_value(),
63 "Endorsed node certificate has not yet been set");
65 auto search = channels.find(peer_id);
66 if (search != channels.end())
68 auto& channel_info = search->second;
69 channel_info.idle_time = std::chrono::milliseconds(0);
70 return channel_info.channel;
74 auto channel = std::make_shared<Channel>(
76 this_node->service_cert,
78 this_node->endorsed_node_cert.value(),
81 message_limit.
value());
82 auto info = ChannelInfo{channel, std::chrono::milliseconds(0)};
83 channels.try_emplace(peer_id, info);
90 writer_factory(writer_factory_),
91 to_host(writer_factory_.create_writer_to_outside())
98 const std::optional<ccf::crypto::Pem>& node_cert)
override
101 this_node ==
nullptr,
102 "Calling initialize more than once, previous id:{}, new id:{}",
107 node_cert.has_value() &&
108 make_verifier(node_cert.value())->is_self_signed())
111 "Refusing to initialize node-to-node channels with "
112 "self-signed node certificate.");
116 this_node = std::unique_ptr<ThisNode>(
117 new ThisNode{self_id, service_cert, node_kp, node_cert});
123 std::lock_guard<ccf::pal::Mutex> guard(lock);
124 this_node->endorsed_node_cert = endorsed_node_cert;
129 message_limit = message_limit_;
134 idle_timeout = idle_timeout_;
137 void tick(std::chrono::milliseconds elapsed)
override
139 std::lock_guard<ccf::pal::Mutex> guard(lock);
141 if (idle_timeout.has_value())
144 auto it = channels.begin();
145 while (it != channels.end())
147 const auto idle_time = it->second.idle_time += elapsed;
148 if (idle_time < idle_timeout.value())
155 "Closing idle channel to node {}. Was idle for {}, threshold for "
159 idle_timeout.value());
160 it->second.channel->close_channel();
161 it = channels.erase(it);
169 const std::string& peer_hostname,
170 const std::string& peer_service)
override
173 ccf::associate_node_address,
182 std::lock_guard<ccf::pal::Mutex> guard(lock);
183 return channels.find(nid) != channels.end();
190 size_t size)
override
193 this_node !=
nullptr,
194 "Calling send_authenticated before channel manager is initialized");
196 return get_channel(to)->send(type, std::span<const uint8_t>(data, size));
202 std::span<const uint8_t> header,
203 const std::vector<uint8_t>& data)
override
206 this_node !=
nullptr,
207 "Calling send_encrypted (to {}) before channel manager is initialized",
210 return get_channel(to)->send(type, header, data);
215 std::span<const uint8_t> header,
216 const uint8_t*& data,
217 size_t& size)
override
220 this_node !=
nullptr,
221 "Calling recv_authenticated (from {}) before channel manager is "
225 return get_channel(from)->recv_authenticated(header, data, size);
229 const NodeId& from,
const uint8_t*& data,
size_t& size)
override
232 this_node !=
nullptr,
233 "Calling recv_authenticated_with_load (from {}) before channel manager "
237 return get_channel(from)->recv_authenticated_with_load(data, size);
242 std::span<const uint8_t> header,
244 size_t size)
override
247 this_node !=
nullptr,
248 "Calling recv_encrypted (from {}) before channel manager is "
252 auto plain = get_channel(from)->recv_encrypted(header, data, size);
253 if (!plain.has_value())
258 return plain.value();
262 const NodeId& from,
const uint8_t* data,
size_t size)
override
265 this_node !=
nullptr,
266 "Calling recv_message (from {}) before channel manager is "
270 return get_channel(from)->recv_key_exchange_message(data, size);
281 std::lock_guard<ccf::pal::Mutex> guard(lock);
283 auto search = channels.find(peer_id);
284 if (search != channels.end())
286 search->second.channel->close_channel();
287 channels.erase(search);
293 return get_channel(peer_id)->channel_open();
#define CCF_ASSERT_FMT(expr,...)
Definition ccf_assert.h:10
Definition node_to_node_channel_manager.h:12
std::vector< uint8_t > recv_encrypted(const NodeId &from, std::span< const uint8_t > header, const uint8_t *data, size_t size) override
Definition node_to_node_channel_manager.h:240
void set_idle_timeout(std::chrono::milliseconds idle_timeout_) override
Definition node_to_node_channel_manager.h:132
bool recv_channel_message(const NodeId &from, const uint8_t *data, size_t size) override
Definition node_to_node_channel_manager.h:261
bool have_channel(const ccf::NodeId &nid) override
Definition node_to_node_channel_manager.h:180
void tick(std::chrono::milliseconds elapsed) override
Definition node_to_node_channel_manager.h:137
virtual void associate_node_address(const NodeId &peer_id, const std::string &peer_hostname, const std::string &peer_service) override
Definition node_to_node_channel_manager.h:167
bool recv_channel_message(const NodeId &from, std::vector< uint8_t > &&body)
Definition node_to_node_channel_manager.h:274
void close_channel(const NodeId &peer_id) override
Definition node_to_node_channel_manager.h:279
bool send_encrypted(const NodeId &to, NodeMsgType type, std::span< const uint8_t > header, const std::vector< uint8_t > &data) override
Definition node_to_node_channel_manager.h:199
bool recv_authenticated(const NodeId &from, std::span< const uint8_t > header, const uint8_t *&data, size_t &size) override
Definition node_to_node_channel_manager.h:213
NodeToNodeChannelManager(ringbuffer::AbstractWriterFactory &writer_factory_)
Definition node_to_node_channel_manager.h:88
void set_message_limit(size_t message_limit_) override
Definition node_to_node_channel_manager.h:127
void initialize(const NodeId &self_id, const ccf::crypto::Pem &service_cert, ccf::crypto::KeyPairPtr node_kp, const std::optional< ccf::crypto::Pem > &node_cert) override
Definition node_to_node_channel_manager.h:94
bool channel_open(const NodeId &peer_id)
Definition node_to_node_channel_manager.h:291
bool recv_authenticated_with_load(const NodeId &from, const uint8_t *&data, size_t &size) override
Definition node_to_node_channel_manager.h:228
bool send_authenticated(const NodeId &to, NodeMsgType type, const uint8_t *data, size_t size) override
Definition node_to_node_channel_manager.h:186
void set_endorsed_node_cert(const ccf::crypto::Pem &endorsed_node_cert) override
Definition node_to_node_channel_manager.h:120
Definition node_to_node.h:22
Definition node_to_node.h:17
Definition ring_buffer_types.h:153
#define LOG_INFO_FMT
Definition logger.h:362
#define LOG_DEBUG_FMT
Definition logger.h:357
std::shared_ptr< KeyPair > KeyPairPtr
Definition key_pair.h:145
std::mutex Mutex
Definition locking.h:12
Definition app_interface.h:14
NodeMsgType
Definition node_types.h:19
std::shared_ptr< AbstractWriter > WriterPtr
Definition ring_buffer_types.h:150
#define RINGBUFFER_WRITE_MESSAGE(MSG,...)
Definition ring_buffer_types.h:255
char const * data() const
Definition entity_id.h:70
Value & value()
Definition entity_id.h:60