CCF
Loading...
Searching...
No Matches
quic_session.h
Go to the documentation of this file.
1// Copyright (c) Microsoft Corporation. All rights reserved.
2// Licensed under the Apache 2.0 License.
3#pragma once
4
5#include "ccf/ds/logger.h"
6#include "ds/messaging.h"
7#include "ds/pending_io.h"
8#include "ds/ring_buffer.h"
10#include "enclave/session.h"
11#include "udp/msg_types.h"
12
13#include <exception>
14
15namespace quic
16{
17 class QUICSession : public ccf::Session,
18 public std::enable_shared_from_this<QUICSession>
19 {
20 protected:
24
33
35 {
36 return status;
37 }
38
39 protected:
41 using PendingList = std::vector<PendingBuffer>;
44
45 private:
46 // Decrypted data
47 std::vector<uint8_t> read_buffer;
48
49 Status status;
50
51 public:
53 int64_t session_id_, ringbuffer::AbstractWriterFactory& writer_factory_) :
54 to_host(writer_factory_.create_writer_to_outside()),
55 session_id(session_id_),
56 status(handshake)
57 {
60 }
61
63 {
64 // RINGBUFFER_WRITE_MESSAGE(quic::quic_closed, to_host, session_id);
65 }
66
67 std::string hostname()
68 {
69 return {};
70 }
71
72 std::vector<uint8_t> peer_cert()
73 {
74 return {};
75 }
76
77 // Returns count N of bytes read, which will be the first N bytes of data,
78 // up to a maximum of size. If exact is true, will only return either size
79 // or 0 (when size bytes are not currently available). data may be accessed
80 // beyond N during operation, up to size, but only the first N should be
81 // used by caller.
82 size_t read(uint8_t* data, size_t size, sockaddr addr, bool exact = false)
83 {
84 LOG_TRACE_FMT("Requesting up to {} bytes", size);
85
86 // This will return empty if the connection isn't
87 // ready, but it will not block on the handshake.
88 do_handshake();
89
90 if (status != ready)
91 {
92 return 0;
93 }
94
95 // Send pending writes.
96 flush();
97
98 size_t offset = 0;
99
100 if (read_buffer.size() > 0)
101 {
103 "Have existing read_buffer of size: {}", read_buffer.size());
104 offset = std::min(size, read_buffer.size());
105 ::memcpy(data, read_buffer.data(), offset);
106
107 if (offset < read_buffer.size())
108 read_buffer.erase(read_buffer.begin(), read_buffer.begin() + offset);
109 else
110 read_buffer.clear();
111
112 if (offset == size)
113 return size;
114
115 // NB: If we continue past here, read_buffer is empty
116 }
117
118 // This will need to be handled by the actual QUIC stack
119 auto r = handle_recv(data + offset, size - offset, addr);
120 LOG_TRACE_FMT("quic read returned: {}", r);
121
122 if (r < 0)
123 {
124 LOG_TRACE_FMT("QUIC {} error on read", session_id);
125 stop(error);
126 return 0;
127 }
128
129 auto total = r + offset;
130
131 // We read _some_ data but not enough, and didn't get
132 // WANT_READ. Probably hit an internal size limit - try
133 // again
134 if (exact && (total < size))
135 {
137 "Asked for exactly {}, received {}, retrying", size, total);
138 read_buffer.insert(read_buffer.end(), data, data + total);
139 return read(data, size, addr, exact);
140 }
141
142 return total;
143 }
144
145 void recv_buffered(const uint8_t* data, size_t size, sockaddr addr)
146 {
148 {
149 throw std::runtime_error("Called recv_buffered from incorrect thread");
150 }
151 LOG_TRACE_FMT("QUIC Session recv_buffered with {} bytes", size);
152 pending_reads.emplace_back(const_cast<uint8_t*>(data), size, addr);
153 do_handshake();
154 }
155
157 {
158 std::vector<uint8_t> data;
159 std::shared_ptr<QUICSession> self;
160 sockaddr addr;
161 };
162
163 static void send_raw_cb(std::unique_ptr<threading::Tmsg<SendRecvMsg>> msg)
164 {
165 msg->data.self->send_raw_thread(msg->data.data, msg->data.addr);
166 }
167
168 void send_raw(const uint8_t* data, size_t size, sockaddr addr)
169 {
170 auto msg = std::make_unique<threading::Tmsg<SendRecvMsg>>(&send_raw_cb);
171 msg->data.self = this->shared_from_this();
172 msg->data.data = std::vector<uint8_t>(data, data + size);
173 msg->data.addr = addr;
174
176 execution_thread, std::move(msg));
177 }
178
179 void send_raw_thread(const std::vector<uint8_t>& data, sockaddr addr)
180 {
182 {
183 throw std::runtime_error(
184 "Called send_raw_thread from incorrect thread");
185 }
186 // Writes as much of the data as possible. If the data cannot all
187 // be written now, we store the remainder. We
188 // will try to send pending writes again whenever write() is called.
189 do_handshake();
190
191 if (status == handshake)
192 {
193 pending_writes.emplace_back(
194 const_cast<uint8_t*>(data.data()), data.size(), addr);
195 return;
196 }
197
198 if (status != ready)
199 return;
200
201 pending_writes.emplace_back(
202 const_cast<uint8_t*>(data.data()), data.size(), addr);
203
204 flush();
205 }
206
207 void send_buffered(const std::vector<uint8_t>& data, sockaddr addr)
208 {
210 {
211 throw std::runtime_error("Called send_buffered from incorrect thread");
212 }
213
214 pending_writes.emplace_back(
215 const_cast<uint8_t*>(data.data()), data.size(), addr);
216 }
217
218 void flush()
219 {
221 {
222 throw std::runtime_error("Called flush from incorrect thread");
223 }
224
225 do_handshake();
226
227 if (status != ready)
228 return;
229
230 int written = 0;
231 for (auto& write : pending_writes)
232 {
233 LOG_TRACE_FMT("QUIC write_some {} bytes", write.len);
234
235 // This will need to be handled by the actual QUIC stack
236 int rc = handle_send(write.req, write.len, write.addr);
237 if (rc < 0)
238 {
239 LOG_TRACE_FMT("QUIC {} error on flush", session_id);
240 stop(error);
241 return;
242 }
243 written += rc;
244
245 // Mark for deletion (avoiding invalidating iterator)
246 write.clear = true;
247 }
248
249 // Clear all marked for deletion
251 }
252
253 struct EmptyMsg
254 {
255 std::shared_ptr<QUICSession> self;
256 };
257
258 static void close_cb(std::unique_ptr<threading::Tmsg<EmptyMsg>> msg)
259 {
260 msg->data.self->close_thread();
261 }
262
263 void close_session() override
264 {
265 auto msg = std::make_unique<threading::Tmsg<EmptyMsg>>(&close_cb);
266 msg->data.self = this->shared_from_this();
267
269 execution_thread, std::move(msg));
270 }
271
273 {
275 {
276 throw std::runtime_error("Called close_thread from incorrect thread");
277 }
278
279 switch (status)
280 {
281 case handshake:
282 {
283 LOG_TRACE_FMT("QUIC {} closed during handshake", session_id);
284 stop(closed);
285 break;
286 }
287
288 case ready:
289 {
290 LOG_TRACE_FMT("QUIC {} closed", session_id);
291 stop(closed);
292 break;
293 }
294
295 default:
296 {
297 }
298 }
299 }
300
301 private:
302 void do_handshake()
303 {
304 // This should be called when additional data is written to the
305 // input buffer, until the handshake is complete.
306 if (status != handshake)
307 return;
308
309 // This will need to be handled by the actual QUIC stack
310 LOG_TRACE_FMT("QUIC do_handshake unimplemented");
311 status = ready;
312 }
313
314 void stop(Status status_)
315 {
316 switch (status)
317 {
318 case closed:
319 case authfail:
320 case error:
321 return;
322
323 default:
324 {
325 }
326 }
327
328 status = status_;
329 }
330
331 int handle_send(const uint8_t* buf, size_t len, sockaddr addr)
332 {
333 auto [addr_family, addr_data] = udp::sockaddr_encode(addr);
334
335 // Either write all of the data or none of it.
336 auto wrote = RINGBUFFER_TRY_WRITE_MESSAGE(
337 udp::udp_outbound,
338 to_host,
340 addr_family,
341 addr_data,
342 serializer::ByteRange{buf, len});
343
344 if (!wrote)
345 return -1;
346
347 return (int)len;
348 }
349
350 int handle_recv(uint8_t* buf, size_t len, sockaddr addr)
351 {
353 {
354 throw std::runtime_error("Called handle_recv from incorrect thread");
355 }
356
357 size_t len_read = 0;
358 for (auto& read : pending_reads)
359 {
360 // Only handle pending reads that belong to the same address
361 if (!memcmp((void*)&addr, (void*)&read.addr, sizeof(addr)))
362 continue;
363
364 size_t rd = std::min(len, read.len);
365 ::memcpy(buf, read.req, rd);
366 read.clear = true;
367
368 // UDP packets are datagrams, so it's either whole or nothing
369 len_read += rd;
370 if (len_read >= len)
371 break;
372 }
373
374 // Clear all marked for deletion
376
377 if (len_read > 0)
378 return len_read;
379 else
380 return -1;
381 }
382 };
383
384 // This is a wrapper for the QUICSession so we can use in rpc_sessions
385 // Ultimately, this needs to be an HTTP3ServerSession : HTTP3Session :
386 // QUICSession
388 {
389 std::shared_ptr<ccf::RPCMap> rpc_map;
390 std::shared_ptr<ccf::RpcHandler> handler;
391 std::shared_ptr<ccf::SessionContext> session_ctx;
392 int64_t session_id;
393 ccf::ListenInterfaceID interface_id;
394 sockaddr addr;
395
397 void echo()
398 {
400 flush();
401 }
402
403 public:
405 std::shared_ptr<ccf::RPCMap> rpc_map,
406 int64_t session_id,
407 const ccf::ListenInterfaceID& interface_id,
408 ringbuffer::AbstractWriterFactory& writer_factory) :
409 QUICSession(session_id, writer_factory),
410 rpc_map(rpc_map),
411 session_id(session_id),
412 interface_id(interface_id)
413 {}
414
415 void send_data(std::span<const uint8_t> data) override
416 {
417 send_raw(data.data(), data.size(), addr);
418 }
419
420 static void recv_cb(std::unique_ptr<threading::Tmsg<SendRecvMsg>> msg)
421 {
422 reinterpret_cast<QUICEchoSession*>(msg->data.self.get())
423 ->recv_(msg->data.data.data(), msg->data.data.size(), msg->data.addr);
424 }
425
426 void handle_incoming_data(std::span<const uint8_t> data) override
427 {
428 auto [_, addr_family, addr_data, body] =
429 ringbuffer::read_message<udp::udp_inbound>(data);
430
431 auto msg = std::make_unique<threading::Tmsg<SendRecvMsg>>(&recv_cb);
432 msg->data.self = this->shared_from_this();
433 msg->data.data.assign(body.data, body.data + body.size);
434 msg->data.addr = udp::sockaddr_decode(addr_family, addr_data);
435
437 execution_thread, std::move(msg));
438 }
439
440 void recv_(const uint8_t* data_, size_t size_, sockaddr addr_)
441 {
442 recv_buffered(data_, size_, addr_);
443 addr = addr_;
444
445 LOG_TRACE_FMT("recv called with {} bytes", size_);
446
447 // ECHO SERVER
448 echo();
449 }
450 };
451}
Definition session.h:11
Definition quic_session.h:388
void send_data(std::span< const uint8_t > data) override
Definition quic_session.h:415
static void recv_cb(std::unique_ptr< threading::Tmsg< SendRecvMsg > > msg)
Definition quic_session.h:420
void recv_(const uint8_t *data_, size_t size_, sockaddr addr_)
Definition quic_session.h:440
void handle_incoming_data(std::span< const uint8_t > data) override
Definition quic_session.h:426
QUICEchoSession(std::shared_ptr< ccf::RPCMap > rpc_map, int64_t session_id, const ccf::ListenInterfaceID &interface_id, ringbuffer::AbstractWriterFactory &writer_factory)
Definition quic_session.h:404
Definition quic_session.h:19
PendingList pending_reads
Definition quic_session.h:43
~QUICSession()
Definition quic_session.h:62
ringbuffer::WriterPtr to_host
Definition quic_session.h:21
QUICSession(int64_t session_id_, ringbuffer::AbstractWriterFactory &writer_factory_)
Definition quic_session.h:52
PendingList pending_writes
Definition quic_session.h:42
void send_raw_thread(const std::vector< uint8_t > &data, sockaddr addr)
Definition quic_session.h:179
void flush()
Definition quic_session.h:218
void close_session() override
Definition quic_session.h:263
std::vector< PendingBuffer > PendingList
Definition quic_session.h:41
Status get_status() const
Definition quic_session.h:34
static void send_raw_cb(std::unique_ptr< threading::Tmsg< SendRecvMsg > > msg)
Definition quic_session.h:163
Status
Definition quic_session.h:26
@ ready
Definition quic_session.h:28
@ handshake
Definition quic_session.h:27
@ authfail
Definition quic_session.h:30
@ closed
Definition quic_session.h:29
@ error
Definition quic_session.h:31
std::vector< uint8_t > peer_cert()
Definition quic_session.h:72
size_t execution_thread
Definition quic_session.h:23
static void close_cb(std::unique_ptr< threading::Tmsg< EmptyMsg > > msg)
Definition quic_session.h:258
std::string hostname()
Definition quic_session.h:67
void close_thread()
Definition quic_session.h:272
void send_buffered(const std::vector< uint8_t > &data, sockaddr addr)
Definition quic_session.h:207
size_t read(uint8_t *data, size_t size, sockaddr addr, bool exact=false)
Definition quic_session.h:82
void send_raw(const uint8_t *data, size_t size, sockaddr addr)
Definition quic_session.h:168
ccf::tls::ConnID session_id
Definition quic_session.h:22
void recv_buffered(const uint8_t *data, size_t size, sockaddr addr)
Definition quic_session.h:145
Definition ring_buffer_types.h:153
static ThreadMessaging & instance()
Definition thread_messaging.h:283
void add_task(uint16_t tid, std::unique_ptr< Tmsg< Payload > > msg)
Definition thread_messaging.h:318
uint16_t get_execution_thread(uint32_t i)
Definition thread_messaging.h:371
#define LOG_TRACE_FMT
Definition logger.h:356
uint16_t get_current_thread_id()
Definition thread_local.cpp:15
int64_t ConnID
Definition custom_protocol_subsystem_interface.h:20
std::string ListenInterfaceID
Definition rpc_context.h:21
Definition quic_session.h:16
std::shared_ptr< AbstractWriter > WriterPtr
Definition ring_buffer_types.h:150
#define RINGBUFFER_TRY_WRITE_MESSAGE(MSG,...)
Definition ring_buffer_types.h:258
Pending writes on both host and enclave, with data, length and destination address.
Definition pending_io.h:18
static void clear_empty(std::vector< PendingIO< T > > &list)
Clears a list of PendingIO<T> of all elements that were marked to remove (clear flag == true).
Definition pending_io.h:80
Definition quic_session.h:254
std::shared_ptr< QUICSession > self
Definition quic_session.h:255
Definition quic_session.h:157
sockaddr addr
Definition quic_session.h:160
std::shared_ptr< QUICSession > self
Definition quic_session.h:159
std::vector< uint8_t > data
Definition quic_session.h:158
Definition serializer.h:27
Definition thread_messaging.h:27