CCF
Loading...
Searching...
No Matches
quic_session.h
Go to the documentation of this file.
1// Copyright (c) Microsoft Corporation. All rights reserved.
2// Licensed under the Apache 2.0 License.
3#pragma once
4
6#include "ds/messaging.h"
7#include "ds/pending_io.h"
8#include "ds/ring_buffer.h"
9#include "enclave/session.h"
10#include "udp/msg_types.h"
11
12#include <exception>
13
14namespace quic
15{
16 class QUICSession : public ccf::Session,
17 public std::enable_shared_from_this<QUICSession>
18 {
19 protected:
22
23 std::shared_ptr<ccf::tasks::OrderedTasks> task_scheduler;
24
// Lifecycle state of a session. A session starts in `handshake`; `closed`,
// `authfail` and `error` are terminal (stop() refuses to leave them).
enum Status : std::uint8_t
{
  handshake,
  ready,
  closed,
  authfail,
  error
};
33
35 {
36 return status;
37 }
38
40 using PendingList = std::vector<PendingBuffer>;
43
44 private:
45 // Decrypted data
46 std::vector<uint8_t> read_buffer;
47
48 Status status = handshake;
49
50 public:
52 int64_t session_id_, ringbuffer::AbstractWriterFactory& writer_factory_) :
53 to_host(writer_factory_.create_writer_to_outside()),
54 session_id(session_id_)
55 {
58 fmt::format("Session {}", session_id));
59 }
60
61 ~QUICSession() override
62 {
63 task_scheduler->cancel_task();
64 // RINGBUFFER_WRITE_MESSAGE(quic::quic_closed, to_host, session_id);
65 }
66
// Placeholder: always returns an empty hostname.
std::string hostname()
{
  return {};
}
71
// Placeholder: always returns an empty certificate buffer.
std::vector<uint8_t> peer_cert()
{
  return {};
}
76
77 // Returns count N of bytes read, which will be the first N bytes of data,
78 // up to a maximum of size. If exact is true, will only return either size
79 // or 0 (when size bytes are not currently available). data may be accessed
80 // beyond N during operation, up to size, but only the first N should be
81 // used by caller.
82 size_t read(uint8_t* data, size_t size, sockaddr addr, bool exact = false)
83 {
84 LOG_TRACE_FMT("Requesting up to {} bytes", size);
85
86 // This will return empty if the connection isn't
87 // ready, but it will not block on the handshake.
88 do_handshake();
89
90 if (status != ready)
91 {
92 return 0;
93 }
94
95 // Send pending writes.
96 flush();
97
98 size_t offset = 0;
99
100 if (!read_buffer.empty())
101 {
103 "Have existing read_buffer of size: {}", read_buffer.size());
104 offset = std::min(size, read_buffer.size());
105 ::memcpy(data, read_buffer.data(), offset);
106
107 if (offset < read_buffer.size())
108 {
109 read_buffer.erase(read_buffer.begin(), read_buffer.begin() + offset);
110 }
111 else
112 {
113 read_buffer.clear();
114 }
115
116 if (offset == size)
117 {
118 return size;
119 }
120
121 // NB: If we continue past here, read_buffer is empty
122 }
123
124 // This will need to be handled by the actual QUIC stack
125 auto r = handle_recv(data + offset, size - offset, addr);
126 LOG_TRACE_FMT("quic read returned: {}", r);
127
128 if (r < 0)
129 {
130 LOG_TRACE_FMT("QUIC {} error on read", session_id);
131 stop(error);
132 return 0;
133 }
134
135 auto total = r + offset;
136
137 // We read _some_ data but not enough, and didn't get
138 // WANT_READ. Probably hit an internal size limit - try
139 // again
140 if (exact && (total < size))
141 {
143 "Asked for exactly {}, received {}, retrying", size, total);
144 read_buffer.insert(read_buffer.end(), data, data + total);
145 return read(data, size, addr, exact);
146 }
147
148 return total;
149 }
150
151 void recv_buffered(const uint8_t* data, size_t size, sockaddr addr)
152 {
153 LOG_TRACE_FMT("QUIC Session recv_buffered with {} bytes", size);
154 pending_reads.emplace_back(const_cast<uint8_t*>(data), size, addr);
155 do_handshake();
156 }
157
159 {
160 std::shared_ptr<QUICSession> self;
161 std::vector<uint8_t> data;
162 sockaddr addr{};
163
165 std::shared_ptr<QUICSession> s,
166 std::span<const uint8_t> d,
167 sockaddr sa) :
168 self(std::move(s)),
169 addr(sa)
170 {
171 data.assign(d.begin(), d.end());
172 }
173 };
174
176 {
178
179 void do_action() override
180 {
181 self->send_raw_thread(data, addr);
182 }
183
184 [[nodiscard]] const std::string& get_name() const override
185 {
186 static const std::string name = "quic::SendDataTask";
187 return name;
188 }
189 };
190
192 {
194
195 void do_action() override
196 {
197 self->recv(data.data(), data.size(), addr);
198 }
199
200 [[nodiscard]] const std::string& get_name() const override
201 {
202 static const std::string name = "quic::RecvDataTask";
203 return name;
204 }
205 };
206
207 void send_raw(const uint8_t* data, size_t size, sockaddr addr)
208 {
209 task_scheduler->add_action(std::make_shared<SendDataTask>(
210 shared_from_this(), std::span<const uint8_t>{data, size}, addr));
211 }
212
213 void send_raw_thread(const std::vector<uint8_t>& data, sockaddr addr)
214 {
215 // Writes as much of the data as possible. If the data cannot all
216 // be written now, we store the remainder. We
217 // will try to send pending writes again whenever write() is called.
218 do_handshake();
219
220 if (status == handshake)
221 {
222 pending_writes.emplace_back(
223 const_cast<uint8_t*>(data.data()), data.size(), addr);
224 return;
225 }
226
227 if (status != ready)
228 {
229 return;
230 }
231
232 pending_writes.emplace_back(
233 const_cast<uint8_t*>(data.data()), data.size(), addr);
234
235 flush();
236 }
237
238 void send_buffered(const std::vector<uint8_t>& data, sockaddr addr)
239 {
240 pending_writes.emplace_back(
241 const_cast<uint8_t*>(data.data()), data.size(), addr);
242 }
243
244 void handle_incoming_data(std::span<const uint8_t> data) override
245 {
246 auto [_, addr_family, addr_data, body] =
247 ringbuffer::read_message<udp::udp_inbound>(data);
248
249 task_scheduler->add_action(std::make_shared<RecvDataTask>(
250 shared_from_this(),
251 body,
252 udp::sockaddr_decode(addr_family, addr_data)));
253 }
254
255 virtual void recv(const uint8_t* data_, size_t size_, sockaddr addr_) = 0;
256
257 void flush()
258 {
259 do_handshake();
260
261 if (status != ready)
262 {
263 return;
264 }
265
266 for (auto& write : pending_writes)
267 {
268 LOG_TRACE_FMT("QUIC write_some {} bytes", write.len);
269
270 // This will need to be handled by the actual QUIC stack
271 int rc = handle_send(write.req, write.len, write.addr);
272 if (rc < 0)
273 {
274 LOG_TRACE_FMT("QUIC {} error on flush", session_id);
275 stop(error);
276 return;
277 }
278
279 // Mark for deletion (avoiding invalidating iterator)
280 write.clear = true;
281 }
282
283 // Clear all marked for deletion
285 }
286
287 void close_session() override
288 {
289 auto self = shared_from_this();
290 task_scheduler->add_action(
291 ccf::tasks::make_basic_action([self]() { self->close_thread(); }));
292 }
293
295 {
296 switch (status)
297 {
298 case handshake:
299 {
300 LOG_TRACE_FMT("QUIC {} closed during handshake", session_id);
301 stop(closed);
302 break;
303 }
304
305 case ready:
306 {
307 LOG_TRACE_FMT("QUIC {} closed", session_id);
308 stop(closed);
309 break;
310 }
311
312 case closed:
313 case authfail:
314 case error:
315 default:
316 {
317 }
318 }
319 }
320
321 private:
322 void do_handshake()
323 {
324 // This should be called when additional data is written to the
325 // input buffer, until the handshake is complete.
326 if (status != handshake)
327 {
328 return;
329 }
330
331 // This will need to be handled by the actual QUIC stack
332 LOG_TRACE_FMT("QUIC do_handshake unimplemented");
333 status = ready;
334 }
335
336 void stop(Status status_)
337 {
338 switch (status)
339 {
340 case closed:
341 case authfail:
342 case error:
343 return;
344
345 case handshake:
346 case ready:
347 default:
348 {
349 }
350 }
351
352 status = status_;
353 }
354
355 int handle_send(const uint8_t* buf, size_t len, sockaddr addr)
356 {
357 auto [addr_family, addr_data] = udp::sockaddr_encode(addr);
358
359 // Either write all of the data or none of it.
360 auto wrote = RINGBUFFER_TRY_WRITE_MESSAGE(
361 udp::udp_outbound,
362 to_host,
364 addr_family,
365 addr_data,
366 serializer::ByteRange{buf, len});
367
368 if (!wrote)
369 {
370 return -1;
371 }
372
373 return (int)len;
374 }
375
376 int handle_recv(uint8_t* buf, size_t len, sockaddr addr)
377 {
378 size_t len_read = 0;
379 for (auto& read : pending_reads)
380 {
381 // Only handle pending reads that belong to the same address
382 if (memcmp((void*)&addr, (void*)&read.addr, sizeof(addr)) != 0)
383 {
384 continue;
385 }
386
387 size_t rd = std::min(len, read.len);
388 ::memcpy(buf, read.req, rd);
389 read.clear = true;
390
391 // UDP packets are datagrams, so it's either whole or nothing
392 len_read += rd;
393 if (len_read >= len)
394 {
395 break;
396 }
397 }
398
399 // Clear all marked for deletion
401
402 if (len_read > 0)
403 {
404 return len_read;
405 }
406 return -1;
407 }
408 };
409
410 // This is a wrapper for the QUICSession so we can use in rpc_sessions
411 // Ultimately, this needs to be an HTTP3ServerSession : HTTP3Session :
412 // QUICSession
414 {
415 std::shared_ptr<ccf::RPCMap> rpc_map;
416 std::shared_ptr<ccf::RpcHandler> handler;
417 std::shared_ptr<ccf::SessionContext> session_ctx;
418 int64_t session_id;
419 ccf::ListenInterfaceID interface_id;
420 sockaddr addr;
421
423 void echo()
424 {
426 flush();
427 }
428
429 public:
431 std::shared_ptr<ccf::RPCMap> rpc_map_,
432 int64_t session_id_,
433 ccf::ListenInterfaceID interface_id_,
434 ringbuffer::AbstractWriterFactory& writer_factory) :
435 QUICSession(session_id_, writer_factory),
436 rpc_map(std::move(rpc_map_)),
437 session_id(session_id_),
438 interface_id(std::move(interface_id_)),
439 addr{}
440 {}
441
442 void send_data(std::vector<uint8_t>&& data) override
443 {
444 send_raw(data.data(), data.size(), addr);
445 }
446
447 void recv(const uint8_t* data_, size_t size_, sockaddr addr_) override
448 {
449 recv_buffered(data_, size_, addr_);
450 addr = addr_;
451
452 LOG_TRACE_FMT("recv called with {} bytes", size_);
453
454 // ECHO SERVER
455 echo();
456 }
457 };
458}
Definition session.h:11
static std::shared_ptr< OrderedTasks > create(JobBoard &job_board_, const std::string &name_="[Ordered]")
Definition ordered_tasks.cpp:117
Definition quic_session.h:414
void recv(const uint8_t *data_, size_t size_, sockaddr addr_) override
Definition quic_session.h:447
QUICEchoSession(std::shared_ptr< ccf::RPCMap > rpc_map_, int64_t session_id_, ccf::ListenInterfaceID interface_id_, ringbuffer::AbstractWriterFactory &writer_factory)
Definition quic_session.h:430
void send_data(std::vector< uint8_t > &&data) override
Definition quic_session.h:442
Definition quic_session.h:18
PendingList pending_reads
Definition quic_session.h:42
ringbuffer::WriterPtr to_host
Definition quic_session.h:20
QUICSession(int64_t session_id_, ringbuffer::AbstractWriterFactory &writer_factory_)
Definition quic_session.h:51
PendingList pending_writes
Definition quic_session.h:41
std::shared_ptr< ccf::tasks::OrderedTasks > task_scheduler
Definition quic_session.h:23
void send_raw_thread(const std::vector< uint8_t > &data, sockaddr addr)
Definition quic_session.h:213
void flush()
Definition quic_session.h:257
void close_session() override
Definition quic_session.h:287
void handle_incoming_data(std::span< const uint8_t > data) override
Definition quic_session.h:244
std::vector< PendingBuffer > PendingList
Definition quic_session.h:40
Status get_status() const
Definition quic_session.h:34
virtual void recv(const uint8_t *data_, size_t size_, sockaddr addr_)=0
std::vector< uint8_t > peer_cert()
Definition quic_session.h:72
Status
Definition quic_session.h:26
@ ready
Definition quic_session.h:28
@ handshake
Definition quic_session.h:27
@ authfail
Definition quic_session.h:30
@ closed
Definition quic_session.h:29
@ error
Definition quic_session.h:31
~QUICSession() override
Definition quic_session.h:61
std::string hostname()
Definition quic_session.h:67
void close_thread()
Definition quic_session.h:294
void send_buffered(const std::vector< uint8_t > &data, sockaddr addr)
Definition quic_session.h:238
size_t read(uint8_t *data, size_t size, sockaddr addr, bool exact=false)
Definition quic_session.h:82
void send_raw(const uint8_t *data, size_t size, sockaddr addr)
Definition quic_session.h:207
ccf::tls::ConnID session_id
Definition quic_session.h:21
void recv_buffered(const uint8_t *data, size_t size, sockaddr addr)
Definition quic_session.h:151
Definition ring_buffer_types.h:157
#define LOG_TRACE_FMT
Definition internal_logger.h:13
TaskAction make_basic_action(Ts &&... ts)
Definition ordered_tasks.h:47
JobBoard & get_main_job_board()
Definition task_system.cpp:53
int64_t ConnID
Definition custom_protocol_subsystem_interface.h:20
std::string ListenInterfaceID
Definition rpc_context.h:21
Definition quic_session.h:15
std::shared_ptr< AbstractWriter > WriterPtr
Definition ring_buffer_types.h:154
STL namespace.
#define RINGBUFFER_TRY_WRITE_MESSAGE(MSG,...)
Definition ring_buffer_types.h:262
Pending writes on both host and enclave, with data, length and destination address.
Definition pending_io.h:18
static void clear_empty(std::vector< PendingIO< T > > &list)
Clears a list of PendingIO<T> of all elements that were marked to remove (clear flag == true).
Definition pending_io.h:79
Definition ordered_tasks.h:13
Definition quic_session.h:192
const std::string & get_name() const override
Definition quic_session.h:200
void do_action() override
Definition quic_session.h:195
Definition quic_session.h:176
void do_action() override
Definition quic_session.h:179
const std::string & get_name() const override
Definition quic_session.h:184
Definition quic_session.h:159
SessionDataTask(std::shared_ptr< QUICSession > s, std::span< const uint8_t > d, sockaddr sa)
Definition quic_session.h:164
sockaddr addr
Definition quic_session.h:162
std::shared_ptr< QUICSession > self
Definition quic_session.h:160
std::vector< uint8_t > data
Definition quic_session.h:161
Definition serializer.h:27