CCF
Loading...
Searching...
No Matches
session.h
Go to the documentation of this file.
1// Copyright (c) Microsoft Corporation. All rights reserved.
2// Licensed under the Apache 2.0 License.
3#pragma once
4
5#include "ccf/node/session.h"
8#include "tcp/msg_types.h"
9
10#include <span>
11
12namespace ccf
13{
14 class ThreadedSession : public Session,
15 public std::enable_shared_from_this<ThreadedSession>
16 {
17 private:
18 size_t execution_thread;
19
20 struct SendRecvMsg
21 {
22 std::vector<uint8_t> data;
23 std::shared_ptr<ThreadedSession> self;
24 };
25
26 public:
27 ThreadedSession(int64_t thread_affinity)
28 {
29 execution_thread =
31 thread_affinity);
32 }
33
34 // Implement Session::handle_incoming_data by dispatching a thread message
35 // that eventually invokes the virtual handle_incoming_data_thread()
36 void handle_incoming_data(std::span<const uint8_t> data) override
37 {
38 auto [_, body] = ringbuffer::read_message<::tcp::tcp_inbound>(data);
39
40 auto msg = std::make_unique<::threading::Tmsg<SendRecvMsg>>(
42 msg->data.self = this->shared_from_this();
43 msg->data.data.assign(body.data, body.data + body.size);
44
46 execution_thread, std::move(msg));
47 }
48
50 std::unique_ptr<::threading::Tmsg<SendRecvMsg>> msg)
51 {
52 msg->data.self->handle_incoming_data_thread(std::move(msg->data.data));
53 }
54
55 virtual void handle_incoming_data_thread(std::vector<uint8_t>&& data) = 0;
56
57 // Implement Session::sent_data by dispatching a thread message
58 // that eventually invokes the virtual send_data_thread()
59 void send_data(std::span<const uint8_t> data) override
60 {
61 auto msg =
62 std::make_unique<::threading::Tmsg<SendRecvMsg>>(&send_data_cb);
63 msg->data.self = this->shared_from_this();
64 msg->data.data.assign(data.begin(), data.end());
65
67 execution_thread, std::move(msg));
68 }
69
70 static void send_data_cb(
71 std::unique_ptr<::threading::Tmsg<SendRecvMsg>> msg)
72 {
73 msg->data.self->send_data_thread(std::move(msg->data.data));
74 }
75
76 virtual void send_data_thread(std::vector<uint8_t>&& data) = 0;
77 };
78
80 {
81 public:
82 virtual bool parse(std::span<const uint8_t> data) = 0;
83
84 protected:
85 std::shared_ptr<ccf::TLSSession> tls_io;
87
89 ::tcp::ConnID session_id_,
91 std::unique_ptr<ccf::tls::Context> ctx) :
92 ThreadedSession(session_id_),
93 tls_io(std::make_shared<ccf::TLSSession>(
94 session_id_, writer_factory, std::move(ctx))),
95 session_id(session_id_)
96 {}
97
98 public:
99 void send_data(std::span<const uint8_t> data) override
100 {
101 // Override send_data rather than send_data_thread, as the TLSSession
102 // handles dispatching for thread affinity
103 tls_io->send_raw(data.data(), data.size());
104 }
105
106 void send_data_thread(std::vector<uint8_t>&& data) override
107 {
108 throw std::logic_error("Unimplemented");
109 }
110
111 void close_session() override
112 {
113 tls_io->close();
114 }
115
116 void handle_incoming_data_thread(std::vector<uint8_t>&& data) override
117 {
118 tls_io->recv_buffered(data.data(), data.size());
119
120 LOG_TRACE_FMT("recv called with {} bytes", data.size());
121
122 // Try to parse all incoming data, reusing the vector we were just passed
123 // for storage. Increase the size if the received vector was too small
124 // (for the case where this chunk is very small, but we had some previous
125 // data to continue reading).
126 constexpr auto min_read_block_size = 4096;
127 if (data.size() < min_read_block_size)
128 {
129 data.resize(min_read_block_size);
130 }
131
132 auto n_read = tls_io->read(data.data(), data.size(), false);
133
134 while (true)
135 {
136 if (n_read == 0)
137 {
138 return;
139 }
140
141 LOG_TRACE_FMT("Going to parse {} bytes", n_read);
142
143 bool cont = parse({data.data(), n_read});
144 if (!cont)
145 {
146 return;
147 }
148
149 // Used all provided bytes - check if more are available
150 n_read = tls_io->read(data.data(), data.size(), false);
151 }
152 }
153 };
154
156 {
157 public:
158 virtual bool parse(std::span<const uint8_t> data) = 0;
159
160 protected:
163
165 ::tcp::ConnID session_id_,
166 ringbuffer::AbstractWriterFactory& writer_factory_) :
167 ccf::ThreadedSession(session_id_),
168 session_id(session_id_),
169 to_host(writer_factory_.create_writer_to_outside())
170 {}
171
172 void send_data_thread(std::vector<uint8_t>&& data) override
173 {
175 ::tcp::tcp_outbound,
176 to_host,
178 serializer::ByteRange{data.data(), data.size()});
179 }
180
181 void close_session() override
182 {
184 ::tcp::tcp_stop, to_host, session_id, std::string("Session closed"));
185 }
186
187 void handle_incoming_data_thread(std::vector<uint8_t>&& data) override
188 {
189 parse(data);
190 }
191 };
192}
Definition session.h:80
void send_data(std::span< const uint8_t > data) override
Definition session.h:99
EncryptedSession(::tcp::ConnID session_id_, ringbuffer::AbstractWriterFactory &writer_factory, std::unique_ptr< ccf::tls::Context > ctx)
Definition session.h:88
void close_session() override
Definition session.h:111
::tcp::ConnID session_id
Definition session.h:86
void send_data_thread(std::vector< uint8_t > &&data) override
Definition session.h:106
void handle_incoming_data_thread(std::vector< uint8_t > &&data) override
Definition session.h:116
std::shared_ptr< ccf::TLSSession > tls_io
Definition session.h:85
virtual bool parse(std::span< const uint8_t > data)=0
Definition session.h:11
Definition tls_session.h:28
Definition session.h:16
void handle_incoming_data(std::span< const uint8_t > data) override
Definition session.h:36
virtual void handle_incoming_data_thread(std::vector< uint8_t > &&data)=0
ThreadedSession(int64_t thread_affinity)
Definition session.h:27
void send_data(std::span< const uint8_t > data) override
Definition session.h:59
static void send_data_cb(std::unique_ptr<::threading::Tmsg< SendRecvMsg > > msg)
Definition session.h:70
static void handle_incoming_data_cb(std::unique_ptr<::threading::Tmsg< SendRecvMsg > > msg)
Definition session.h:49
virtual void send_data_thread(std::vector< uint8_t > &&data)=0
Definition session.h:156
UnencryptedSession(::tcp::ConnID session_id_, ringbuffer::AbstractWriterFactory &writer_factory_)
Definition session.h:164
::tcp::ConnID session_id
Definition session.h:161
void handle_incoming_data_thread(std::vector< uint8_t > &&data) override
Definition session.h:187
void send_data_thread(std::vector< uint8_t > &&data) override
Definition session.h:172
virtual bool parse(std::span< const uint8_t > data)=0
ringbuffer::WriterPtr to_host
Definition session.h:162
void close_session() override
Definition session.h:181
Definition ring_buffer_types.h:153
static ThreadMessaging & instance()
Definition thread_messaging.h:283
void add_task(uint16_t tid, std::unique_ptr< Tmsg< Payload > > msg)
Definition thread_messaging.h:318
uint16_t get_execution_thread(uint32_t i)
Definition thread_messaging.h:371
#define LOG_TRACE_FMT
Definition logger.h:356
Definition app_interface.h:14
std::shared_ptr< AbstractWriter > WriterPtr
Definition ring_buffer_types.h:150
STL namespace.
int64_t ConnID
Definition msg_types.h:9
#define RINGBUFFER_WRITE_MESSAGE(MSG,...)
Definition ring_buffer_types.h:255
Definition serializer.h:27
const uint8_t * data
Definition serializer.h:28
Definition thread_messaging.h:27