CCF
Loading...
Searching...
No Matches
channels.h
Go to the documentation of this file.
1// Copyright (c) Microsoft Corporation. All rights reserved.
2// Licensed under the Apache 2.0 License.
3#pragma once
4
7#include "ccf/crypto/hkdf.h"
11#include "ccf/ds/hex.h"
12#include "ccf/entity_id.h"
13#include "ccf/pal/locking.h"
14#include "crypto/key_exchange.h"
15#include "ds/internal_logger.h"
16#include "ds/serialized.h"
17#include "ds/state_machine.h"
18#include "node/node_types.h"
19
20#include <iostream>
21#include <map>
22#include <openssl/crypto.h>
23
24// -Wpedantic flags token pasting of __VA_ARGS__
25#pragma clang diagnostic push
26#pragma clang diagnostic ignored "-Wgnu-zero-variadic-macro-arguments"
27
28#define CHANNEL_RECV_TRACE(s, ...) \
29 LOG_TRACE_FMT("<- {} ({}): " s, peer_id, status.value(), ##__VA_ARGS__)
30#define CHANNEL_SEND_TRACE(s, ...) \
31 LOG_TRACE_FMT("-> {} ({}): " s, peer_id, status.value(), ##__VA_ARGS__)
32
33#define CHANNEL_RECV_FAIL(s, ...) \
34 LOG_FAIL_FMT("<- {} ({}): " s, peer_id, status.value(), ##__VA_ARGS__)
35#define CHANNEL_SEND_FAIL(s, ...) \
36 LOG_FAIL_FMT("-> {} ({}): " s, peer_id, status.value(), ##__VA_ARGS__)
37
38namespace ccf
39{
47}
48
49FMT_BEGIN_NAMESPACE
50template <>
51struct formatter<ccf::ChannelStatus>
52{
53 template <typename ParseContext>
54 constexpr auto parse(ParseContext& ctx)
55 {
56 return ctx.begin();
57 }
58
59 template <typename FormatContext>
60 auto format(const ccf::ChannelStatus& cs, FormatContext& ctx) const
61 {
62 char const* s = "Unknown";
63 switch (cs)
64 {
65 case (ccf::INACTIVE):
66 {
67 s = "INACTIVE";
68 break;
69 }
70 case (ccf::INITIATED):
71 {
72 s = "INITIATED";
73 break;
74 }
76 {
77 s = "WAITING_FOR_FINAL";
78 break;
79 }
80 case (ccf::ESTABLISHED):
81 {
82 s = "ESTABLISHED";
83 break;
84 }
85 }
86
87 return format_to(ctx.out(), "{}", s);
88 }
89};
90FMT_END_NAMESPACE
91
92namespace ccf
93{
94 using MsgNonce = uint64_t;
96
97 // Receive nonces were previously stored per-thread. For backwards
98 // compatibility (live communication with nodes still using this format), we
99 // maintain this serialization struct, but with thread ID (tid) always set to
100 // 0
102 {
103 const uint8_t tid = 0;
104 uint64_t nonce : (sizeof(MsgNonce) - sizeof(tid)) * CHAR_BIT;
105
106 WireNonce(uint64_t nonce_) : nonce(nonce_) {}
107
108 [[nodiscard]] uint64_t get_val() const
109 {
110 return *reinterpret_cast<const uint64_t*>(this);
111 }
112 };
113 static_assert(
114 sizeof(WireNonce) == sizeof(MsgNonce), "WireNonce is the wrong size");
115
116 // Static helper functions for serialization/deserialization
117 inline WireNonce get_wire_nonce(const GcmHdr& header)
118 {
119 return *reinterpret_cast<const WireNonce*>(header.iv.data());
120 }
121
122 template <typename T>
123 inline void append_value(std::vector<uint8_t>& target, const T& t)
124 {
125 const auto size_before = target.size();
126 auto size = sizeof(t);
127 target.resize(size_before + size);
128 auto* data = target.data() + size_before;
129 serialized::write(data, size, t);
130 }
131
132 inline void append_buffer(
133 std::vector<uint8_t>& target, std::span<const uint8_t> src)
134 {
135 const auto size_before = target.size();
136 auto size = src.size() + sizeof(src.size());
137 target.resize(size_before + size);
138 auto* data = target.data() + size_before;
139 serialized::write(data, size, src.size());
140 serialized::write(data, size, src.data(), src.size());
141 }
142
143 // Key exchange states are:
144 // - Have nothing
145 // - Initiated (have my own share)
146 // - Have their share and my share (received init, or received response)
147 // - => Have shared secret
148 // - Know that THEY have shared secret (received response or final)
149 // As soon as we have both shares, we update our send key
150 // As soon as we know that they have shared secret, we update our recv key
151 // Note this assumes that the key exchange messages are reliably delivered,
152 // else we switch keys without telling the peer that we did.
153
155 {
156 public:
157 static std::chrono::system_clock::duration&
159 {
160 static std::chrono::system_clock::duration value =
161 std::chrono::seconds(2);
162 return value;
163 }
164
165 private:
166 struct OutgoingMsg
167 {
168 NodeMsgType type;
169 std::vector<uint8_t> raw_aad; // To be integrity-protected
170 std::vector<uint8_t> raw_plain; // To be encrypted
171
172 OutgoingMsg(
173 NodeMsgType msg_type,
174 std::span<const uint8_t> raw_aad_,
175 std::span<const uint8_t> raw_plain_) :
176 type(msg_type),
177 raw_aad(raw_aad_.begin(), raw_aad_.end()),
178 raw_plain(raw_plain_.begin(), raw_plain_.end())
179 {}
180 };
181
182 ccf::pal::Mutex lock;
183
184 NodeId self;
185 const ccf::crypto::Pem& service_cert;
187 const ccf::crypto::Pem& node_cert;
189 ccf::crypto::Pem peer_cert;
190
191 ringbuffer::WriterPtr to_host;
192 NodeId peer_id;
193
194 // Used for key exchange
197 std::chrono::system_clock::time_point last_initiation_time;
198 static constexpr size_t salt_len = 32;
199 static constexpr size_t shared_key_size = 32;
200 std::vector<uint8_t> hkdf_salt;
201 size_t message_limit;
202
203 // Used for AES GCM authentication/encryption
204 std::unique_ptr<ccf::crypto::KeyAesGcm> recv_key = nullptr;
205 std::unique_ptr<ccf::crypto::KeyAesGcm> send_key = nullptr;
206
207 // Incremented for each tagged/encrypted message
208 std::atomic<MsgNonce> send_nonce{1};
209
210 // Used to buffer at most one message sent on the channel before it is
211 // established
212 std::optional<OutgoingMsg> outgoing_consensus_msg;
213
214 // Used to buffer a small number of messages sent on the channel before it
215 // is established. If this queue fills, then additional send attempts while
216 // the channel is still being established will not be buffered, and the
217 // caller should react appropriately.
218 static constexpr size_t outgoing_forwarding_queue_size = 10;
219 std::vector<OutgoingMsg> outgoing_forwarding_msgs;
220
221 // Used to prevent replayed messages.
222 // Set to the latest successfully received nonce.
223 MsgNonce local_recv_nonce = {0};
224
225 void check_message_limit()
226 {
227 // At half message limit, trigger a new key exchange.
228 // At hard message limit, drop existing keys, ensuring no further
229 // communication until fresh keys have been exchanged
230 const auto lower_limit = message_limit / 2;
231 size_t num_messages = send_nonce + local_recv_nonce;
232 if (num_messages >= lower_limit && status.check(ESTABLISHED))
233 {
235 "Reached message limit ({}+{} >= {}), triggering new key exchange",
236 send_nonce,
237 local_recv_nonce,
238 lower_limit);
239 reset_key_exchange();
240 initiate();
241 }
242 else if (num_messages >= message_limit)
243 {
245 "Reached hard message limit ({}+{} >= {}), dropping previous keys",
246 send_nonce,
247 local_recv_nonce,
248 message_limit);
249
250 send_key = nullptr;
251 send_nonce = 0;
252 recv_key = nullptr;
253 local_recv_nonce = 0;
254 reset_key_exchange();
255 initiate();
256 }
257 }
258
259 bool decrypt(
260 const GcmHdr& header,
261 std::span<const uint8_t> aad,
262 std::span<const uint8_t> cipher,
263 std::vector<uint8_t>& plain)
264 {
265 if (recv_key == nullptr)
266 {
267 throw std::logic_error("Tried to decrypt, but have no receive key");
268 }
269
270 auto wire_nonce = get_wire_nonce(header);
271 auto recv_nonce = wire_nonce.nonce;
272
274 "decrypt({} bytes, {} bytes) (nonce={})",
275 aad.size(),
276 cipher.size(),
277 recv_nonce);
278
279 // Note: We must assume that some messages are dropped, i.e. we may not
280 // see every nonce/sequence number, but they must be increasing.
281
282 if (recv_nonce <= local_recv_nonce)
283 {
284 // If the nonce received has already been processed, return
285 // See https://github.com/microsoft/CCF/issues/2492 for more details on
286 // how this can happen around election time
288 "Received past nonce, received:{}, "
289 "last_seen:{}",
290 recv_nonce,
291 local_recv_nonce);
292 return false;
293 }
294
295 auto ret =
296 recv_key->decrypt(header.get_iv(), header.tag, cipher, aad, plain);
297 if (ret)
298 {
299 // Set local recv nonce to received nonce only if verification is
300 // successful
301 local_recv_nonce = recv_nonce;
302 }
303
304 check_message_limit();
305
306 return ret;
307 }
308
309 bool verify(const GcmHdr& header, std::span<const uint8_t> aad)
310 {
311 std::vector<uint8_t> empty_plaintext;
312 return decrypt(header, aad, {}, empty_plaintext);
313 }
314
315 void send_key_exchange_init()
316 {
317 std::vector<uint8_t> payload;
318 {
321 append_buffer(payload, kex_ctx.get_own_key_share());
322 auto signature = node_kp->sign(kex_ctx.get_own_key_share());
323 append_buffer(payload, signature);
325 payload,
326 std::span<const uint8_t>(node_cert.data(), node_cert.size()));
327 append_buffer(payload, hkdf_salt);
328 }
329
331 "send_key_exchange_init: node serial: {}",
332 make_verifier(node_cert)->serial_number());
333
335 node_outbound,
336 to_host,
337 peer_id.value(),
339 self.value(),
340 payload);
341 }
342
343 void send_key_exchange_response()
344 {
345 std::vector<uint8_t> signature;
346 {
347 auto to_sign = kex_ctx.get_own_key_share();
348 const auto& peer_ks = kex_ctx.get_peer_key_share();
349 to_sign.insert(to_sign.end(), peer_ks.begin(), peer_ks.end());
350 signature = node_kp->sign(to_sign);
351 }
352
353 std::vector<uint8_t> payload;
354 {
357 append_buffer(payload, kex_ctx.get_own_key_share());
358 append_buffer(payload, signature);
360 payload,
361 std::span<const uint8_t>(node_cert.data(), node_cert.size()));
362 }
363
365 "send_key_exchange_response: oks={}, serialised_signed_share={}",
366 ds::to_hex(kex_ctx.get_own_key_share()),
367 ds::to_hex(payload));
368
370 node_outbound,
371 to_host,
372 peer_id.value(),
374 self.value(),
375 payload);
376 }
377
378 void send_key_exchange_final()
379 {
380 std::vector<uint8_t> payload;
381 {
383 // append_value(payload, protocol_version); // Not sent by
384 // current protocol!
385 auto signature = node_kp->sign(kex_ctx.get_peer_key_share());
386 append_buffer(payload, signature);
387 }
388
390 "key_exchange_final: ks={}, serialised_signed_key_share={}",
391 ds::to_hex(kex_ctx.get_peer_key_share()),
392 ds::to_hex(payload));
393
395 node_outbound,
396 to_host,
397 peer_id.value(),
399 self.value(),
400 payload);
401 }
402
403 void advance_connection_attempt()
404 {
405 if (status.check(INACTIVE))
406 {
407 // We have no key and believe no key exchange is in process - start a
408 // new iteration of the key exchange protocol
409 initiate();
410 }
411 else if (status.check(INITIATED))
412 {
413 const auto time_since_initiated =
414 decltype(last_initiation_time)::clock::now() - last_initiation_time;
415 if (time_since_initiated >= min_gap_between_initiation_attempts())
416 {
417 // If this node attempts to initiate too early when the peer node
418 // starts up, they will never receive the init message (they drop it
419 // if it arrives too early in their state machine). The same state
420 // could also occur later, if the initiate message is lost in transit.
421 // So sometimes this node needs to re-initiate. However, if this node
422 // sends too fast before the channel is established, and each send
423 // generates a new handshake, it may constantly generate new handshake
424 // attempts and never succeed. Additionally, when talking to peers
425 // using the old channel behaviour, this node should try to avoid
426 // confusing them by sending multiple adjacent initiate requests -
427 // they will only process the first one they receive. To avoid these
428 // problems with initiation spam, we have a minimum delay between
429 // initiation attempts. This should be low enough to get reasonable
430 // liveness (re-attempt connections in the presence of dropped
431 // messages), but high enough to give successful roundtrips a chance
432 // to complete.
433 initiate();
434 }
435 }
436 }
437
438 bool recv_key_exchange_init(
439 const uint8_t* data, size_t size, bool they_have_priority = false)
440 {
442 "recv_key_exchange_init({} bytes, {})", size, they_have_priority);
443
444 // Parse fields from incoming message
445 auto peer_version = serialized::read<size_t>(data, size);
446 if (peer_version != protocol_version)
447 {
449 "Protocol version mismatch (node={}, peer={})",
451 peer_version);
452 return false;
453 }
454
455 auto ks = extract_span(data, size);
456 if (ks.empty())
457 {
458 CHANNEL_RECV_FAIL("Empty keyshare");
459 return false;
460 }
461
462 auto sig = extract_span(data, size);
463 if (sig.empty())
464 {
465 CHANNEL_RECV_FAIL("Empty signature");
466 return false;
467 }
468
469 auto pc = extract_span(data, size);
470 if (pc.empty())
471 {
472 CHANNEL_RECV_FAIL("Empty cert");
473 return false;
474 }
475
476 auto salt = extract_span(data, size);
477 if (salt.empty())
478 {
479 CHANNEL_RECV_FAIL("Empty salt");
480 return false;
481 }
482
483 if (size != 0)
484 {
485 CHANNEL_RECV_FAIL("{} exccess bytes remaining", size);
486 return false;
487 }
488
489 // Validate cert and signature in message
490 ccf::crypto::Pem cert;
492 if (!verify_peer_certificate(pc, cert, verifier))
493 {
495 "Peer certificate verification failed - recv_key_exchange_init "
496 "failed to verify peer cert:\n{}\nUsing trusted service "
497 "certificate:\n{}",
498 cert.str(),
499 service_cert.str());
500 return false;
501 }
502
503 if (!verify_peer_signature(ks, sig, verifier))
504 {
505 return false;
506 }
507
508 // Both nodes tried to initiate the channel, the one with priority
509 // wins.
510 if (status.check(INITIATED) && !they_have_priority)
511 {
512 CHANNEL_RECV_TRACE("Ignoring lower priority key init");
513 return true;
514 }
515
516 // Whatever else we _were_ doing, we've received a valid init from them
517 // - reset to use it
518 if (status.check(ESTABLISHED))
519 {
520 kex_ctx.reset();
521 }
522 peer_cert = cert;
523 peer_cv = verifier;
524
526 "recv_key_exchange_init: version={} ks={} sig={} pc={} salt={}",
527 peer_version,
528 ds::to_hex(ks),
529 ds::to_hex(sig),
530 ds::to_hex(pc),
531 ds::to_hex(salt));
532
533 hkdf_salt = {salt.data(), salt.data() + salt.size()};
534
535 kex_ctx.load_peer_key_share(ks);
536
537 update_send_key();
538
540
541 // We are the responder and we return a signature over both public key
542 // shares back to the initiator
543 send_key_exchange_response();
544
545 flush_pending_outgoing();
546
547 return true;
548 }
549
550 bool recv_key_exchange_response(const uint8_t* data, size_t size)
551 {
552 CHANNEL_RECV_TRACE("recv_key_exchange_response({} bytes)", size);
553
554 if (status.value() != INITIATED)
555 {
556 CHANNEL_RECV_FAIL("Ignoring key exchange response - not expecting it");
557 return false;
558 }
559
560 // Parse fields from incoming message
561 auto peer_version = serialized::read<size_t>(data, size);
562 if (peer_version != protocol_version)
563 {
565 "Protocol version mismatch (node={}, peer={})",
567 peer_version);
568 return false;
569 }
570
571 auto ks = extract_span(data, size);
572 if (ks.empty())
573 {
574 CHANNEL_RECV_FAIL("Empty keyshare");
575 return false;
576 }
577
578 auto sig = extract_span(data, size);
579 if (sig.empty())
580 {
581 CHANNEL_RECV_FAIL("Empty signature");
582 return false;
583 }
584
585 auto pc = extract_span(data, size);
586 if (pc.empty())
587 {
588 CHANNEL_RECV_FAIL("Empty cert");
589 return false;
590 }
591
592 if (size != 0)
593 {
594 CHANNEL_RECV_FAIL("{} exccess bytes remaining", size);
595 return false;
596 }
597
598 // Validate cert and signature in message
599 ccf::crypto::Pem cert;
601 if (!verify_peer_certificate(pc, cert, verifier))
602 {
604 "Peer certificate verification failed - recv_key_exchange_response "
605 "failed to verify peer cert:\n{}\nUsing trusted service "
606 "certificate:\n{}",
607 cert.str(),
608 service_cert.str());
609 return false;
610 }
611
612 {
613 // We are the initiator and expect a signature over both key shares
614 std::vector<uint8_t> signed_msg(ks.begin(), ks.end());
615 const auto& oks = kex_ctx.get_own_key_share();
616 signed_msg.insert(signed_msg.end(), oks.begin(), oks.end());
617
618 if (!verify_peer_signature(signed_msg, sig, verifier))
619 {
620 // This isn't a valid signature for this key exchange attempt.
622 "Peer certificate verification failed - recv_key_exchange_response "
623 "failed to verify signature from cert:\n{}",
624 cert.str());
625 return false;
626 }
627 }
628
629 peer_cert = cert;
630 peer_cv = verifier;
631
632 kex_ctx.load_peer_key_share(ks);
633
634 update_send_key();
635
636 send_key_exchange_final();
637
638 flush_pending_outgoing();
639
640 update_recv_key();
641
642 establish();
643
644 return true;
645 }
646
647 bool recv_key_exchange_final(const uint8_t* data, size_t size)
648 {
649 CHANNEL_RECV_TRACE("recv_key_exchange_final({} bytes)", size);
650
651 if (status.value() != WAITING_FOR_FINAL)
652 {
653 CHANNEL_RECV_FAIL("Ignoring key exchange final - not expecting it");
654 return false;
655 }
656
657 // Parse fields from incoming message
658 // size_t peer_version = serialized::read<size_t>(data, size);
659 // if (peer_version != protocol_version)
660 // {
661 // CHANNEL_RECV_FAIL(
662 // "Protocol version mismatch (node={}, peer={})",
663 // protocol_version,
664 // peer_version);
665 // return false;
666 // }
667
668 auto sig = extract_span(data, size);
669 if (sig.empty())
670 {
671 CHANNEL_RECV_FAIL("Empty signature");
672 return false;
673 }
674
675 if (!verify_peer_signature(kex_ctx.get_own_key_share(), sig, peer_cv))
676 {
678 "Peer certificate verification failed - recv_key_exchange_final "
679 "failed to verify signature from peer with serial number {}",
680 peer_cv->serial_number());
681 return false;
682 }
683
684 update_recv_key();
685
686 establish();
687
688 return true;
689 }
690
691 std::span<const uint8_t> extract_span(
692 const uint8_t*& data, size_t& size) const
693 {
694 if (size == 0)
695 {
696 return {};
697 }
698
699 auto sz = serialized::read<size_t>(data, size);
700 const uint8_t* data_start = data;
701
702 if (sz > size)
703 {
705 "Buffer header wants {} bytes, but only {} remain", sz, size);
706 return {};
707 }
708
709 data += sz;
710 size -= sz;
711
712 return {data_start, sz};
713 }
714
715 bool verify_peer_certificate(
716 std::span<const uint8_t> pc,
717 ccf::crypto::Pem& cert,
718 ccf::crypto::VerifierPtr& verifier)
719 {
720 if (!pc.empty())
721 {
722 cert = ccf::crypto::Pem(pc);
723 verifier = ccf::crypto::make_verifier(cert);
724
725 // 'true' is `ignore_time` => These node-to-node channels do not care
726 // about certificate times, and should still pass even when given
727 // expired certs
728 if (!verifier->verify_certificate(
729 {&service_cert}, {}, true /* no validity expiration check */))
730 {
731 return false;
732 }
733
735 "New peer certificate: {}\n{}",
736 verifier->serial_number(),
737 cert.str());
738
739 return true;
740 }
741
742 return false;
743 }
744
745 bool verify_peer_signature(
746 std::span<const uint8_t> msg,
747 std::span<const uint8_t> sig,
749 {
751 "Verifying peer signature with peer certificate serial {}",
752 verifier ? verifier->serial_number() : "no peer_cv!");
753
754 return verifier && verifier->verify(msg, sig);
755 }
756
757 void update_send_key()
758 {
759 const std::string label_to = self.value() + peer_id.value();
760 const std::span<const uint8_t> label(
761 reinterpret_cast<const uint8_t*>(label_to.c_str()), label_to.size());
762 const auto key_bytes = ccf::crypto::hkdf(
764 shared_key_size,
765 kex_ctx.get_shared_secret(),
766 hkdf_salt,
767 label);
768 send_key = ccf::crypto::make_key_aes_gcm(key_bytes);
769
770 send_nonce = 1;
771 }
772
773 void update_recv_key()
774 {
775 const std::string label_from = peer_id.value() + self.value();
776 const std::span<const uint8_t> label(
777 reinterpret_cast<const uint8_t*>(label_from.c_str()),
778 label_from.size());
779 const auto key_bytes = ccf::crypto::hkdf(
781 shared_key_size,
782 kex_ctx.get_shared_secret(),
783 hkdf_salt,
784 label);
785 recv_key = ccf::crypto::make_key_aes_gcm(key_bytes);
786
787 local_recv_nonce = 0;
788 }
789
790 void establish()
791 {
792 status.advance(ESTABLISHED);
793 LOG_INFO_FMT("Node channel with {} is now established.", peer_id);
794
795 kex_ctx.reset();
796
797 auto node_cv = make_verifier(node_cert);
799 "Node certificate serial numbers: node={} peer={}",
800 node_cv->serial_number(),
801 peer_cv->serial_number());
802 }
803
804 void flush_pending_outgoing()
805 {
806 if (outgoing_consensus_msg.has_value())
807 {
808 send_unsafe(
809 outgoing_consensus_msg->type,
810 outgoing_consensus_msg->raw_aad,
811 outgoing_consensus_msg->raw_plain);
812 outgoing_consensus_msg.reset();
813 }
814
815 for (auto& outgoing_msg : outgoing_forwarding_msgs)
816 {
817 send_unsafe(
818 outgoing_msg.type, outgoing_msg.raw_aad, outgoing_msg.raw_plain);
819 CHANNEL_SEND_TRACE("Flushing previously queued forwarding message");
820 }
821 outgoing_forwarding_msgs.clear();
822 }
823
824 void initiate()
825 {
826 LOG_INFO_FMT("Initiating node channel with {}.", peer_id);
827
828 // Begin with new key exchange
829 kex_ctx.reset();
830 peer_cert = {};
831 peer_cv.reset();
832
833 auto e = ccf::crypto::get_entropy();
834 hkdf_salt = e->random(salt_len);
835
836 // As a future simplification, we would like this to always be true
837 // (initiations must travel through reset/inactive), but it is not
838 // currently true
839 // status.expect(INACTIVE);
840 status.advance(INITIATED);
841
842 last_initiation_time = decltype(last_initiation_time)::clock::now();
843
844 send_key_exchange_init();
845 }
846
847 void reset_key_exchange()
848 {
849 LOG_INFO_FMT("Resetting channel with {}", peer_id);
850
851 status.advance(INACTIVE);
852 kex_ctx.reset();
853 peer_cert = {};
854 peer_cv.reset();
855
856 auto e = ccf::crypto::get_entropy();
857 hkdf_salt = e->random(salt_len);
858 }
859
860 bool send_unsafe(
861 NodeMsgType type,
862 std::span<const uint8_t> aad,
863 std::span<const uint8_t> plain)
864 {
865 if (send_key == nullptr)
866 {
867 advance_connection_attempt();
868 switch (type)
869 {
871 {
872 if (outgoing_consensus_msg.has_value())
873 {
875 "Dropping outgoing consensus message - replaced by new "
876 "consensus message");
877 }
878 outgoing_consensus_msg = OutgoingMsg(type, aad, plain);
879 return true;
880 }
881
883 {
884 if (
885 outgoing_forwarding_msgs.size() < outgoing_forwarding_queue_size)
886 {
887 outgoing_forwarding_msgs.emplace_back(type, aad, plain);
889 "Queueing outgoing forwarding message - the is the {}/{} "
890 "buffered message",
891 outgoing_forwarding_msgs.size(),
892 outgoing_forwarding_queue_size);
893 return true;
894 }
895
897 "Unable to queue outgoing forwarding message - already queued "
898 "maximum {} messages",
899 outgoing_forwarding_queue_size);
900 return false;
901 }
902
903 default:
904 {
906 "Unhandled message type {} on unestablished channel - ignoring",
907 type);
908 return false;
909 }
910 }
911 }
912
913 auto nonce = send_nonce.fetch_add(1);
914 WireNonce wire_nonce(nonce);
915
917 "send({}, {} bytes, {} bytes) (nonce={})",
918 (size_t)type,
919 aad.size(),
920 plain.size(),
921 nonce);
922
923 GcmHdr gcm_hdr;
924 gcm_hdr.set_iv(
925 reinterpret_cast<const uint8_t*>(&wire_nonce), sizeof(wire_nonce));
926
927 std::vector<uint8_t> cipher;
928 assert(send_key);
929 send_key->encrypt(gcm_hdr.get_iv(), plain, aad, cipher, gcm_hdr.tag);
930
931 const auto gcm_hdr_serialised = gcm_hdr.serialise();
932
933 // Payload is concatenation of 3 things:
934 // 1) aad
935 // 2) gcm header
936 // 3) ciphertext
937 // NB: None of these are length-prefixed, so it is assumed that the
938 // receiver knows the fixed size of the aad and gcm header
939 const serializer::ByteRange payload[] = {
940 {aad.data(), static_cast<size_t>(aad.size())},
941 {gcm_hdr_serialised.data(),
942 static_cast<size_t>(gcm_hdr_serialised.size())},
943 {cipher.data(), static_cast<size_t>(cipher.size())}};
944
946 node_outbound, to_host, peer_id.value(), type, self.value(), payload);
947
948 check_message_limit();
949
950 return true;
951 }
952
953 public:
954 static constexpr size_t protocol_version = 1;
955
957 ringbuffer::AbstractWriterFactory& writer_factory,
958 const ccf::crypto::Pem& service_cert_,
960 const ccf::crypto::Pem& node_cert_,
961 NodeId self_,
962 NodeId peer_id_,
963 size_t message_limit_) :
964 self(std::move(self_)),
965 service_cert(service_cert_),
966 node_kp(std::move(node_kp_)),
967 node_cert(node_cert_),
968 to_host(writer_factory.create_writer_to_outside()),
969 peer_id(std::move(peer_id_)),
970 status(fmt::format("Channel to {}", peer_id), INACTIVE),
971 message_limit(message_limit_)
972 {
973 auto e = ccf::crypto::get_entropy();
974 hkdf_salt = e->random(salt_len);
975 }
976
978 {
979 std::lock_guard<ccf::pal::Mutex> guard(lock);
980 return recv_key != nullptr && send_key != nullptr;
981 }
982
983 // Protocol overview:
984 //
985 // initiate()
986 // > key_exchange_init message
987 // recv_key_exchange_init() [by responder]
988 // < key_exchange_response message
989 // recv_key_exchange_response() [by initiator]
990 // > key_exchange_final message
991 // recv_key_exchange_final() [by responder]
992 // both reach status == ESTABLISHED
993
994 bool send(
995 NodeMsgType type,
996 std::span<const uint8_t> aad,
997 std::span<const uint8_t> plain = {})
998 {
999 std::lock_guard<ccf::pal::Mutex> guard(lock);
1000
1001 return send_unsafe(type, aad, plain);
1002 }
1003
1005 std::span<const uint8_t> aad, const uint8_t*& data, size_t& size)
1006 {
1007 std::lock_guard<ccf::pal::Mutex> guard(lock);
1008
1009 // Receive authenticated message, modifying data to point to the start of
1010 // the non-authenticated plaintext payload
1011 if (recv_key == nullptr)
1012 {
1014 "Node channel with {} cannot receive authenticated message: not "
1015 "established a receive key, status={}",
1016 peer_id,
1017 status.value());
1018 advance_connection_attempt();
1019 return false;
1020 }
1021
1022 GcmHdr hdr;
1023 hdr.deserialise(data, size);
1024
1025 if (!verify(hdr, aad))
1026 {
1027 CHANNEL_RECV_FAIL("Failed to verify node");
1028 return false;
1029 }
1030
1031 return true;
1032 }
1033
1034 bool recv_authenticated_with_load(const uint8_t*& data, size_t& size)
1035 {
1036 std::lock_guard<ccf::pal::Mutex> guard(lock);
1037
1038 // Receive authenticated message, modifying data to point to the start of
1039 // the non-authenticated plaintext payload. data contains payload first,
1040 // then GCM header
1041
1042 if (recv_key == nullptr)
1043 {
1045 "Node channel with {} cannot receive authenticated message with "
1046 "payload: not established a receive key, status={}",
1047 peer_id,
1048 status.value());
1049 advance_connection_attempt();
1050 return false;
1051 }
1052
1053 const uint8_t* data_ = data;
1054 size_t size_ = size;
1055
1056 GcmHdr hdr;
1057 serialized::skip(data_, size_, (size_ - GcmHdr::serialised_size()));
1058 hdr.deserialise(data_, size_);
1059 size -= GcmHdr::serialised_size();
1060
1061 if (!verify(hdr, std::span<const uint8_t>(data, size)))
1062 {
1063 CHANNEL_RECV_FAIL("Failed to verify node message with payload");
1064 return false;
1065 }
1066
1067 return true;
1068 }
1069
1070 std::optional<std::vector<uint8_t>> recv_encrypted(
1071 std::span<const uint8_t> aad, const uint8_t*& data, size_t& size)
1072 {
1073 std::lock_guard<ccf::pal::Mutex> guard(lock);
1074
1075 // Receive encrypted message, returning the decrypted payload
1076 if (recv_key == nullptr)
1077 {
1079 "Node channel with {} cannot receive encrypted message: not "
1080 "established a receive key, status={}",
1081 peer_id,
1082 status.value());
1083 advance_connection_attempt();
1084 return std::nullopt;
1085 }
1086
1087 GcmHdr hdr;
1088 hdr.deserialise(data, size);
1089
1090 std::vector<uint8_t> plain;
1091 if (!decrypt(hdr, aad, std::span<const uint8_t>(data, size), plain))
1092 {
1093 CHANNEL_RECV_FAIL("Failed to decrypt node message");
1094 return std::nullopt;
1095 }
1096
1097 return plain;
1098 }
1099
1101 {
1102 std::lock_guard<ccf::pal::Mutex> guard(lock);
1103
1104 RINGBUFFER_WRITE_MESSAGE(close_node_outbound, to_host, peer_id.value());
1105 reset_key_exchange();
1106 outgoing_consensus_msg.reset();
1107
1108 recv_key.reset();
1109 send_key.reset();
1110 }
1111
1112 bool recv_key_exchange_message(const uint8_t* data, size_t size)
1113 {
1114 std::lock_guard<ccf::pal::Mutex> guard(lock);
1115
1116 try
1117 {
1118 auto chmsg = serialized::read<ChannelMsg>(data, size);
1119 switch (chmsg)
1120 {
1121 case key_exchange_init:
1122 {
1123 // In the case of concurrent key_exchange_init's from both nodes,
1124 // the one with the lower ID wins.
1125 return recv_key_exchange_init(data, size, self < peer_id);
1126 }
1127
1129 {
1130 return recv_key_exchange_response(data, size);
1131 }
1132
1133 case key_exchange_final:
1134 {
1135 return recv_key_exchange_final(data, size);
1136 }
1137
1138 default:
1139 {
1140 throw std::runtime_error(fmt::format(
1141 "Received message with initial bytes {} from {} - not recognised "
1142 "as a key exchange message",
1143 chmsg,
1144 peer_id));
1145 }
1146 }
1147 }
1148 catch (const std::exception& e)
1149 {
1150 LOG_FAIL_FMT("Exception in {}", __PRETTY_FUNCTION__);
1151 LOG_DEBUG_FMT("Error: {}", e.what());
1152 return false;
1153 }
1154 }
1155 };
1156}
1157
// The CHANNEL_* logging helpers expand to uses of `peer_id` and `status`, so
// they are only meaningful inside this header. Undefine all four of them so
// that none leaks into files which include it.
#undef CHANNEL_RECV_TRACE
#undef CHANNEL_SEND_TRACE
#undef CHANNEL_RECV_FAIL
#undef CHANNEL_SEND_FAIL
1161
1162#pragma clang diagnostic pop
#define CHANNEL_SEND_TRACE(s,...)
Definition channels.h:30
#define CHANNEL_RECV_TRACE(s,...)
Definition channels.h:28
#define CHANNEL_SEND_FAIL(s,...)
Definition channels.h:35
#define CHANNEL_RECV_FAIL(s,...)
Definition channels.h:33
Definition channels.h:155
std::optional< std::vector< uint8_t > > recv_encrypted(std::span< const uint8_t > aad, const uint8_t *&data, size_t &size)
Definition channels.h:1070
static std::chrono::system_clock::duration & min_gap_between_initiation_attempts()
Definition channels.h:158
bool channel_open()
Definition channels.h:977
bool recv_authenticated(std::span< const uint8_t > aad, const uint8_t *&data, size_t &size)
Definition channels.h:1004
bool recv_authenticated_with_load(const uint8_t *&data, size_t &size)
Definition channels.h:1034
bool recv_key_exchange_message(const uint8_t *data, size_t size)
Definition channels.h:1112
bool send(NodeMsgType type, std::span< const uint8_t > aad, std::span< const uint8_t > plain={})
Definition channels.h:994
void close_channel()
Definition channels.h:1100
Channel(ringbuffer::AbstractWriterFactory &writer_factory, const ccf::crypto::Pem &service_cert_, ccf::crypto::ECKeyPairPtr node_kp_, const ccf::crypto::Pem &node_cert_, NodeId self_, NodeId peer_id_, size_t message_limit_)
Definition channels.h:956
static constexpr size_t protocol_version
Definition channels.h:954
Definition pem.h:18
size_t size() const
Definition pem.h:61
const std::string & str() const
Definition pem.h:46
uint8_t * data()
Definition pem.h:51
Definition state_machine.h:15
T value() const
Definition state_machine.h:40
void advance(T state_)
Definition state_machine.h:45
bool check(T state_) const
Definition state_machine.h:35
Definition ring_buffer_types.h:157
Definition key_exchange.h:20
std::vector< uint8_t > get_peer_key_share() const
Definition key_exchange.h:64
void load_peer_key_share(std::span< const uint8_t > ks)
Definition key_exchange.h:84
const std::vector< uint8_t > & get_shared_secret()
Definition key_exchange.h:106
std::vector< uint8_t > get_own_key_share()
Definition key_exchange.h:48
void reset()
Definition key_exchange.h:76
#define LOG_INFO_FMT
Definition internal_logger.h:15
#define LOG_DEBUG_FMT
Definition internal_logger.h:14
#define LOG_FAIL_FMT
Definition internal_logger.h:16
std::unique_ptr< KeyAesGcm > make_key_aes_gcm(std::span< const uint8_t > rawKey)
Free function implementation.
Definition symmetric_key.cpp:102
std::vector< uint8_t > hkdf(MDType md_type, size_t length, const std::span< const uint8_t > &ikm, const std::span< const uint8_t > &salt={}, const std::span< const uint8_t > &info={})
Definition hash.cpp:51
EntropyPtr get_entropy()
Definition entropy.cpp:10
std::shared_ptr< Verifier > VerifierPtr
Definition verifier.h:167
VerifierPtr make_verifier(const std::vector< uint8_t > &cert)
Definition verifier.cpp:18
std::shared_ptr< ECKeyPair > ECKeyPairPtr
Definition ec_key_pair.h:144
std::mutex Mutex
Definition locking.h:12
Definition app_interface.h:14
EntityId< NodeIdFormatter > NodeId
Definition entity_id.h:164
ChannelStatus
Definition channels.h:41
@ WAITING_FOR_FINAL
Definition channels.h:44
@ ESTABLISHED
Definition channels.h:45
@ INITIATED
Definition channels.h:43
@ INACTIVE
Definition channels.h:42
uint64_t MsgNonce
Definition channels.h:94
@ key_exchange_init
Definition node_types.h:33
@ key_exchange_final
Definition node_types.h:35
@ key_exchange_response
Definition node_types.h:34
void append_buffer(std::vector< uint8_t > &target, std::span< const uint8_t > src)
Definition channels.h:132
WireNonce get_wire_nonce(const GcmHdr &header)
Definition channels.h:117
ccf::crypto::FixedSizeGcmHeader< sizeof(MsgNonce)> GcmHdr
Definition channels.h:95
void append_value(std::vector< uint8_t > &target, const T &t)
Definition channels.h:123
NodeMsgType
Definition node_types.h:21
@ channel_msg
Definition node_types.h:22
@ consensus_msg
Definition node_types.h:23
@ forwarded_msg
Definition node_types.h:24
std::shared_ptr< AbstractWriter > WriterPtr
Definition ring_buffer_types.h:154
void write(uint8_t *&data, size_t &size, const T &v)
Definition serialized.h:105
void skip(const uint8_t *&data, size_t &size, size_t skip)
Definition serialized.h:165
STL namespace.
#define RINGBUFFER_WRITE_MESSAGE(MSG,...)
Definition ring_buffer_types.h:259
Definition channels.h:102
uint64_t nonce
Definition channels.h:104
WireNonce(uint64_t nonce_)
Definition channels.h:106
uint64_t get_val() const
Definition channels.h:108
const uint8_t tid
Definition channels.h:103
Definition symmetric_key.h:37
static size_t serialised_size()
Definition symmetric_key.h:42
std::vector< uint8_t > iv
Definition symmetric_key.h:21
void deserialise(const std::vector< uint8_t > &ser)
Definition symmetric_key.cpp:56
constexpr auto parse(ParseContext &ctx)
Definition channels.h:54
auto format(const ccf::ChannelStatus &cs, FormatContext &ctx) const
Definition channels.h:60
Definition serializer.h:27
const uint8_t * data
Definition serializer.h:28