CCF
Loading...
Searching...
No Matches
node_to_node_channel_manager.h
Go to the documentation of this file.
1// Copyright (c) Microsoft Corporation. All rights reserved.
2// Licensed under the Apache 2.0 License.
3#pragma once
4
5#include "ccf/pal/locking.h"
6#include "channels.h"
7#include "ds/ccf_assert.h"
8#include "node/node_to_node.h"
9
10namespace ccf
11{
13 {
14 private:
17
18 struct ChannelInfo
19 {
20 std::shared_ptr<Channel> channel;
21 std::chrono::milliseconds idle_time;
22 };
23
24 std::unordered_map<NodeId, ChannelInfo> channels;
25 ccf::pal::Mutex lock; //< Protects access to channels map
26
// Identity and credentials of the local node. An instance is created by
// initialize() and read by get_channel() when constructing a new channel.
// NOTE(review): listing line 31 is absent from this extract. get_channel()
// reads this_node->node_kp and initialize() passes a node_kp when building
// this struct, so a key-pair member presumably sits between service_cert and
// endorsed_node_cert - confirm against the original header.
27 struct ThisNode
28 {
29 NodeId node_id;
30 ccf::crypto::Pem service_cert;
// Starts empty; assigned later by set_endorsed_node_cert().
32 std::optional<ccf::crypto::Pem> endorsed_node_cert = std::nullopt;
33 };
34 std::unique_ptr<ThisNode> this_node; //< Not available at construction, only
35 // after calling initialize()
36
37 // This is set during node startup, using a value from the run-time
38 // configuration (unless a unit test has set a compile-time default)
39 std::optional<size_t> message_limit =
40#ifdef OVERRIDE_DEFAULT_N2N_MESSAGE_LIMIT
41 OVERRIDE_DEFAULT_N2N_MESSAGE_LIMIT;
42#else
43 std::nullopt;
44#endif
45
46 // This is set during node startup, using a value derived from the run-time
47 // configuration. Before that, no timeout applies
48 std::optional<std::chrono::milliseconds> idle_timeout = std::nullopt;
49
// Returns the channel to peer_id, creating and registering one if the
// channels map has no entry for that peer.
// An existing entry has its idle_time reset to zero before it is returned.
// Throws std::runtime_error if message_limit is unset, or - when a new channel
// has to be created - if this_node or its endorsed_node_cert is unset.
50 std::shared_ptr<Channel> get_channel(const NodeId& peer_id)
51 {
// NOTE(review): listing line 52 is absent from this extract. The three
// arguments below look like the tail of a CCF_ASSERT_FMT(...) call rejecting
// a channel to self - confirm against the original header.
53 this_node == nullptr || this_node->node_id != peer_id,
54 "Requested channel with self {}",
55 peer_id);
56
57 if (!message_limit.has_value())
58 {
59 throw std::runtime_error(
60 "Node-to-node message limit has not yet been set");
61 }
62
// The map lookup, idle-time reset and insertion below all run under lock.
63 std::lock_guard<ccf::pal::Mutex> guard(lock);
64
65 auto search = channels.find(peer_id);
66 if (search != channels.end())
67 {
68 auto& channel_info = search->second;
69 channel_info.idle_time = std::chrono::milliseconds(0);
70 return channel_info.channel;
71 }
72
// No existing channel: the local identity and endorsed certificate are
// required from here on.
73 if (this_node == nullptr)
74 {
75 throw std::runtime_error(
76 "Endorsed node certificate has not yet been set");
77 }
78
79 auto& endorsed_node_cert = this_node->endorsed_node_cert;
80 if (!endorsed_node_cert.has_value())
81 {
82 throw std::runtime_error(
83 "Cannot create node-to-node channel without endorsed node "
84 "certificate");
85 }
86
87 // Create channel
88 auto channel = std::make_shared<Channel>(
89 writer_factory,
90 this_node->service_cert,
91 this_node->node_kp,
92 *endorsed_node_cert,
93 this_node->node_id,
94 peer_id,
95 message_limit.value());
// The new entry starts with zero idle time.
96 auto info = ChannelInfo{channel, std::chrono::milliseconds(0)};
97 channels.try_emplace(peer_id, info);
98 return channel;
99 }
100
101 public:
// Constructor: stores the writer factory reference and creates the to_host
// writer from it via create_writer_to_outside().
// NOTE(review): listing line 102 (the constructor's opening line) is absent
// from this extract.
103 ringbuffer::AbstractWriterFactory& writer_factory_) :
104 writer_factory(writer_factory_),
105 to_host(writer_factory_.create_writer_to_outside())
106 {}
107
109 const NodeId& self_id,
110 const ccf::crypto::Pem& service_cert,
112 const std::optional<ccf::crypto::Pem>& node_cert) override
113 {
// Records this node's id, service certificate, key pair and optional node
// certificate in this_node. If node_cert is present and self-signed, returns
// early and leaves this_node unset.
// NOTE(review): listing lines 108 (signature opener), 111 (presumably the
// node_kp parameter used below), 114 and 124 (presumably the assertion and
// logging macro openers) are absent from this extract.
115 this_node == nullptr,
116 "Calling initialize more than once, previous id:{}, new id:{}",
117 this_node->node_id,
118 self_id);
119
120 if (
121 node_cert.has_value() &&
122 make_verifier(node_cert.value())->is_self_signed())
123 {
125 "Refusing to initialize node-to-node channels with "
126 "self-signed node certificate.");
127 return;
128 }
129
130 this_node =
131 std::make_unique<ThisNode>(self_id, service_cert, node_kp, node_cert);
132 }
133
135 const ccf::crypto::Pem& endorsed_node_cert) override
136 {
// Stores the endorsed certificate on this_node while holding lock.
// NOTE(review): this_node is dereferenced without a null check, so this
// presumably must only be called after a successful initialize() - confirm
// with callers. Listing line 134 (signature opener) is absent from this
// extract.
137 std::lock_guard<ccf::pal::Mutex> guard(lock);
138 this_node->endorsed_node_cert = endorsed_node_cert;
139 }
140
141 void set_message_limit(size_t message_limit_) override
142 {
143 message_limit = message_limit_;
144 }
145
146 void set_idle_timeout(std::chrono::milliseconds idle_timeout_) override
147 {
148 idle_timeout = idle_timeout_;
149 }
150
// Adds elapsed to every channel's idle_time and, for each channel whose idle
// time has reached idle_timeout, closes it and erases it from the map.
// Does nothing beyond taking the lock when idle_timeout is unset.
151 void tick(std::chrono::milliseconds elapsed) override
152 {
153 std::lock_guard<ccf::pal::Mutex> guard(lock);
154
155 if (idle_timeout.has_value())
156 {
157 // Close idle channels
158 auto it = channels.begin();
159 while (it != channels.end())
160 {
// The stored idle_time is updated in place; idle_time here is a copy of the
// new total.
161 const auto idle_time = it->second.idle_time += elapsed;
162 if (idle_time < idle_timeout.value())
163 {
164 ++it;
165 }
166 else
167 {
// NOTE(review): listing line 168 is absent from this extract; the arguments
// below are presumably passed to a logging macro - confirm against the
// original header.
169 "Closing idle channel to node {}. Was idle for {}, threshold for "
170 "closure is {}",
171 it->first,
172 idle_time,
173 idle_timeout.value());
174 it->second.channel->close_channel();
// erase() returns the iterator to the next element, so iteration continues
// without a separate increment.
175 it = channels.erase(it);
176 }
177 }
178 }
179 }
180
182 const NodeId& peer_id,
183 const std::string& peer_hostname,
184 const std::string& peer_service) override
185 {
// Passes the peer's id value, hostname and service, tagged as
// ccf::associate_node_address, together with the to_host writer.
// NOTE(review): listing lines 181 (signature opener) and 186 are absent from
// this extract; the arguments below are presumably the tail of a
// RINGBUFFER_WRITE_MESSAGE(...) call - confirm against the original header.
187 ccf::associate_node_address,
188 to_host,
189 peer_id.value(),
190 peer_hostname,
191 peer_service);
192 }
193
194 bool have_channel(const ccf::NodeId& nid) override
195 {
196 std::lock_guard<ccf::pal::Mutex> guard(lock);
197 return channels.find(nid) != channels.end();
198 }
199
201 const NodeId& to,
202 NodeMsgType type,
203 const uint8_t* data,
204 size_t size) override
205 {
// Wraps data/size in a span and forwards it, with type, to send() on the
// channel for `to`, returning that call's result.
// NOTE(review): listing lines 200 (signature opener) and 206 (presumably the
// CCF_ASSERT_FMT opener for the check below) are absent from this extract.
207 this_node != nullptr,
208 "Calling send_authenticated before channel manager is initialized");
209
210 return get_channel(to)->send(type, std::span<const uint8_t>(data, size));
211 }
212
214 const NodeId& to,
215 NodeMsgType type,
216 std::span<const uint8_t> header,
217 const std::vector<uint8_t>& data) override
218 {
// Forwards type, header and data to send() on the channel for `to`,
// returning that call's result.
// NOTE(review): listing lines 213 (signature opener) and 219 (presumably the
// CCF_ASSERT_FMT opener for the check below) are absent from this extract.
220 this_node != nullptr,
221 "Calling send_encrypted (to {}) before channel manager is initialized",
222 to);
223
224 return get_channel(to)->send(type, header, data);
225 }
226
228 const NodeId& from,
229 std::span<const uint8_t> header,
230 const uint8_t*& data,
231 size_t& size) override
232 {
// Forwards header, data and size to recv_authenticated() on the channel for
// `from`, returning that call's result. data and size are passed by
// reference, so the channel may update them.
// NOTE(review): listing lines 227 (signature opener) and 233 (presumably the
// CCF_ASSERT_FMT opener for the check below) are absent from this extract.
234 this_node != nullptr,
235 "Calling recv_authenticated (from {}) before channel manager is "
236 "initialized",
237 from);
238
239 return get_channel(from)->recv_authenticated(header, data, size);
240 }
241
243 const NodeId& from, const uint8_t*& data, size_t& size) override
244 {
// Forwards data and size to recv_authenticated_with_load() on the channel
// for `from`, returning that call's result. data and size are passed by
// reference, so the channel may update them.
// NOTE(review): listing lines 242 (signature opener) and 245 (presumably the
// CCF_ASSERT_FMT opener for the check below) are absent from this extract.
246 this_node != nullptr,
247 "Calling recv_authenticated_with_load (from {}) before channel manager "
248 "is initialized",
249 from);
250
251 return get_channel(from)->recv_authenticated_with_load(data, size);
252 }
253
// Passes header, data and size to recv_encrypted() on the channel for `from`
// and returns the resulting plaintext.
// Throws DroppedMessageException(from) when the channel returns no value.
254 std::vector<uint8_t> recv_encrypted(
255 const NodeId& from,
256 std::span<const uint8_t> header,
257 const uint8_t* data,
258 size_t size) override
259 {
// NOTE(review): listing line 260 is absent from this extract; the arguments
// below look like the tail of a CCF_ASSERT_FMT(...) call - confirm against
// the original header.
261 this_node != nullptr,
262 "Calling recv_encrypted (from {}) before channel manager is "
263 "initialized",
264 from);
265
266 auto plain = get_channel(from)->recv_encrypted(header, data, size);
267 if (!plain.has_value())
268 {
269 throw DroppedMessageException(from);
270 }
271
// NOTE(review): plain.value() is an lvalue, so this return copies the
// buffer; moving out of the optional would avoid the copy.
272 return plain.value();
273 }
274
276 const NodeId& from, const uint8_t* data, size_t size) override
277 {
// Forwards data and size to recv_key_exchange_message() on the channel for
// `from`, returning that call's result.
// NOTE(review): listing lines 275 (signature opener) and 278 (presumably the
// CCF_ASSERT_FMT opener for the check below) are absent from this extract.
279 this_node != nullptr,
280 "Calling recv_message (from {}) before channel manager is "
281 "initialized",
282 from);
283
284 return get_channel(from)->recv_key_exchange_message(data, size);
285 }
286
287 // NB: Following methods are only used by tests!
288 bool recv_channel_message(const NodeId& from, std::vector<uint8_t>&& body)
289 {
290 return recv_channel_message(from, body.data(), body.size());
291 }
292
293 void close_channel(const NodeId& peer_id) override
294 {
295 std::lock_guard<ccf::pal::Mutex> guard(lock);
296
297 auto search = channels.find(peer_id);
298 if (search != channels.end())
299 {
300 search->second.channel->close_channel();
301 channels.erase(search);
302 }
303 }
304
305 bool channel_open(const NodeId& peer_id)
306 {
307 return get_channel(peer_id)->channel_open();
308 }
309 };
310}
#define CCF_ASSERT_FMT(expr,...)
Definition ccf_assert.h:10
Definition node_to_node_channel_manager.h:13
std::vector< uint8_t > recv_encrypted(const NodeId &from, std::span< const uint8_t > header, const uint8_t *data, size_t size) override
Definition node_to_node_channel_manager.h:254
void set_idle_timeout(std::chrono::milliseconds idle_timeout_) override
Definition node_to_node_channel_manager.h:146
bool recv_channel_message(const NodeId &from, const uint8_t *data, size_t size) override
Definition node_to_node_channel_manager.h:275
bool have_channel(const ccf::NodeId &nid) override
Definition node_to_node_channel_manager.h:194
void initialize(const NodeId &self_id, const ccf::crypto::Pem &service_cert, ccf::crypto::ECKeyPairPtr node_kp, const std::optional< ccf::crypto::Pem > &node_cert) override
Definition node_to_node_channel_manager.h:108
void tick(std::chrono::milliseconds elapsed) override
Definition node_to_node_channel_manager.h:151
bool recv_channel_message(const NodeId &from, std::vector< uint8_t > &&body)
Definition node_to_node_channel_manager.h:288
void close_channel(const NodeId &peer_id) override
Definition node_to_node_channel_manager.h:293
bool send_encrypted(const NodeId &to, NodeMsgType type, std::span< const uint8_t > header, const std::vector< uint8_t > &data) override
Definition node_to_node_channel_manager.h:213
bool recv_authenticated(const NodeId &from, std::span< const uint8_t > header, const uint8_t *&data, size_t &size) override
Definition node_to_node_channel_manager.h:227
NodeToNodeChannelManager(ringbuffer::AbstractWriterFactory &writer_factory_)
Definition node_to_node_channel_manager.h:102
void set_message_limit(size_t message_limit_) override
Definition node_to_node_channel_manager.h:141
bool channel_open(const NodeId &peer_id)
Definition node_to_node_channel_manager.h:305
void associate_node_address(const NodeId &peer_id, const std::string &peer_hostname, const std::string &peer_service) override
Definition node_to_node_channel_manager.h:181
bool recv_authenticated_with_load(const NodeId &from, const uint8_t *&data, size_t &size) override
Definition node_to_node_channel_manager.h:242
bool send_authenticated(const NodeId &to, NodeMsgType type, const uint8_t *data, size_t size) override
Definition node_to_node_channel_manager.h:200
void set_endorsed_node_cert(const ccf::crypto::Pem &endorsed_node_cert) override
Definition node_to_node_channel_manager.h:134
Definition node_to_node.h:22
Definition node_to_node.h:17
Definition pem.h:18
Definition ring_buffer_types.h:157
#define LOG_INFO_FMT
Definition internal_logger.h:15
#define LOG_DEBUG_FMT
Definition internal_logger.h:14
std::shared_ptr< ECKeyPair > ECKeyPairPtr
Definition ec_key_pair.h:144
std::mutex Mutex
Definition locking.h:12
Definition app_interface.h:14
NodeMsgType
Definition node_types.h:21
std::shared_ptr< AbstractWriter > WriterPtr
Definition ring_buffer_types.h:154
#define RINGBUFFER_WRITE_MESSAGE(MSG,...)
Definition ring_buffer_types.h:259
char const * data() const
Definition entity_id.h:77
Value & value()
Definition entity_id.h:67