17 public std::enable_shared_from_this<ThreadedSession>
20 std::shared_ptr<ccf::tasks::OrderedTasks> task_scheduler;
21 std::atomic<bool> is_closing =
false;
25 std::vector<uint8_t> data;
26 std::shared_ptr<ThreadedSession> self;
29 std::span<const uint8_t> d, std::shared_ptr<ThreadedSession> s) :
32 data.assign(d.begin(), d.end());
36 struct HandleIncomingDataTask :
public SessionDataTask
38 using SessionDataTask::SessionDataTask;
40 void do_action()
override
42 if (self->is_closing.load())
47 self->handle_incoming_data_thread(std::move(data));
50 [[nodiscard]]
const std::string& get_name()
const override
52 static const std::string name =
53 "ThreadedSession::HandleIncomingDataTask";
58 struct SendDataTask :
public SessionDataTask
60 using SessionDataTask::SessionDataTask;
62 void do_action()
override
64 self->send_data_thread(std::move(data));
67 [[nodiscard]]
const std::string& get_name()
const override
69 static const std::string name =
"ThreadedSession::SendDataTask";
79 fmt::format(
"Session {}", session_id));
84 task_scheduler->cancel_task();
91 auto [_, body] = ringbuffer::read_message<::tcp::tcp_inbound>(data);
93 task_scheduler->add_action(
94 std::make_shared<HandleIncomingDataTask>(body, shared_from_this()));
103 task_scheduler->add_action(
104 std::make_shared<SendDataTask>(std::move(data), shared_from_this()));
111 is_closing.store(
true);
114 [self = shared_from_this()]() { self->close_session_thread(); }));
123 virtual bool parse(std::span<const uint8_t> data) = 0;
132 std::unique_ptr<ccf::tls::Context> ctx) :
135 session_id_, writer_factory,
std::move(ctx))),
142 tls_io->send_data(data.data(), data.size());
147 tls_io->recv_buffered(data.data(), data.size());
155 constexpr auto min_read_block_size = 4096;
156 if (data.size() < min_read_block_size)
158 data.resize(min_read_block_size);
161 auto n_read =
tls_io->read(data.data(), data.size(),
false);
172 bool cont =
parse({data.data(), n_read});
179 n_read =
tls_io->read(data.data(), data.size(),
false);
192 virtual bool parse(std::span<const uint8_t> data) = 0;
203 to_host(writer_factory_.create_writer_to_outside())
EncryptedSession(::tcp::ConnID session_id_, ringbuffer::AbstractWriterFactory &writer_factory, std::unique_ptr< ccf::tls::Context > ctx)
Definition session.h:129
::tcp::ConnID session_id
Definition session.h:127
void send_data_thread(std::vector< uint8_t > &&data) override
Definition session.h:140
void handle_incoming_data_thread(std::vector< uint8_t > &&data) override
Definition session.h:145
void close_session_thread() override
Definition session.h:183
std::shared_ptr< ccf::TLSSession > tls_io
Definition session.h:126
virtual bool parse(std::span< const uint8_t > data)=0
Definition tls_session.h:27
void handle_incoming_data(std::span< const uint8_t > data) override
Definition session.h:89
virtual void handle_incoming_data_thread(std::vector< uint8_t > &&data)=0
void close_session() override
Definition session.h:109
~ThreadedSession() override
Definition session.h:82
virtual void close_session_thread()=0
void send_data(std::vector< uint8_t > &&data) override
Definition session.h:101
ThreadedSession(int64_t session_id)
Definition session.h:75
virtual void send_data_thread(std::vector< uint8_t > &&data)=0
UnencryptedSession(::tcp::ConnID session_id_, ringbuffer::AbstractWriterFactory &writer_factory_)
Definition session.h:198
::tcp::ConnID session_id
Definition session.h:195
void handle_incoming_data_thread(std::vector< uint8_t > &&data) override
Definition session.h:221
void send_data_thread(std::vector< uint8_t > &&data) override
Definition session.h:206
virtual bool parse(std::span< const uint8_t > data)=0
ringbuffer::WriterPtr to_host
Definition session.h:196
void close_session_thread() override
Definition session.h:215
static std::shared_ptr< OrderedTasks > create(JobBoard &job_board_, const std::string &name_="[Ordered]")
Definition ordered_tasks.cpp:82
Definition ring_buffer_types.h:157
#define LOG_TRACE_FMT
Definition internal_logger.h:13
TaskAction make_basic_action(Ts &&... ts)
Definition ordered_tasks.h:47
JobBoard & get_main_job_board()
Definition task_system.cpp:53
Definition app_interface.h:14
std::shared_ptr< AbstractWriter > WriterPtr
Definition ring_buffer_types.h:154
int64_t ConnID
Definition msg_types.h:9
#define RINGBUFFER_WRITE_MESSAGE(MSG,...)
Definition ring_buffer_types.h:259
Definition ordered_tasks.h:13
Definition serializer.h:27
const uint8_t * data
Definition serializer.h:28