CCF
Loading...
Searching...
No Matches
ring_buffer.h
Go to the documentation of this file.
1// Copyright (c) Microsoft Corporation. All rights reserved.
2// Licensed under the Apache 2.0 License.
3#pragma once
4
5#include "ccf/pal/mem.h"
6#include "ring_buffer_types.h"
7
8#include <atomic>
9#include <cstring>
10#include <functional>
11#include <thread>
12
// This file implements a Multiple-Producer Single-Consumer ringbuffer.

// A single Reader instance owns an underlying memory buffer, and a single
// thread should process messages written to it. Any number of other threads
// and Writers may write to it, and the messages will be distinct, correct,
// and ordered.

// A Circuit wraps a pair of ringbuffers to allow 2-way communication - messages
// are written to the inbound buffer, processed inside an enclave, and responses
// written back to the outbound.
23
24namespace ringbuffer
25{
26 using Handler = std::function<void(Message, const uint8_t*, size_t)>;
27
// High bit of message size is used to indicate a pending message. The shift
// is performed on an unsigned operand so that no signed value is ever shifted
// into the sign bit.
static constexpr uint32_t pending_write_flag = uint32_t{1} << 31;
// Selects the low 31 bits of the size field, i.e. the length without the
// pending flag.
static constexpr uint32_t length_mask = ~pending_write_flag;
31
32 struct Const
33 {
34 enum : Message
35 {
36 msg_max = std::numeric_limits<Message>::max() - 1,
39 msg_pad = std::numeric_limits<Message>::max()
40 };
41
// Returns true iff n is a non-zero power of two.
static constexpr bool is_power_of_2(size_t n)
{
  // (~n + 1) is the two's complement negation of n, so n & (~n + 1) isolates
  // the lowest set bit. That bit equals n only when n has a single bit set.
  return (n != 0u) && ((n & (~n + 1)) == n);
}
46
// Returns true iff the address held by data is a multiple of align.
// align must be non-zero, since it is used as a divisor.
static bool is_aligned(uint8_t const* data, size_t align)
{
  return reinterpret_cast<std::uintptr_t>(data) % align == 0;
}
51
// Size in bytes of the header that precedes every message in the buffer.
static constexpr size_t header_size()
{
  // The header is a 32 bit length and a 32 bit message ID.
  return sizeof(int32_t) + sizeof(uint32_t);
}
57
58 static constexpr size_t align_size(size_t n)
59 {
60 // Make sure the header is aligned in memory.
61 return (n + (header_size() - 1)) & ~(header_size() - 1);
62 }
63
64 static constexpr size_t entry_size(size_t n)
65 {
66 return Const::align_size(n + header_size());
67 }
68
69 static constexpr size_t max_size()
70 {
71 // The length of a message plus its header must be encodable in the
72 // header. High bit of lengths indicate pending writes.
73 return std::numeric_limits<int32_t>::max() - header_size();
74 }
75
// Largest single reservation permitted in a buffer of buffer_size bytes.
static constexpr size_t max_reservation_size(size_t buffer_size)
{
  // This guarantees that in an empty buffer, we can always make this
  // reservation in a single contiguous region (either before or after the
  // current tail). If we allow larger reservations then we may need to
  // artificially advance the tail (writing padding then clearing it) to
  // create a sufficiently large region.
  return buffer_size / 2;
}
85
// Returns the largest power of two that is less than or equal to n, or 0 when
// n is 0 (no such power exists).
static constexpr size_t previous_power_of_2(size_t n)
{
  // __builtin_clzll has an undefined result for 0, so handle it explicitly.
  if (n == 0)
  {
    return 0;
  }

  // __builtin_clzll operates on unsigned long long, so the index of the top
  // set bit must be derived from that type's width rather than size_t's.
  constexpr size_t bits = sizeof(unsigned long long) * 8;
  const auto lz = static_cast<size_t>(__builtin_clzll(n));

  // Shift a size_t one: an `unsigned long` literal is only 32 bits wide on
  // LLP64 platforms.
  return size_t{1} << (bits - 1 - lz);
}
91
92 static bool find_acceptable_sub_buffer(uint8_t*& data_, size_t& size_)
93 {
94 void* data = reinterpret_cast<void*>(data_);
95 size_t size = size_;
96
97 auto* ret = std::align(8, sizeof(size_t), data, size);
98 if (ret == nullptr)
99 {
100 return false;
101 }
102
103 data_ = reinterpret_cast<uint8_t*>(data);
104 size_ = previous_power_of_2(size);
105 return true;
106 }
107
108 static uint64_t make_header(Message m, size_t size, bool pending = true)
109 {
110 return (((uint64_t)m) << 32) |
111 ((size & length_mask) | (pending ? pending_write_flag : 0u));
112 }
113 };
114
116 {
117 uint8_t* data;
118 size_t size;
119
121
122 void check_access(size_t index, size_t access_size)
123 {
124 if (index + access_size > size)
125 {
126#ifdef RINGBUFFER_USE_ABORT
127 abort();
128#else
129 throw std::runtime_error(fmt::format(
130 "Ringbuffer access out of bounds - attempting to access {}, max "
131 "index is {}",
132 index + access_size,
133 size));
134#endif
135 }
136 }
137 };
138
139 namespace detail
140 {
141 inline uint64_t read64_impl(const BufferDef& bd, size_t index)
142 {
143 auto* src = bd.data + index;
144 auto* src_64 = reinterpret_cast<uint64_t*>(src);
145
146 if (Const::is_aligned(src, 8))
147 {
148 auto& ref = *src_64;
149 std::atomic_ref<uint64_t> slot(ref);
150 return slot.load(std::memory_order_acquire);
151 }
152
153 // __atomic_load is used when the src pointer is not aligned, since
154 // std::atomic_ref requires proper alignment.
155 uint64_t r = 0;
156 __atomic_load(src_64, &r, __ATOMIC_ACQUIRE);
157 return r;
158 }
159
160 inline Message message(uint64_t header)
161 {
162 return (Message)(header >> 32);
163 }
164
// Extracts the lower 32 bits of a header word: the message length, with the
// pending-write flag still present in the top bit.
inline uint32_t length(uint64_t header)
{
  return static_cast<uint32_t>(header & std::numeric_limits<uint32_t>::max());
}
169 } // namespace detail
170
171 class Reader
172 {
173 friend class Writer;
174
175 BufferDef bd;
176
177 virtual uint64_t read64(size_t index)
178 {
179 bd.check_access(index, sizeof(uint64_t));
180 return detail::read64_impl(bd, index);
181 }
182
183 virtual void clear_mem(size_t index, size_t advance)
184 {
185 ::memset(bd.data + index, 0, advance);
186 }
187
188 public:
189 Reader(const BufferDef& bd_) : bd(bd_)
190 {
191 if (!Const::is_power_of_2(bd.size))
192 {
193 throw std::logic_error(
194 fmt::format("Buffer size must be a power of 2, not {}", bd.size));
195 }
196
197 if (!Const::is_aligned(bd.data, 8))
198 {
199 throw std::logic_error("Buffer must be 8-byte aligned");
200 }
201 }
202
203 virtual ~Reader() = default;
204
205 size_t read(size_t limit, Handler f)
206 {
207 auto mask = bd.size - 1;
208 auto hd = bd.offsets->head.load(std::memory_order_acquire);
209 auto hd_index = hd & mask;
210 auto block = bd.size - hd_index;
211 size_t advance = 0;
212 size_t count = 0;
213
214 while ((advance < block) && (count < limit))
215 {
216 auto msg_index = hd_index + advance;
217 auto header = read64(msg_index);
218 auto size = detail::length(header);
219
220 // If we see a pending write, we're done.
221 if ((size & pending_write_flag) != 0u)
222 {
223 break;
224 }
225
226 auto m = detail::message(header);
227
228 if (m == Const::msg_none)
229 {
230 // There is no message here, we're done.
231 break;
232 }
233
234 if (m == Const::msg_pad)
235 {
236 // If we see padding, skip it.
237 // NB: Padding messages are potentially unaligned, where other
238 // messages are aligned by calls to entry_size(). Even for an empty
239 // padding message (size == 0), we need to skip past the message
240 // header.
241 advance += Const::header_size() + size;
242 continue;
243 }
244
245 advance += Const::entry_size(size);
246 ++count;
247
248 // Call the handler function for this message.
249 bd.check_access(hd_index, advance);
250
251 f(m, bd.data + msg_index + Const::header_size(), (size_t)size);
252 }
253
254 if (advance > 0)
255 {
256 // Zero the buffer and advance the head.
257 bd.check_access(hd_index, advance);
258 clear_mem(hd_index, advance);
259 bd.offsets->head.store(hd + advance, std::memory_order_release);
260 }
261
262 return count;
263 }
264 };
265
266 class Writer : public AbstractWriter
267 {
268 protected:
269 BufferDef bd; // copy of reader's buffer definition
270 const size_t rmax;
271
// Result of a successful reserve() call.
struct Reservation
{
  // Index within buffer of reservation start
  size_t index;

  // Individual identifier for this reservation. Should be unique across
  // buffer lifetime, amongst all writers
  size_t identifier;
};
281
282 public:
283 Writer(const Reader& r) :
284 bd(r.bd),
285 rmax(Const::max_reservation_size(bd.size))
286 {}
287
288 Writer(const Writer& that) : bd(that.bd), rmax(that.rmax) {}
289
290 ~Writer() override = default;
291
292 std::optional<size_t> prepare(
293 Message m,
294 size_t size,
295 bool wait = true,
296 size_t* identifier = nullptr) override
297 {
298 // Make sure we aren't using a reserved message.
299 if ((m < Const::msg_min) || (m > Const::msg_max))
300 {
301 throw message_error(
302 m, fmt::format("Cannot use a reserved message ({})", m));
303 }
304
305 // Make sure the message fits.
306 if (size > Const::max_size())
307 {
308 throw message_error(
309 m,
310 fmt::format(
311 "Message ({}) is too long for any writer: {} > {}",
312 m,
313 size,
314 Const::max_size()));
315 }
316
317 auto rsize = Const::entry_size(size);
318 if (rsize > rmax)
319 {
320 throw message_error(
321 m,
322 fmt::format(
323 "Message ({}) is too long for this writer: {} > {}",
324 m,
325 rsize,
326 rmax));
327 }
328
329 auto r = reserve(rsize);
330
331 if (!r.has_value())
332 {
333 if (wait)
334 {
335 // Retry until there is sufficient space.
336 do
337 {
338 std::this_thread::yield();
339 r = reserve(rsize);
340 } while (!r.has_value());
341 }
342 else
343 {
344 // Fail if there is insufficient space.
345 return {};
346 }
347 }
348
349 // Write the preliminary header and return the buffer pointer.
350 // The initial header length has high bit set to indicate a pending
351 // message. We rewrite the real length after the message data.
352 write64(r.value().index, Const::make_header(m, size));
353
354 if (identifier != nullptr)
355 {
356 *identifier = r.value().identifier;
357 }
358
359 return {r.value().index + Const::header_size()};
360 }
361
362 void finish(const WriteMarker& marker) override
363 {
364 if (marker.has_value())
365 {
366 // Fix up the size to indicate we're done writing - unset pending bit.
367 const auto index = marker.value() - Const::header_size();
368 const auto header = read64(index);
369 const auto size = detail::length(header);
370 const auto m = detail::message(header);
371 const auto finished_header = Const::make_header(m, size, false);
372 write64(index, finished_header);
373 }
374 }
375
376 size_t get_max_message_size() override
377 {
378 return Const::max_size();
379 }
380
381 protected:
383 const WriteMarker& marker, const uint8_t* bytes, size_t size) override
384 {
385 if (!marker.has_value())
386 {
387 return {};
388 }
389
390 const auto index = marker.value();
391
392 bd.check_access(index, size);
393
394 // Standard says memcpy(x, null, 0) is undefined, so avoid it
395 if (size > 0)
396 {
397 ccf::pal::safe_memcpy(bd.data + index, bytes, size);
398 }
399
400 return {index + size};
401 }
402
403 private:
// We use this to detect whether the head is ahead of the tail. In real
// operation they should be close to each other, relative to the total range
// of a size_t. To handle wrap-around (ie - when a write has overflowed past
// the max value), we consider a larger than b if the distance from b to a is
// less than half the total range (and positive).
static bool greater_with_wraparound(size_t a, size_t b)
{
  // Half the range of the operand type. a - b wraps modulo that same range,
  // so the threshold must be derived from size_t rather than a fixed width.
  static constexpr size_t switch_point = SIZE_MAX / 2;

  return (a != b) && ((a - b) < switch_point);
}
415
416 virtual uint64_t read64(size_t index)
417 {
418 bd.check_access(index, sizeof(uint64_t));
419 return detail::read64_impl(bd, index);
420 }
421
422 virtual void write64(size_t index, uint64_t value)
423 {
424 bd.check_access(index, sizeof(value));
425 auto& ref = *(reinterpret_cast<uint64_t*>(bd.data + index));
426 std::atomic_ref<uint64_t> slot(ref);
427 slot.store(value, std::memory_order_release);
428 }
429
430 std::optional<Reservation> reserve(size_t size)
431 {
432 auto mask = bd.size - 1;
433 auto hd = bd.offsets->head_cache.load(std::memory_order_acquire);
434 auto tl = bd.offsets->tail.load(std::memory_order_relaxed);
435
436 // NB: These will be always be set on the first loop, before they are
437 // read, so this initialisation is unnecessary. It is added to placate
438 // static analyzers.
439 size_t padding = 0u;
440 size_t tl_index = 0u;
441
442 do
443 {
444 auto gap = tl - hd;
445 auto avail = bd.size - gap;
446
447 // If the head cache is too far behind the tail, or if the message does
448 // not fit in the available space, get an accurate head and try again.
449 if ((gap > bd.size) || (size > avail))
450 {
451 // If the message does not fit in the sum of front-space and
452 // back-space, see if head has moved to give us enough space.
453 hd = bd.offsets->head.load(std::memory_order_acquire);
454
455 // This happens if the head has passed the tail we previously loaded.
456 // It is safe to continue here, as the compare_exchange_weak is
457 // guaranteed to fail and update tl.
458 if (greater_with_wraparound(hd, tl))
459 {
460 continue;
461 }
462
463 avail = bd.size - (tl - hd);
464
465 // If it still doesn't fit, fail.
466 if (size > avail)
467 {
468 return {};
469 }
470
471 // This may move the head cache backwards, but if so, that is safe and
472 // will be corrected later.
473 bd.offsets->head_cache.store(hd, std::memory_order_release);
474 }
475
476 padding = 0;
477 tl_index = tl & mask;
478 auto block = bd.size - tl_index;
479
480 if (size > block)
481 {
482 // If the message doesn't fit in back-space...
483 auto hd_index = hd & mask;
484
485 if (size > hd_index)
486 {
487 // If message doesn't fit in front-space, see if the head has moved
488 hd = bd.offsets->head.load(std::memory_order_acquire);
489 hd_index = hd & mask;
490
491 // If it still doesn't fit, fail - there is not a contiguous region
492 // large enough for this reservation
493 if (size > hd_index)
494 {
495 return {};
496 }
497
498 // This may move the head cache backwards, but if so, that is safe
499 // and will be corrected later.
500 bd.offsets->head_cache.store(hd, std::memory_order_release);
501 }
502
503 // Pad the back-space and reserve front-space for our message in a
504 // single tail update.
505 padding = block;
506 }
507 } while (!bd.offsets->tail.compare_exchange_weak(
508 tl, tl + size + padding, std::memory_order_seq_cst));
509
510 if (padding != 0)
511 {
512 write64(
513 tl_index,
515 Const::msg_pad, padding - Const::header_size(), false));
516 tl_index = 0;
517 }
518
519 return {{tl_index, tl}};
520 }
521 };
522
523 // This is entirely non-virtual so can be safely passed to the enclave
525 {
526 private:
527 ringbuffer::Reader from_outside;
528 ringbuffer::Reader from_inside;
529
530 public:
532 const BufferDef& from_outside_buffer,
533 const BufferDef& from_inside_buffer) :
534 from_outside(from_outside_buffer),
535 from_inside(from_inside_buffer)
536 {}
537
539 {
540 return from_outside;
541 }
542
544 {
545 return from_inside;
546 }
547
549 {
550 return {from_inside};
551 }
552
554 {
555 return {from_outside};
556 }
557 };
558
560 {
561 ringbuffer::Circuit& raw_circuit;
562
563 public:
564 WriterFactory(ringbuffer::Circuit& c) : raw_circuit(c) {}
565
566 std::shared_ptr<ringbuffer::AbstractWriter> create_writer_to_outside()
567 override
568 {
569 return std::make_shared<Writer>(raw_circuit.read_from_inside());
570 }
571
572 std::shared_ptr<ringbuffer::AbstractWriter> create_writer_to_inside()
573 override
574 {
575 return std::make_shared<Writer>(raw_circuit.read_from_outside());
576 }
577 };
578
579 // This struct wraps buffer management to simplify testing
581 {
583 std::vector<uint8_t> storage;
585
586 TestBuffer(size_t size) : offsets(), storage(size, 0)
587 {
588 bd.data = storage.data();
589 bd.size = storage.size();
590 bd.offsets = &offsets;
591 }
592 };
593}
Definition ring_buffer_types.h:157
Definition ring_buffer_types.h:63
std::optional< size_t > WriteMarker
Definition ring_buffer_types.h:100
Definition ring_buffer.h:525
ringbuffer::Writer write_to_inside()
Definition ring_buffer.h:553
ringbuffer::Reader & read_from_inside()
Definition ring_buffer.h:543
Circuit(const BufferDef &from_outside_buffer, const BufferDef &from_inside_buffer)
Definition ring_buffer.h:531
ringbuffer::Reader & read_from_outside()
Definition ring_buffer.h:538
ringbuffer::Writer write_to_outside()
Definition ring_buffer.h:548
Definition ring_buffer.h:172
size_t read(size_t limit, Handler f)
Definition ring_buffer.h:205
virtual ~Reader()=default
Reader(const BufferDef &bd_)
Definition ring_buffer.h:189
Definition ring_buffer.h:560
WriterFactory(ringbuffer::Circuit &c)
Definition ring_buffer.h:564
std::shared_ptr< ringbuffer::AbstractWriter > create_writer_to_inside() override
Definition ring_buffer.h:572
std::shared_ptr< ringbuffer::AbstractWriter > create_writer_to_outside() override
Definition ring_buffer.h:566
Definition ring_buffer.h:267
std::optional< size_t > prepare(Message m, size_t size, bool wait=true, size_t *identifier=nullptr) override
Definition ring_buffer.h:292
~Writer() override=default
Writer(const Reader &r)
Definition ring_buffer.h:283
BufferDef bd
Definition ring_buffer.h:269
const size_t rmax
Definition ring_buffer.h:270
Writer(const Writer &that)
Definition ring_buffer.h:288
WriteMarker write_bytes(const WriteMarker &marker, const uint8_t *bytes, size_t size) override
Definition ring_buffer.h:382
void finish(const WriteMarker &marker) override
Definition ring_buffer.h:362
size_t get_max_message_size() override
Definition ring_buffer.h:376
Definition ring_buffer_types.h:51
uint32_t length(uint64_t header)
Definition ring_buffer.h:165
uint64_t read64_impl(const BufferDef &bd, size_t index)
Definition ring_buffer.h:141
Message message(uint64_t header)
Definition ring_buffer.h:160
Definition non_blocking.h:15
std::function< void(Message, const uint8_t *, size_t)> Handler
Definition ring_buffer.h:26
uint32_t Message
Definition ring_buffer_types.h:19
Definition ring_buffer.h:116
void check_access(size_t index, size_t access_size)
Definition ring_buffer.h:122
uint8_t * data
Definition ring_buffer.h:117
Offsets * offsets
Definition ring_buffer.h:120
size_t size
Definition ring_buffer.h:118
Definition ring_buffer.h:33
@ msg_pad
Definition ring_buffer.h:39
@ msg_max
Definition ring_buffer.h:36
@ msg_none
Definition ring_buffer.h:38
@ msg_min
Definition ring_buffer.h:37
static constexpr size_t max_size()
Definition ring_buffer.h:69
static constexpr bool is_power_of_2(size_t n)
Definition ring_buffer.h:42
static constexpr size_t align_size(size_t n)
Definition ring_buffer.h:58
static constexpr size_t previous_power_of_2(size_t n)
Definition ring_buffer.h:86
static bool find_acceptable_sub_buffer(uint8_t *&data_, size_t &size_)
Definition ring_buffer.h:92
static bool is_aligned(uint8_t const *data, size_t align)
Definition ring_buffer.h:47
static constexpr size_t entry_size(size_t n)
Definition ring_buffer.h:64
static constexpr size_t max_reservation_size(size_t buffer_size)
Definition ring_buffer.h:76
static constexpr size_t header_size()
Definition ring_buffer.h:52
static uint64_t make_header(Message m, size_t size, bool pending=true)
Definition ring_buffer.h:108
Definition ring_buffer_types.h:26
std::atomic< size_t > head
Definition ring_buffer_types.h:31
std::atomic< size_t > head_cache
Definition ring_buffer_types.h:38
std::atomic< size_t > tail
Definition ring_buffer_types.h:46
Definition ring_buffer.h:581
std::vector< uint8_t > storage
Definition ring_buffer.h:583
BufferDef bd
Definition ring_buffer.h:584
Offsets offsets
Definition ring_buffer.h:582
TestBuffer(size_t size)
Definition ring_buffer.h:586
Definition ring_buffer.h:273
size_t index
Definition ring_buffer.h:275
size_t identifier
Definition ring_buffer.h:279