CCF
Loading...
Searching...
No Matches
quic_session.h
Go to the documentation of this file.
1// Copyright (c) Microsoft Corporation. All rights reserved.
2// Licensed under the Apache 2.0 License.
3#pragma once
4
6#include "ds/messaging.h"
7#include "ds/pending_io.h"
8#include "ds/ring_buffer.h"
9#include "enclave/session.h"
10#include "udp/msg_types.h"
11
12#include <exception>
13
14namespace quic
15{
16 class QUICSession : public ccf::Session,
17 public std::enable_shared_from_this<QUICSession>
18 {
19 protected:
22
23 std::shared_ptr<ccf::tasks::OrderedTasks> task_scheduler;
24
25 enum Status : std::uint8_t
26 {
31 error
32 };
33
35 {
36 return status;
37 }
38
40 using PendingList = std::vector<PendingBuffer>;
43
44 private:
45 // Decrypted data
46 std::vector<uint8_t> read_buffer;
47
48 Status status = handshake;
49
50 public:
52 int64_t session_id_, ringbuffer::AbstractWriterFactory& writer_factory_) :
53 to_host(writer_factory_.create_writer_to_outside()),
54 session_id(session_id_)
55 {
58 fmt::format("Session {}", session_id));
59 }
60
61 ~QUICSession() override
62 {
63 task_scheduler->cancel_task();
64 // RINGBUFFER_WRITE_MESSAGE(quic::quic_closed, to_host, session_id);
65 }
66
// Hostname for this session. This implementation always reports an empty
// string.
std::string hostname()
{
  return std::string{};
}
71
// Peer certificate bytes for this session. This implementation always
// reports an empty buffer.
std::vector<uint8_t> peer_cert()
{
  return std::vector<uint8_t>{};
}
76
77 // Returns count N of bytes read, which will be the first N bytes of data,
78 // up to a maximum of size. If exact is true, will only return either size
79 // or 0 (when size bytes are not currently available). data may be accessed
80 // beyond N during operation, up to size, but only the first N should be
81 // used by caller.
82 size_t read(uint8_t* data, size_t size, sockaddr addr, bool exact = false)
83 {
84 LOG_TRACE_FMT("Requesting up to {} bytes", size);
85
86 // This will return empty if the connection isn't
87 // ready, but it will not block on the handshake.
88 do_handshake();
89
90 if (status != ready)
91 {
92 return 0;
93 }
94
95 // Send pending writes.
96 flush();
97
98 size_t offset = 0;
99
100 if (!read_buffer.empty())
101 {
103 "Have existing read_buffer of size: {}", read_buffer.size());
104 offset = std::min(size, read_buffer.size());
105 ::memcpy(data, read_buffer.data(), offset);
106
107 if (offset < read_buffer.size())
108 {
109 read_buffer.erase(read_buffer.begin(), read_buffer.begin() + offset);
110 }
111 else
112 {
113 read_buffer.clear();
114 }
115
116 if (offset == size)
117 {
118 return size;
119 }
120
121 // NB: If we continue past here, read_buffer is empty
122 }
123
124 // This will need to be handled by the actual QUIC stack
125 auto r = handle_recv(data + offset, size - offset, addr);
126 LOG_TRACE_FMT("quic read returned: {}", r);
127
128 if (r < 0)
129 {
130 LOG_TRACE_FMT("QUIC {} error on read", session_id);
131 stop(error);
132 return 0;
133 }
134
135 auto total = r + offset;
136
137 // We read _some_ data but not enough, and didn't get
138 // WANT_READ. Probably hit an internal size limit - try
139 // again
140 if (exact && (total < size))
141 {
143 "Asked for exactly {}, received {}, retrying", size, total);
144 read_buffer.insert(read_buffer.end(), data, data + total);
145 return read(data, size, addr, exact);
146 }
147
148 return total;
149 }
150
151 void recv_buffered(const uint8_t* data, size_t size, sockaddr addr)
152 {
153 LOG_TRACE_FMT("QUIC Session recv_buffered with {} bytes", size);
154 pending_reads.emplace_back(const_cast<uint8_t*>(data), size, addr);
155 do_handshake();
156 }
157
159 {
160 std::shared_ptr<QUICSession> self;
161 std::vector<uint8_t> data;
162 sockaddr addr{};
163
165 std::shared_ptr<QUICSession> s,
166 std::span<const uint8_t> d,
167 sockaddr sa) :
168 self(std::move(s)),
169 addr(sa)
170 {
171 data.assign(d.begin(), d.end());
172 }
173 };
174
176 {
178
179 void do_action() override
180 {
181 self->send_raw_thread(data, addr);
182 }
183
184 [[nodiscard]] const std::string& get_name() const override
185 {
186 static const std::string name = "quic::SendDataTask";
187 return name;
188 }
189 };
190
192 {
194
195 void do_action() override
196 {
197 self->recv(data.data(), data.size(), addr);
198 }
199
200 [[nodiscard]] const std::string& get_name() const override
201 {
202 static const std::string name = "quic::RecvDataTask";
203 return name;
204 }
205 };
206
207 void send_raw(const uint8_t* data, size_t size, sockaddr addr)
208 {
209 task_scheduler->add_action(std::make_shared<SendDataTask>(
210 shared_from_this(), std::span<const uint8_t>{data, size}, addr));
211 }
212
213 void send_raw_thread(const std::vector<uint8_t>& data, sockaddr addr)
214 {
215 // Writes as much of the data as possible. If the data cannot all
216 // be written now, we store the remainder. We
217 // will try to send pending writes again whenever write() is called.
218 do_handshake();
219
220 if (status == handshake)
221 {
222 pending_writes.emplace_back(
223 const_cast<uint8_t*>(data.data()), data.size(), addr);
224 return;
225 }
226
227 if (status != ready)
228 {
229 return;
230 }
231
232 pending_writes.emplace_back(
233 const_cast<uint8_t*>(data.data()), data.size(), addr);
234
235 flush();
236 }
237
238 void send_buffered(const std::vector<uint8_t>& data, sockaddr addr)
239 {
240 pending_writes.emplace_back(
241 const_cast<uint8_t*>(data.data()), data.size(), addr);
242 }
243
244 void handle_incoming_data(std::span<const uint8_t> data) override
245 {
246 auto [_, addr_family, addr_data, body] =
247 ringbuffer::read_message<udp::udp_inbound>(data);
248
249 task_scheduler->add_action(std::make_shared<RecvDataTask>(
250 shared_from_this(),
251 body,
252 udp::sockaddr_decode(addr_family, addr_data)));
253 }
254
255 virtual void recv(const uint8_t* data_, size_t size_, sockaddr addr_) = 0;
256
257 void flush()
258 {
259 do_handshake();
260
261 if (status != ready)
262 {
263 return;
264 }
265
266 for (auto& write : pending_writes)
267 {
268 LOG_TRACE_FMT("QUIC write_some {} bytes", write.len);
269
270 // This will need to be handled by the actual QUIC stack
271 int rc = handle_send(write.req, write.len, write.addr);
272 if (rc < 0)
273 {
274 LOG_TRACE_FMT("QUIC {} error on flush", session_id);
275 stop(error);
276 return;
277 }
278
279 // Mark for deletion (avoiding invalidating iterator)
280 write.clear = true;
281 }
282
283 // Clear all marked for deletion
285 }
286
287 void close_session() override
288 {
289 auto self = shared_from_this();
290 task_scheduler->add_action(
291 ccf::tasks::make_basic_action([self]() { self->close_thread(); }));
292 }
293
295 {
296 switch (status)
297 {
298 case handshake:
299 {
300 LOG_TRACE_FMT("QUIC {} closed during handshake", session_id);
301 stop(closed);
302 break;
303 }
304
305 case ready:
306 {
307 LOG_TRACE_FMT("QUIC {} closed", session_id);
308 stop(closed);
309 break;
310 }
311
312 default:
313 {
314 }
315 }
316 }
317
318 private:
319 void do_handshake()
320 {
321 // This should be called when additional data is written to the
322 // input buffer, until the handshake is complete.
323 if (status != handshake)
324 {
325 return;
326 }
327
328 // This will need to be handled by the actual QUIC stack
329 LOG_TRACE_FMT("QUIC do_handshake unimplemented");
330 status = ready;
331 }
332
333 void stop(Status status_)
334 {
335 switch (status)
336 {
337 case closed:
338 case authfail:
339 case error:
340 return;
341
342 default:
343 {
344 }
345 }
346
347 status = status_;
348 }
349
350 int handle_send(const uint8_t* buf, size_t len, sockaddr addr)
351 {
352 auto [addr_family, addr_data] = udp::sockaddr_encode(addr);
353
354 // Either write all of the data or none of it.
355 auto wrote = RINGBUFFER_TRY_WRITE_MESSAGE(
356 udp::udp_outbound,
357 to_host,
359 addr_family,
360 addr_data,
361 serializer::ByteRange{buf, len});
362
363 if (!wrote)
364 {
365 return -1;
366 }
367
368 return (int)len;
369 }
370
371 int handle_recv(uint8_t* buf, size_t len, sockaddr addr)
372 {
373 size_t len_read = 0;
374 for (auto& read : pending_reads)
375 {
376 // Only handle pending reads that belong to the same address
377 if (memcmp((void*)&addr, (void*)&read.addr, sizeof(addr)) != 0)
378 {
379 continue;
380 }
381
382 size_t rd = std::min(len, read.len);
383 ::memcpy(buf, read.req, rd);
384 read.clear = true;
385
386 // UDP packets are datagrams, so it's either whole or nothing
387 len_read += rd;
388 if (len_read >= len)
389 {
390 break;
391 }
392 }
393
394 // Clear all marked for deletion
396
397 if (len_read > 0)
398 {
399 return len_read;
400 }
401 return -1;
402 }
403 };
404
405 // This is a wrapper around QUICSession so that it can be used in
406 // rpc_sessions. Ultimately, this needs to be an HTTP3ServerSession :
407 // HTTP3Session : QUICSession
409 {
410 std::shared_ptr<ccf::RPCMap> rpc_map;
411 std::shared_ptr<ccf::RpcHandler> handler;
412 std::shared_ptr<ccf::SessionContext> session_ctx;
413 int64_t session_id;
414 ccf::ListenInterfaceID interface_id;
415 sockaddr addr;
416
418 void echo()
419 {
421 flush();
422 }
423
424 public:
426 std::shared_ptr<ccf::RPCMap> rpc_map_,
427 int64_t session_id_,
428 ccf::ListenInterfaceID interface_id_,
429 ringbuffer::AbstractWriterFactory& writer_factory) :
430 QUICSession(session_id_, writer_factory),
431 rpc_map(std::move(rpc_map_)),
432 session_id(session_id_),
433 interface_id(std::move(interface_id_)),
434 addr{}
435 {}
436
437 void send_data(std::vector<uint8_t>&& data) override
438 {
439 send_raw(data.data(), data.size(), addr);
440 }
441
442 void recv(const uint8_t* data_, size_t size_, sockaddr addr_) override
443 {
444 recv_buffered(data_, size_, addr_);
445 addr = addr_;
446
447 LOG_TRACE_FMT("recv called with {} bytes", size_);
448
449 // ECHO SERVER
450 echo();
451 }
452 };
453}
Definition session.h:11
static std::shared_ptr< OrderedTasks > create(JobBoard &job_board_, const std::string &name_="[Ordered]")
Definition ordered_tasks.cpp:82
Definition quic_session.h:409
void recv(const uint8_t *data_, size_t size_, sockaddr addr_) override
Definition quic_session.h:442
QUICEchoSession(std::shared_ptr< ccf::RPCMap > rpc_map_, int64_t session_id_, ccf::ListenInterfaceID interface_id_, ringbuffer::AbstractWriterFactory &writer_factory)
Definition quic_session.h:425
void send_data(std::vector< uint8_t > &&data) override
Definition quic_session.h:437
Definition quic_session.h:18
PendingList pending_reads
Definition quic_session.h:42
ringbuffer::WriterPtr to_host
Definition quic_session.h:20
QUICSession(int64_t session_id_, ringbuffer::AbstractWriterFactory &writer_factory_)
Definition quic_session.h:51
PendingList pending_writes
Definition quic_session.h:41
std::shared_ptr< ccf::tasks::OrderedTasks > task_scheduler
Definition quic_session.h:23
void send_raw_thread(const std::vector< uint8_t > &data, sockaddr addr)
Definition quic_session.h:213
void flush()
Definition quic_session.h:257
void close_session() override
Definition quic_session.h:287
void handle_incoming_data(std::span< const uint8_t > data) override
Definition quic_session.h:244
std::vector< PendingBuffer > PendingList
Definition quic_session.h:40
Status get_status() const
Definition quic_session.h:34
virtual void recv(const uint8_t *data_, size_t size_, sockaddr addr_)=0
std::vector< uint8_t > peer_cert()
Definition quic_session.h:72
Status
Definition quic_session.h:26
@ ready
Definition quic_session.h:28
@ handshake
Definition quic_session.h:27
@ authfail
Definition quic_session.h:30
@ closed
Definition quic_session.h:29
@ error
Definition quic_session.h:31
~QUICSession() override
Definition quic_session.h:61
std::string hostname()
Definition quic_session.h:67
void close_thread()
Definition quic_session.h:294
void send_buffered(const std::vector< uint8_t > &data, sockaddr addr)
Definition quic_session.h:238
size_t read(uint8_t *data, size_t size, sockaddr addr, bool exact=false)
Definition quic_session.h:82
void send_raw(const uint8_t *data, size_t size, sockaddr addr)
Definition quic_session.h:207
ccf::tls::ConnID session_id
Definition quic_session.h:21
void recv_buffered(const uint8_t *data, size_t size, sockaddr addr)
Definition quic_session.h:151
Definition ring_buffer_types.h:157
#define LOG_TRACE_FMT
Definition internal_logger.h:13
TaskAction make_basic_action(Ts &&... ts)
Definition ordered_tasks.h:47
JobBoard & get_main_job_board()
Definition task_system.cpp:53
int64_t ConnID
Definition custom_protocol_subsystem_interface.h:20
std::string ListenInterfaceID
Definition rpc_context.h:21
Definition quic_session.h:15
std::shared_ptr< AbstractWriter > WriterPtr
Definition ring_buffer_types.h:154
STL namespace.
#define RINGBUFFER_TRY_WRITE_MESSAGE(MSG,...)
Definition ring_buffer_types.h:262
Pending writes on both host and enclave, with data, length and destination address.
Definition pending_io.h:18
static void clear_empty(std::vector< PendingIO< T > > &list)
Clears a list of PendingIO<T> of all elements that were marked to remove (clear flag == true).
Definition pending_io.h:79
Definition ordered_tasks.h:13
Definition quic_session.h:192
const std::string & get_name() const override
Definition quic_session.h:200
void do_action() override
Definition quic_session.h:195
Definition quic_session.h:176
void do_action() override
Definition quic_session.h:179
const std::string & get_name() const override
Definition quic_session.h:184
Definition quic_session.h:159
SessionDataTask(std::shared_ptr< QUICSession > s, std::span< const uint8_t > d, sockaddr sa)
Definition quic_session.h:164
sockaddr addr
Definition quic_session.h:162
std::shared_ptr< QUICSession > self
Definition quic_session.h:160
std::vector< uint8_t > data
Definition quic_session.h:161
Definition serializer.h:27