CCF
Loading...
Searching...
No Matches
session.h
Go to the documentation of this file.
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the Apache 2.0 License.
3#pragma once
4
5#include "ccf/node/session.h"
8#include "tasks/task.h"
9#include "tasks/task_system.h"
10#include "tcp/msg_types.h"
11
12#include <span>
13
14namespace ccf
15{
16 class ThreadedSession : public Session,
17 public std::enable_shared_from_this<ThreadedSession>
18 {
19 private:
20 std::shared_ptr<ccf::tasks::OrderedTasks> task_scheduler;
21 std::atomic<bool> is_closing = false;
22
23 struct SessionDataTask : public ccf::tasks::ITaskAction
24 {
25 std::vector<uint8_t> data;
26 std::shared_ptr<ThreadedSession> self;
27
28 SessionDataTask(
29 std::span<const uint8_t> d, std::shared_ptr<ThreadedSession> s) :
30 self(std::move(s))
31 {
32 data.assign(d.begin(), d.end());
33 }
34 };
35
36 struct HandleIncomingDataTask : public SessionDataTask
37 {
38 using SessionDataTask::SessionDataTask;
39
40 void do_action() override
41 {
42 if (self->is_closing.load())
43 {
44 return;
45 }
46
47 self->handle_incoming_data_thread(std::move(data));
48 }
49
50 [[nodiscard]] const std::string& get_name() const override
51 {
52 static const std::string name =
53 "ThreadedSession::HandleIncomingDataTask";
54 return name;
55 }
56 };
57
58 struct SendDataTask : public SessionDataTask
59 {
60 using SessionDataTask::SessionDataTask;
61
62 void do_action() override
63 {
64 self->send_data_thread(std::move(data));
65 }
66
67 [[nodiscard]] const std::string& get_name() const override
68 {
69 static const std::string name = "ThreadedSession::SendDataTask";
70 return name;
71 }
72 };
73
74 public:
75 ThreadedSession(int64_t session_id)
76 {
77 task_scheduler = ccf::tasks::OrderedTasks::create(
79 fmt::format("Session {}", session_id));
80 }
81
83 {
84 task_scheduler->cancel_task();
85 }
86
87 // Implement Session::handle_incoming_data by dispatching a thread message
88 // that eventually invokes the virtual handle_incoming_data_thread()
89 void handle_incoming_data(std::span<const uint8_t> data) override
90 {
91 auto [_, body] = ringbuffer::read_message<::tcp::tcp_inbound>(data);
92
93 task_scheduler->add_action(
94 std::make_shared<HandleIncomingDataTask>(body, shared_from_this()));
95 }
96
97 virtual void handle_incoming_data_thread(std::vector<uint8_t>&& data) = 0;
98
99 // Implement Session::sent_data by dispatching a thread message
100 // that eventually invokes the virtual send_data_thread()
101 void send_data(std::vector<uint8_t>&& data) override
102 {
103 task_scheduler->add_action(
104 std::make_shared<SendDataTask>(std::move(data), shared_from_this()));
105 }
106
107 virtual void send_data_thread(std::vector<uint8_t>&& data) = 0;
108
109 void close_session() override
110 {
111 is_closing.store(true);
112
113 task_scheduler->add_action(ccf::tasks::make_basic_action(
114 [self = shared_from_this()]() { self->close_session_thread(); }));
115 }
116
117 virtual void close_session_thread() = 0;
118 };
119
121 {
122 public:
123 virtual bool parse(std::span<const uint8_t> data) = 0;
124
125 protected:
126 std::shared_ptr<ccf::TLSSession> tls_io;
128
130 ::tcp::ConnID session_id_,
131 ringbuffer::AbstractWriterFactory& writer_factory,
132 std::unique_ptr<ccf::tls::Context> ctx) :
133 ThreadedSession(session_id_),
134 tls_io(std::make_shared<ccf::TLSSession>(
135 session_id_, writer_factory, std::move(ctx))),
136 session_id(session_id_)
137 {}
138
139 public:
140 void send_data_thread(std::vector<uint8_t>&& data) override
141 {
142 tls_io->send_data(data.data(), data.size());
143 }
144
145 void handle_incoming_data_thread(std::vector<uint8_t>&& data) override
146 {
147 tls_io->recv_buffered(data.data(), data.size());
148
149 LOG_TRACE_FMT("recv called with {} bytes", data.size());
150
151 // Try to parse all incoming data, reusing the vector we were just passed
152 // for storage. Increase the size if the received vector was too small
153 // (for the case where this chunk is very small, but we had some previous
154 // data to continue reading).
155 constexpr auto min_read_block_size = 4096;
156 if (data.size() < min_read_block_size)
157 {
158 data.resize(min_read_block_size);
159 }
160
161 auto n_read = tls_io->read(data.data(), data.size(), false);
162
163 while (true)
164 {
165 if (n_read == 0)
166 {
167 return;
168 }
169
170 LOG_TRACE_FMT("Going to parse {} bytes", n_read);
171
172 bool cont = parse({data.data(), n_read});
173 if (!cont)
174 {
175 return;
176 }
177
178 // Used all provided bytes - check if more are available
179 n_read = tls_io->read(data.data(), data.size(), false);
180 }
181 }
182
183 void close_session_thread() override
184 {
185 tls_io->close();
186 }
187 };
188
190 {
191 public:
192 virtual bool parse(std::span<const uint8_t> data) = 0;
193
194 protected:
197
199 ::tcp::ConnID session_id_,
200 ringbuffer::AbstractWriterFactory& writer_factory_) :
201 ccf::ThreadedSession(session_id_),
202 session_id(session_id_),
203 to_host(writer_factory_.create_writer_to_outside())
204 {}
205
206 void send_data_thread(std::vector<uint8_t>&& data) override
207 {
209 ::tcp::tcp_outbound,
210 to_host,
212 serializer::ByteRange{data.data(), data.size()});
213 }
214
215 void close_session_thread() override
216 {
218 ::tcp::tcp_stop, to_host, session_id, std::string("Session closed"));
219 }
220
221 void handle_incoming_data_thread(std::vector<uint8_t>&& data) override
222 {
223 parse(data);
224 }
225 };
226}
Definition session.h:121
EncryptedSession(::tcp::ConnID session_id_, ringbuffer::AbstractWriterFactory &writer_factory, std::unique_ptr< ccf::tls::Context > ctx)
Definition session.h:129
::tcp::ConnID session_id
Definition session.h:127
void send_data_thread(std::vector< uint8_t > &&data) override
Definition session.h:140
void handle_incoming_data_thread(std::vector< uint8_t > &&data) override
Definition session.h:145
void close_session_thread() override
Definition session.h:183
std::shared_ptr< ccf::TLSSession > tls_io
Definition session.h:126
virtual bool parse(std::span< const uint8_t > data)=0
Definition session.h:11
Definition tls_session.h:27
Definition session.h:18
void handle_incoming_data(std::span< const uint8_t > data) override
Definition session.h:89
virtual void handle_incoming_data_thread(std::vector< uint8_t > &&data)=0
void close_session() override
Definition session.h:109
~ThreadedSession() override
Definition session.h:82
virtual void close_session_thread()=0
void send_data(std::vector< uint8_t > &&data) override
Definition session.h:101
ThreadedSession(int64_t session_id)
Definition session.h:75
virtual void send_data_thread(std::vector< uint8_t > &&data)=0
Definition session.h:190
UnencryptedSession(::tcp::ConnID session_id_, ringbuffer::AbstractWriterFactory &writer_factory_)
Definition session.h:198
::tcp::ConnID session_id
Definition session.h:195
void handle_incoming_data_thread(std::vector< uint8_t > &&data) override
Definition session.h:221
void send_data_thread(std::vector< uint8_t > &&data) override
Definition session.h:206
virtual bool parse(std::span< const uint8_t > data)=0
ringbuffer::WriterPtr to_host
Definition session.h:196
void close_session_thread() override
Definition session.h:215
static std::shared_ptr< OrderedTasks > create(JobBoard &job_board_, const std::string &name_="[Ordered]")
Definition ordered_tasks.cpp:82
Definition ring_buffer_types.h:157
#define LOG_TRACE_FMT
Definition internal_logger.h:13
TaskAction make_basic_action(Ts &&... ts)
Definition ordered_tasks.h:47
JobBoard & get_main_job_board()
Definition task_system.cpp:53
Definition app_interface.h:14
std::shared_ptr< AbstractWriter > WriterPtr
Definition ring_buffer_types.h:154
STL namespace.
int64_t ConnID
Definition msg_types.h:9
#define RINGBUFFER_WRITE_MESSAGE(MSG,...)
Definition ring_buffer_types.h:259
Definition ordered_tasks.h:13
Definition serializer.h:27
const uint8_t * data
Definition serializer.h:28