CCF
Loading...
Searching...
No Matches
node_to_node_channel_manager.h
Go to the documentation of this file.
1// Copyright (c) Microsoft Corporation. All rights reserved.
2// Licensed under the Apache 2.0 License.
3#pragma once
4
5#include "ccf/pal/locking.h"
6#include "channels.h"
7#include "node/node_to_node.h"
8
9namespace ccf
10{
12 {
13 private:
16
17 struct ChannelInfo
18 {
19 std::shared_ptr<Channel> channel;
20 std::chrono::milliseconds idle_time;
21 };
22
23 std::unordered_map<NodeId, ChannelInfo> channels;
24 ccf::pal::Mutex lock; //< Protects access to channels map
25
26 struct ThisNode
27 {
28 NodeId node_id;
29 ccf::crypto::Pem service_cert;
31 std::optional<ccf::crypto::Pem> endorsed_node_cert = std::nullopt;
32 };
33 std::unique_ptr<ThisNode> this_node; //< Not available at construction, only
34 // after calling initialize()
35
36 // This is set during node startup, using a value from the run-time
37 // configuration (unless a unit test has set a compile-time default)
38 std::optional<size_t> message_limit =
39#ifdef OVERRIDE_DEFAULT_N2N_MESSAGE_LIMIT
40 OVERRIDE_DEFAULT_N2N_MESSAGE_LIMIT;
41#else
42 std::nullopt;
43#endif
44
45 // This is set during node startup, using a value derived from the run-time
46 // configuration. Before that, no timeout applies
47 std::optional<std::chrono::milliseconds> idle_timeout = std::nullopt;
48
49 std::shared_ptr<Channel> get_channel(const NodeId& peer_id)
50 {
52 this_node == nullptr || this_node->node_id != peer_id,
53 "Requested channel with self {}",
54 peer_id);
55
57 message_limit.has_value(),
58 "Node-to-node message limit has not yet been set");
59
60 std::lock_guard<ccf::pal::Mutex> guard(lock);
62 this_node != nullptr && this_node->endorsed_node_cert.has_value(),
63 "Endorsed node certificate has not yet been set");
64
65 auto search = channels.find(peer_id);
66 if (search != channels.end())
67 {
68 auto& channel_info = search->second;
69 channel_info.idle_time = std::chrono::milliseconds(0);
70 return channel_info.channel;
71 }
72
73 // Create channel
74 auto channel = std::make_shared<Channel>(
75 writer_factory,
76 this_node->service_cert,
77 this_node->node_kp,
78 this_node->endorsed_node_cert.value(),
79 this_node->node_id,
80 peer_id,
81 message_limit.value());
82 auto info = ChannelInfo{channel, std::chrono::milliseconds(0)};
83 channels.try_emplace(peer_id, info);
84 return channel;
85 }
86
87 public:
89 ringbuffer::AbstractWriterFactory& writer_factory_) :
90 writer_factory(writer_factory_),
91 to_host(writer_factory_.create_writer_to_outside())
92 {}
93
95 const NodeId& self_id,
96 const ccf::crypto::Pem& service_cert,
98 const std::optional<ccf::crypto::Pem>& node_cert) override
99 {
101 this_node == nullptr,
102 "Calling initialize more than once, previous id:{}, new id:{}",
103 this_node->node_id,
104 self_id);
105
106 if (
107 node_cert.has_value() &&
108 make_verifier(node_cert.value())->is_self_signed())
109 {
111 "Refusing to initialize node-to-node channels with "
112 "self-signed node certificate.");
113 return;
114 }
115
116 this_node = std::unique_ptr<ThisNode>(
117 new ThisNode{self_id, service_cert, node_kp, node_cert});
118 }
119
121 const ccf::crypto::Pem& endorsed_node_cert) override
122 {
123 std::lock_guard<ccf::pal::Mutex> guard(lock);
124 this_node->endorsed_node_cert = endorsed_node_cert;
125 }
126
127 void set_message_limit(size_t message_limit_) override
128 {
129 message_limit = message_limit_;
130 }
131
132 void set_idle_timeout(std::chrono::milliseconds idle_timeout_) override
133 {
134 idle_timeout = idle_timeout_;
135 }
136
137 void tick(std::chrono::milliseconds elapsed) override
138 {
139 std::lock_guard<ccf::pal::Mutex> guard(lock);
140
141 if (idle_timeout.has_value())
142 {
143 // Close idle channels
144 auto it = channels.begin();
145 while (it != channels.end())
146 {
147 const auto idle_time = it->second.idle_time += elapsed;
148 if (idle_time < idle_timeout.value())
149 {
150 ++it;
151 }
152 else
153 {
155 "Closing idle channel to node {}. Was idle for {}, threshold for "
156 "closure is {}",
157 it->first,
158 idle_time,
159 idle_timeout.value());
160 it->second.channel->close_channel();
161 it = channels.erase(it);
162 }
163 }
164 }
165 }
166
168 const NodeId& peer_id,
169 const std::string& peer_hostname,
170 const std::string& peer_service) override
171 {
173 ccf::associate_node_address,
174 to_host,
175 peer_id.value(),
176 peer_hostname,
177 peer_service);
178 }
179
180 bool have_channel(const ccf::NodeId& nid) override
181 {
182 std::lock_guard<ccf::pal::Mutex> guard(lock);
183 return channels.find(nid) != channels.end();
184 }
185
187 const NodeId& to,
188 NodeMsgType type,
189 const uint8_t* data,
190 size_t size) override
191 {
193 this_node != nullptr,
194 "Calling send_authenticated before channel manager is initialized");
195
196 return get_channel(to)->send(type, std::span<const uint8_t>(data, size));
197 }
198
200 const NodeId& to,
201 NodeMsgType type,
202 std::span<const uint8_t> header,
203 const std::vector<uint8_t>& data) override
204 {
206 this_node != nullptr,
207 "Calling send_encrypted (to {}) before channel manager is initialized",
208 to);
209
210 return get_channel(to)->send(type, header, data);
211 }
212
214 const NodeId& from,
215 std::span<const uint8_t> header,
216 const uint8_t*& data,
217 size_t& size) override
218 {
220 this_node != nullptr,
221 "Calling recv_authenticated (from {}) before channel manager is "
222 "initialized",
223 from);
224
225 return get_channel(from)->recv_authenticated(header, data, size);
226 }
227
229 const NodeId& from, const uint8_t*& data, size_t& size) override
230 {
232 this_node != nullptr,
233 "Calling recv_authenticated_with_load (from {}) before channel manager "
234 "is initialized",
235 from);
236
237 return get_channel(from)->recv_authenticated_with_load(data, size);
238 }
239
240 std::vector<uint8_t> recv_encrypted(
241 const NodeId& from,
242 std::span<const uint8_t> header,
243 const uint8_t* data,
244 size_t size) override
245 {
247 this_node != nullptr,
248 "Calling recv_encrypted (from {}) before channel manager is "
249 "initialized",
250 from);
251
252 auto plain = get_channel(from)->recv_encrypted(header, data, size);
253 if (!plain.has_value())
254 {
255 throw DroppedMessageException(from);
256 }
257
258 return plain.value();
259 }
260
262 const NodeId& from, const uint8_t* data, size_t size) override
263 {
265 this_node != nullptr,
266 "Calling recv_message (from {}) before channel manager is "
267 "initialized",
268 from);
269
270 return get_channel(from)->recv_key_exchange_message(data, size);
271 }
272
273 // NB: Following methods are only used by tests!
274 bool recv_channel_message(const NodeId& from, std::vector<uint8_t>&& body)
275 {
276 return recv_channel_message(from, body.data(), body.size());
277 }
278
279 void close_channel(const NodeId& peer_id) override
280 {
281 std::lock_guard<ccf::pal::Mutex> guard(lock);
282
283 auto search = channels.find(peer_id);
284 if (search != channels.end())
285 {
286 search->second.channel->close_channel();
287 channels.erase(search);
288 }
289 }
290
291 bool channel_open(const NodeId& peer_id)
292 {
293 return get_channel(peer_id)->channel_open();
294 }
295 };
296}
#define CCF_ASSERT_FMT(expr,...)
Definition ccf_assert.h:10
Definition node_to_node_channel_manager.h:12
std::vector< uint8_t > recv_encrypted(const NodeId &from, std::span< const uint8_t > header, const uint8_t *data, size_t size) override
Definition node_to_node_channel_manager.h:240
void set_idle_timeout(std::chrono::milliseconds idle_timeout_) override
Definition node_to_node_channel_manager.h:132
bool recv_channel_message(const NodeId &from, const uint8_t *data, size_t size) override
Definition node_to_node_channel_manager.h:261
bool have_channel(const ccf::NodeId &nid) override
Definition node_to_node_channel_manager.h:180
void tick(std::chrono::milliseconds elapsed) override
Definition node_to_node_channel_manager.h:137
virtual void associate_node_address(const NodeId &peer_id, const std::string &peer_hostname, const std::string &peer_service) override
Definition node_to_node_channel_manager.h:167
bool recv_channel_message(const NodeId &from, std::vector< uint8_t > &&body)
Definition node_to_node_channel_manager.h:274
void close_channel(const NodeId &peer_id) override
Definition node_to_node_channel_manager.h:279
bool send_encrypted(const NodeId &to, NodeMsgType type, std::span< const uint8_t > header, const std::vector< uint8_t > &data) override
Definition node_to_node_channel_manager.h:199
bool recv_authenticated(const NodeId &from, std::span< const uint8_t > header, const uint8_t *&data, size_t &size) override
Definition node_to_node_channel_manager.h:213
NodeToNodeChannelManager(ringbuffer::AbstractWriterFactory &writer_factory_)
Definition node_to_node_channel_manager.h:88
void set_message_limit(size_t message_limit_) override
Definition node_to_node_channel_manager.h:127
void initialize(const NodeId &self_id, const ccf::crypto::Pem &service_cert, ccf::crypto::KeyPairPtr node_kp, const std::optional< ccf::crypto::Pem > &node_cert) override
Definition node_to_node_channel_manager.h:94
bool channel_open(const NodeId &peer_id)
Definition node_to_node_channel_manager.h:291
bool recv_authenticated_with_load(const NodeId &from, const uint8_t *&data, size_t &size) override
Definition node_to_node_channel_manager.h:228
bool send_authenticated(const NodeId &to, NodeMsgType type, const uint8_t *data, size_t size) override
Definition node_to_node_channel_manager.h:186
void set_endorsed_node_cert(const ccf::crypto::Pem &endorsed_node_cert) override
Definition node_to_node_channel_manager.h:120
Definition node_to_node.h:22
Definition node_to_node.h:17
Definition pem.h:18
Definition ring_buffer_types.h:153
#define LOG_INFO_FMT
Definition logger.h:362
#define LOG_DEBUG_FMT
Definition logger.h:357
std::shared_ptr< KeyPair > KeyPairPtr
Definition key_pair.h:145
std::mutex Mutex
Definition locking.h:12
Definition app_interface.h:14
NodeMsgType
Definition node_types.h:19
std::shared_ptr< AbstractWriter > WriterPtr
Definition ring_buffer_types.h:150
#define RINGBUFFER_WRITE_MESSAGE(MSG,...)
Definition ring_buffer_types.h:255
char const * data() const
Definition entity_id.h:70
Value & value()
Definition entity_id.h:60