CCF
Loading...
Searching...
No Matches
ring_buffer.h
Go to the documentation of this file.
1// Copyright (c) Microsoft Corporation. All rights reserved.
2// Licensed under the Apache 2.0 License.
3#pragma once
4
5#include "ccf/pal/mem.h"
6#include "ring_buffer_types.h"
7
8#include <atomic>
9#include <cstring>
10#include <functional>
11
12// Ideally this would be _mm_pause or similar, but finding cross-platform
13// headers that expose this neatly through OE (ie - non-standard std libs) is
14// awkward. Instead we resort to copying OE, and implementing this directly
15// ourselves.
16#define CCF_PAUSE() asm volatile("pause")
17
18// This file implements a Multiple-Producer Single-Consumer ringbuffer.
19
20// A single Reader instance owns an underlying memory buffer, and a single
21// thread should process messages written to it. Any number of other threads and
22// Writers may write to it, and the messages will be distinct, correct, and
23// ordered.
24
25// A Circuit wraps a pair of ringbuffers to allow 2-way communication - messages
26// are written to the inbound buffer, processed inside an enclave, and responses
27// written back to the outbound.
28
29namespace ringbuffer
30{
// Callback type that Reader::read invokes for each message it processes,
// passing the message type, a pointer to the payload and the payload size.
31 using Handler = std::function<void(Message, const uint8_t*, size_t)>;
32
// High bit of message size is used to indicate a pending message. The shifted
// operand is unsigned so that no bit is ever moved into the sign position of
// a signed int.
static constexpr uint32_t pending_write_flag = uint32_t{1} << 31;
// Selects every bit of the size field except the pending bit.
static constexpr uint32_t length_mask = ~pending_write_flag;
36
37 struct Const
38 {
39 enum : Message
40 {
41 msg_max = std::numeric_limits<Message>::max() - 1,
44 msg_pad = std::numeric_limits<Message>::max()
45 };
46
// Reports whether n has exactly one bit set. Zero is not a power of 2.
static constexpr bool is_power_of_2(size_t n)
{
  if (n == 0)
  {
    return false;
  }

  // Clearing the lowest set bit leaves zero only if it was the only bit.
  return (n & (n - 1)) == 0;
}
51
// Reports whether the address held in data is a multiple of align.
static bool is_aligned(uint8_t const* data, size_t align)
{
  const auto address = reinterpret_cast<std::uintptr_t>(data);
  return (address % align) == 0;
}
56
// Size in bytes of the header that precedes every message.
static constexpr size_t header_size()
{
  // A header is a 32 bit length followed by a 32 bit message ID.
  constexpr size_t length_bytes = sizeof(int32_t);
  constexpr size_t message_bytes = sizeof(uint32_t);
  return length_bytes + message_bytes;
}
62
63 static constexpr size_t align_size(size_t n)
64 {
65 // Make sure the header is aligned in memory.
66 return (n + (header_size() - 1)) & ~(header_size() - 1);
67 }
68
69 static constexpr size_t entry_size(size_t n)
70 {
71 return Const::align_size(n + header_size());
72 }
73
74 static constexpr size_t max_size()
75 {
76 // The length of a message plus its header must be encodable in the
77 // header. High bit of lengths indicate pending writes.
78 return std::numeric_limits<int32_t>::max() - header_size();
79 }
80
// Largest single reservation allowed in a buffer of buffer_size bytes.
static constexpr size_t max_reservation_size(size_t buffer_size)
{
  // Capping at half the buffer means an empty buffer always has a contiguous
  // region (before or after the current tail) big enough for the reservation.
  // Anything larger might require artificially advancing the tail (writing
  // padding and then clearing it) to open up a sufficiently large region.
  return buffer_size >> 1;
}
90
// Returns the largest power of 2 that is less than or equal to n, or 0 when
// n is 0 (there is no such power of 2).
static constexpr size_t previous_power_of_2(size_t n)
{
  // __builtin_clzll has an undefined result for 0, so handle it up front.
  if (n == 0)
  {
    return 0;
  }

  // __builtin_clzll counts leading zeros of an unsigned long long, so the
  // bit width used here must be that type's width rather than size_t's.
  const auto lz = static_cast<size_t>(__builtin_clzll(n));
  const size_t top_bit = sizeof(unsigned long long) * 8 - 1 - lz;

  // Shift a size_t, not an unsigned long: the latter is only 32 bits wide on
  // some platforms, where a shift by 32 or more would be undefined.
  return size_t{1} << top_bit;
}
96
97 static bool find_acceptable_sub_buffer(uint8_t*& data_, size_t& size_)
98 {
99 void* data = reinterpret_cast<void*>(data_);
100 size_t size = size_;
101
102 auto ret = std::align(8, sizeof(size_t), data, size);
103 if (ret == nullptr)
104 {
105 return false;
106 }
107
108 data_ = reinterpret_cast<uint8_t*>(data);
109 size_ = previous_power_of_2(size);
110 return true;
111 }
112
113 static uint64_t make_header(Message m, size_t size, bool pending = true)
114 {
115 return (((uint64_t)m) << 32) |
116 ((size & length_mask) | (pending ? pending_write_flag : 0u));
117 }
118 };
119
121 {
122 uint8_t* data;
123 size_t size;
124
126
127 void check_access(size_t index, size_t access_size)
128 {
129 if (index + access_size > size)
130 {
131#ifdef RINGBUFFER_USE_ABORT
132 abort();
133#else
134 throw std::runtime_error(fmt::format(
135 "Ringbuffer access out of bounds - attempting to access {}, max "
136 "index is {}",
137 index + access_size,
138 size));
139#endif
140 }
141 }
142 };
143
144 namespace
145 {
146 static inline uint64_t read64_impl(const BufferDef& bd, size_t index)
147 {
148#ifdef __cpp_lib_atomic_ref
149 auto& ref = *(reinterpret_cast<uint64_t*>(bd.data + index));
150 std::atomic_ref<uint64_t> slot(ref);
151 return slot.load(std::memory_order_acquire);
152#else
153 // __atomic_load is used instead of std::atomic_ref since it's not
154 // supported by libc++ yet.
155 // https://en.cppreference.com/w/Template:cpp/compiler_support/20
156 uint64_t r = 0;
157 __atomic_load(
158 reinterpret_cast<uint64_t*>(bd.data + index), &r, __ATOMIC_ACQUIRE);
159 return r;
160#endif
161 }
162
163 static inline Message message(uint64_t header)
164 {
165 return (Message)(header >> 32);
166 }
167
// Extracts the lower 32 bits of a header: the size field, still including
// the pending-write bit if it is set.
static inline uint32_t length(uint64_t header)
{
  constexpr uint64_t low_word = std::numeric_limits<uint32_t>::max();
  return static_cast<uint32_t>(header & low_word);
}
172 }
173
174 class Reader
175 {
176 friend class Writer;
177
178 BufferDef bd;
179
180 std::vector<uint8_t> local_copy;
181
182 virtual uint64_t read64(size_t index)
183 {
184 bd.check_access(index, sizeof(uint64_t));
185 return read64_impl(bd, index);
186 }
187
188 virtual void clear_mem(size_t index, size_t advance)
189 {
190 ::memset(bd.data + index, 0, advance);
191 }
192
193 public:
194 Reader(const BufferDef& bd_) : bd(bd_)
195 {
196 if (!Const::is_power_of_2(bd.size))
197 {
198 throw std::logic_error(
199 fmt::format("Buffer size must be a power of 2, not {}", bd.size));
200 }
201
202 if (!Const::is_aligned(bd.data, 8))
203 {
204 throw std::logic_error("Buffer must be 8-byte aligned");
205 }
206 }
207
208 size_t read(size_t limit, Handler f)
209 {
210 auto mask = bd.size - 1;
211 auto hd = bd.offsets->head.load(std::memory_order_acquire);
212 auto hd_index = hd & mask;
213 auto block = bd.size - hd_index;
214 size_t advance = 0;
215 size_t count = 0;
216
217 while ((advance < block) && (count < limit))
218 {
219 auto msg_index = hd_index + advance;
220 auto header = read64(msg_index);
221 auto size = length(header);
222
223 // If we see a pending write, we're done.
224 if ((size & pending_write_flag) != 0u)
225 break;
226
227 auto m = message(header);
228
229 if (m == Const::msg_none)
230 {
231 // There is no message here, we're done.
232 break;
233 }
234 else if (m == Const::msg_pad)
235 {
236 // If we see padding, skip it.
237 // NB: Padding messages are potentially unaligned, where other
238 // messages are aligned by calls to entry_size(). Even for an empty
239 // padding message (size == 0), we need to skip past the message
240 // header.
241 advance += Const::header_size() + size;
242 continue;
243 }
244
245 advance += Const::entry_size(size);
246 ++count;
247
248 // Call the handler function for this message.
249 bd.check_access(hd_index, advance);
250
251 if (ccf::pal::require_alignment_for_untrusted_reads() && size > 0)
252 {
253 // To prevent unaligned reads during message processing, copy aligned
254 // chunk into enclave memory
255 const auto copy_size = Const::align_size(size);
256 if (local_copy.size() < copy_size)
257 {
258 local_copy.resize(copy_size);
259 }
260 ccf::pal::safe_memcpy(
261 local_copy.data(),
262 bd.data + msg_index + Const::header_size(),
263 copy_size);
264 f(m, local_copy.data(), (size_t)size);
265 }
266 else
267 {
268 f(m, bd.data + msg_index + Const::header_size(), (size_t)size);
269 }
270 }
271
272 if (advance > 0)
273 {
274 // Zero the buffer and advance the head.
275 bd.check_access(hd_index, advance);
276 clear_mem(hd_index, advance);
277 bd.offsets->head.store(hd + advance, std::memory_order_release);
278 }
279
280 return count;
281 }
282 };
283
284 class Writer : public AbstractWriter
285 {
286 protected:
287 BufferDef bd; // copy of reader's buffer definition
288 const size_t rmax;
289
291 {
292 // Index within buffer of reservation start
293 size_t index;
294
295 // Individual identifier for this reservation. Should be unique across
296 // buffer lifetime, amongst all writers
298 };
299
300 public:
301 Writer(const Reader& r) :
302 bd(r.bd),
303 rmax(Const::max_reservation_size(bd.size))
304 {}
305
306 Writer(const Writer& that) : bd(that.bd), rmax(that.rmax) {}
307
308 virtual ~Writer() {}
309
310 virtual std::optional<size_t> prepare(
311 Message m,
312 size_t size,
313 bool wait = true,
314 size_t* identifier = nullptr) override
315 {
316 // Make sure we aren't using a reserved message.
317 if ((m < Const::msg_min) || (m > Const::msg_max))
318 {
319 throw message_error(
320 m, fmt::format("Cannot use a reserved message ({})", m));
321 }
322
323 // Make sure the message fits.
324 if (size > Const::max_size())
325 {
326 throw message_error(
327 m,
328 fmt::format(
329 "Message ({}) is too long for any writer: {} > {}",
330 m,
331 size,
332 Const::max_size()));
333 }
334
335 auto rsize = Const::entry_size(size);
336 if (rsize > rmax)
337 {
338 throw message_error(
339 m,
340 fmt::format(
341 "Message ({}) is too long for this writer: {} > {}",
342 m,
343 rsize,
344 rmax));
345 }
346
347 auto r = reserve(rsize);
348
349 if (!r.has_value())
350 {
351 if (wait)
352 {
353 // Retry until there is sufficient space.
354 do
355 {
356 CCF_PAUSE();
357 r = reserve(rsize);
358 } while (!r.has_value());
359 }
360 else
361 {
362 // Fail if there is insufficient space.
363 return {};
364 }
365 }
366
367 // Write the preliminary header and return the buffer pointer.
368 // The initial header length has high bit set to indicate a pending
369 // message. We rewrite the real length after the message data.
370 write64(r.value().index, Const::make_header(m, size));
371
372 if (identifier != nullptr)
373 *identifier = r.value().identifier;
374
375 return {r.value().index + Const::header_size()};
376 }
377
378 virtual void finish(const WriteMarker& marker) override
379 {
380 if (marker.has_value())
381 {
382 // Fix up the size to indicate we're done writing - unset pending bit.
383 const auto index = marker.value() - Const::header_size();
384 const auto header = read64(index);
385 const auto size = length(header);
386 const auto m = message(header);
387 const auto finished_header = Const::make_header(m, size, false);
388 write64(index, finished_header);
389 }
390 }
391
392 virtual size_t get_max_message_size() override
393 {
394 return Const::max_size();
395 }
396
397 protected:
399 const WriteMarker& marker, const uint8_t* bytes, size_t size) override
400 {
401 if (!marker.has_value())
402 {
403 return {};
404 }
405
406 const auto index = marker.value();
407
408 bd.check_access(index, size);
409
410 // Standard says memcpy(x, null, 0) is undefined, so avoid it
411 if (size > 0)
412 {
413 ccf::pal::safe_memcpy(bd.data + index, bytes, size);
414 }
415
416 return {index + size};
417 }
418
419 private:
// Detects whether a is ahead of b, which is how we tell that the head has
// passed the tail. In real operation the two should be close to each other
// relative to the full range of a uint64_t. To cope with wrap-around (ie -
// when a write has overflowed past the max value), a counts as larger when
// the distance from b to a is non-zero and less than half the total range.
static bool greater_with_wraparound(size_t a, size_t b)
{
  static constexpr auto switch_point = UINT64_MAX / 2;

  if (a == b)
  {
    return false;
  }

  const auto distance = a - b;
  return distance < switch_point;
}
431
432 virtual uint64_t read64(size_t index)
433 {
434 bd.check_access(index, sizeof(uint64_t));
435 return read64_impl(bd, index);
436 }
437
438 virtual void write64(size_t index, uint64_t value)
439 {
440 bd.check_access(index, sizeof(value));
441#ifdef __cpp_lib_atomic_ref
442 auto& ref = *(reinterpret_cast<uint64_t*>(bd.data + index));
443 std::atomic_ref<uint64_t> slot(ref);
444 slot.store(value, std::memory_order_release);
445#else
446 // __atomic_store is used instead of std::atomic_ref since it's not
447 // supported by libc++ yet.
448 // https://en.cppreference.com/w/Template:cpp/compiler_support/20
449 __atomic_store(
450 reinterpret_cast<uint64_t*>(bd.data + index), &value, __ATOMIC_RELEASE);
451#endif
452 }
453
454 std::optional<Reservation> reserve(size_t size)
455 {
456 auto mask = bd.size - 1;
457 auto hd = bd.offsets->head_cache.load(std::memory_order_acquire);
458 auto tl = bd.offsets->tail.load(std::memory_order_relaxed);
459
460 // NB: These will be always be set on the first loop, before they are
461 // read, so this initialisation is unnecessary. It is added to placate
462 // static analyzers.
463 size_t padding = 0u;
464 size_t tl_index = 0u;
465
466 do
467 {
468 auto gap = tl - hd;
469 auto avail = bd.size - gap;
470
471 // If the head cache is too far behind the tail, or if the message does
472 // not fit in the available space, get an accurate head and try again.
473 if ((gap > bd.size) || (size > avail))
474 {
475 // If the message does not fit in the sum of front-space and
476 // back-space, see if head has moved to give us enough space.
477 hd = bd.offsets->head.load(std::memory_order_acquire);
478
479 // This happens if the head has passed the tail we previously loaded.
480 // It is safe to continue here, as the compare_exchange_weak is
481 // guaranteed to fail and update tl.
482 if (greater_with_wraparound(hd, tl))
483 {
484 continue;
485 }
486
487 avail = bd.size - (tl - hd);
488
489 // If it still doesn't fit, fail.
490 if (size > avail)
491 return {};
492
493 // This may move the head cache backwards, but if so, that is safe and
494 // will be corrected later.
495 bd.offsets->head_cache.store(hd, std::memory_order_release);
496 }
497
498 padding = 0;
499 tl_index = tl & mask;
500 auto block = bd.size - tl_index;
501
502 if (size > block)
503 {
504 // If the message doesn't fit in back-space...
505 auto hd_index = hd & mask;
506
507 if (size > hd_index)
508 {
509 // If message doesn't fit in front-space, see if the head has moved
510 hd = bd.offsets->head.load(std::memory_order_acquire);
511 hd_index = hd & mask;
512
513 // If it still doesn't fit, fail - there is not a contiguous region
514 // large enough for this reservation
515 if (size > hd_index)
516 return {};
517
518 // This may move the head cache backwards, but if so, that is safe
519 // and will be corrected later.
520 bd.offsets->head_cache.store(hd, std::memory_order_release);
521 }
522
523 // Pad the back-space and reserve front-space for our message in a
524 // single tail update.
525 padding = block;
526 }
527 } while (!bd.offsets->tail.compare_exchange_weak(
528 tl, tl + size + padding, std::memory_order_seq_cst));
529
530 if (padding != 0)
531 {
532 write64(
533 tl_index,
535 Const::msg_pad, padding - Const::header_size(), false));
536 tl_index = 0;
537 }
538
539 return {{tl_index, tl}};
540 }
541 };
542
543 // This is entirely non-virtual so can be safely passed to the enclave
545 {
546 private:
547 ringbuffer::Reader from_outside;
548 ringbuffer::Reader from_inside;
549
550 public:
552 const BufferDef& from_outside_buffer,
553 const BufferDef& from_inside_buffer) :
554 from_outside(from_outside_buffer),
555 from_inside(from_inside_buffer)
556 {}
557
559 {
560 return from_outside;
561 }
562
564 {
565 return from_inside;
566 }
567
569 {
570 return ringbuffer::Writer(from_inside);
571 }
572
574 {
575 return ringbuffer::Writer(from_outside);
576 }
577 };
578
580 {
581 ringbuffer::Circuit& raw_circuit;
582
583 public:
584 WriterFactory(ringbuffer::Circuit& c) : raw_circuit(c) {}
585
586 std::shared_ptr<ringbuffer::AbstractWriter> create_writer_to_outside()
587 override
588 {
589 return std::make_shared<Writer>(raw_circuit.read_from_inside());
590 }
591
592 std::shared_ptr<ringbuffer::AbstractWriter> create_writer_to_inside()
593 override
594 {
595 return std::make_shared<Writer>(raw_circuit.read_from_outside());
596 }
597 };
598
599 // This struct wraps buffer management to simplify testing
601 {
602 std::vector<uint8_t> storage;
604
606
607 TestBuffer(size_t size) : storage(size, 0), offsets()
608 {
609 bd.data = storage.data();
610 bd.size = storage.size();
611 bd.offsets = &offsets;
612 }
613 };
614}
Definition ring_buffer_types.h:153
Definition ring_buffer_types.h:61
std::optional< size_t > WriteMarker
Definition ring_buffer_types.h:98
Definition ring_buffer.h:545
ringbuffer::Writer write_to_inside()
Definition ring_buffer.h:573
ringbuffer::Reader & read_from_inside()
Definition ring_buffer.h:563
Circuit(const BufferDef &from_outside_buffer, const BufferDef &from_inside_buffer)
Definition ring_buffer.h:551
ringbuffer::Reader & read_from_outside()
Definition ring_buffer.h:558
ringbuffer::Writer write_to_outside()
Definition ring_buffer.h:568
Definition ring_buffer.h:175
size_t read(size_t limit, Handler f)
Definition ring_buffer.h:208
Reader(const BufferDef &bd_)
Definition ring_buffer.h:194
Definition ring_buffer.h:580
WriterFactory(ringbuffer::Circuit &c)
Definition ring_buffer.h:584
std::shared_ptr< ringbuffer::AbstractWriter > create_writer_to_inside() override
Definition ring_buffer.h:592
std::shared_ptr< ringbuffer::AbstractWriter > create_writer_to_outside() override
Definition ring_buffer.h:586
Definition ring_buffer.h:285
virtual std::optional< size_t > prepare(Message m, size_t size, bool wait=true, size_t *identifier=nullptr) override
Definition ring_buffer.h:310
Writer(const Reader &r)
Definition ring_buffer.h:301
virtual void finish(const WriteMarker &marker) override
Definition ring_buffer.h:378
BufferDef bd
Definition ring_buffer.h:287
virtual size_t get_max_message_size() override
Definition ring_buffer.h:392
const size_t rmax
Definition ring_buffer.h:288
Writer(const Writer &that)
Definition ring_buffer.h:306
virtual ~Writer()
Definition ring_buffer.h:308
virtual WriteMarker write_bytes(const WriteMarker &marker, const uint8_t *bytes, size_t size) override
Definition ring_buffer.h:398
Definition ring_buffer_types.h:49
Definition non_blocking.h:14
std::function< void(Message, const uint8_t *, size_t)> Handler
Definition ring_buffer.h:31
uint32_t Message
Definition ring_buffer_types.h:19
#define CCF_PAUSE()
Definition ring_buffer.h:16
Definition ring_buffer.h:121
void check_access(size_t index, size_t access_size)
Definition ring_buffer.h:127
uint8_t * data
Definition ring_buffer.h:122
Offsets * offsets
Definition ring_buffer.h:125
size_t size
Definition ring_buffer.h:123
Definition ring_buffer.h:38
@ msg_pad
Definition ring_buffer.h:44
@ msg_max
Definition ring_buffer.h:41
@ msg_none
Definition ring_buffer.h:43
@ msg_min
Definition ring_buffer.h:42
static constexpr size_t max_size()
Definition ring_buffer.h:74
static constexpr bool is_power_of_2(size_t n)
Definition ring_buffer.h:47
static constexpr size_t align_size(size_t n)
Definition ring_buffer.h:63
static constexpr size_t previous_power_of_2(size_t n)
Definition ring_buffer.h:91
static bool find_acceptable_sub_buffer(uint8_t *&data_, size_t &size_)
Definition ring_buffer.h:97
static bool is_aligned(uint8_t const *data, size_t align)
Definition ring_buffer.h:52
static constexpr size_t entry_size(size_t n)
Definition ring_buffer.h:69
static constexpr size_t max_reservation_size(size_t buffer_size)
Definition ring_buffer.h:81
static constexpr size_t header_size()
Definition ring_buffer.h:57
static uint64_t make_header(Message m, size_t size, bool pending=true)
Definition ring_buffer.h:113
Definition ring_buffer_types.h:25
std::atomic< size_t > head
Definition ring_buffer_types.h:45
std::atomic< size_t > head_cache
Definition ring_buffer_types.h:31
std::atomic< size_t > tail
Definition ring_buffer_types.h:39
Definition ring_buffer.h:601
std::vector< uint8_t > storage
Definition ring_buffer.h:602
BufferDef bd
Definition ring_buffer.h:605
Offsets offsets
Definition ring_buffer.h:603
TestBuffer(size_t size)
Definition ring_buffer.h:607
Definition ring_buffer.h:291
size_t index
Definition ring_buffer.h:293
size_t identifier
Definition ring_buffer.h:297