CCF
Loading...
Searching...
No Matches
non_blocking.h
Go to the documentation of this file.
1// Copyright (c) Microsoft Corporation. All rights reserved.
2// Licensed under the Apache 2.0 License.
3#pragma once
4
5#include "ring_buffer.h"
6
#include <deque>
#define FMT_HEADER_ONLY
#include <fmt/format.h>
#include <memory>
#include <utility>
#include <vector>
12
13namespace ringbuffer
14{
15 // This wraps an underlying Writer implementation and ensures calls to write()
16 // will not block indefinitely. This never calls the blocking write()
17 // implementation. Instead it calls try_write(), and in the case that a write
18 // fails (because the target ringbuffer is full), the message is placed in a
19 // pending queue. These pending messages must be flushed regularly, attempting
20 // again to write to the ringbuffer.
21
23 {
24 private:
25 WriterPtr underlying_writer;
26
27 struct PendingMessage
28 {
29 Message m;
30 size_t marker;
31 bool finished;
32 std::vector<uint8_t> buffer;
33
34 PendingMessage(Message m_, std::vector<uint8_t>&& buffer_) :
35 m(m_),
36 marker(0),
37 finished(false),
38 buffer(buffer_)
39 {}
40 };
41
42 std::deque<PendingMessage> pending;
43
44 public:
45 NonBlockingWriter(const WriterPtr& writer) : underlying_writer(writer) {}
46
49 size_t total_size,
50 bool,
51 size_t* identifier = nullptr) override
52 {
53 if (pending.empty())
54 {
55 // No currently pending messages - try to write to underlying buffer
56 const auto marker =
57 underlying_writer->prepare(m, total_size, false, identifier);
58
59 if (marker.has_value())
60 {
61 return marker;
62 }
63
64 // Prepare failed, no space in buffer - so add to queue
65 }
66
67 pending.emplace_back(m, std::vector<uint8_t>(total_size));
68
69 auto& msg = pending.back();
70 msg.marker = (size_t)msg.buffer.data();
71
72 // NB: There is an assumption that these markers will never conflict with
73 // the markers produced by the underlying writer impl
74 return msg.marker;
75 }
76
77 virtual void finish(const WriteMarker& marker) override
78 {
79 if (marker.has_value())
80 {
81 for (auto& it : pending)
82 {
83 // NB: finish is passed the _initial_ WriteMarker, so we compare
84 // against it.buffer.data() rather than it.marker
85 if ((size_t)it.buffer.data() == marker.value())
86 {
87 // This is a pending write. Mark as completed, so we can later flush
88 // it
89 it.finished = true;
90 return;
91 }
92 }
93 }
94
95 underlying_writer->finish(marker);
96 }
97
99 const WriteMarker& marker, const uint8_t* bytes, size_t size) override
100 {
101 if (marker.has_value())
102 {
103 for (auto& it : pending)
104 {
105 const auto buffer_end = it.buffer.data() + it.buffer.size();
106 if (
107 it.marker == marker.value() &&
108 marker.value() != reinterpret_cast<uint64_t>(buffer_end))
109 {
110 // This is a pending write - dump data directly to write marker,
111 // which should be within the appropriate buffer
112 auto dest = (uint8_t*)marker.value();
113 if (dest < it.buffer.data())
114 {
115 throw std::runtime_error(fmt::format(
116 "Invalid pending marker - writing before buffer: {} < {}",
117 (size_t)dest,
118 (size_t)it.buffer.data()));
119 }
120
121 if (dest + size > buffer_end)
122 {
123 throw std::runtime_error(fmt::format(
124 "Invalid pending marker - write extends beyond buffer: {} + {} "
125 "> {}",
126 (size_t)dest,
127 (size_t)size,
128 (size_t)buffer_end));
129 }
130
131 std::memcpy(dest, bytes, size);
132 dest += size;
133 it.marker = (size_t)dest;
134 return {it.marker};
135 }
136 }
137 }
138
139 // Otherwise, this was successfully prepared on the underlying
140 // implementation - delegate to it for remaining writes
141 return underlying_writer->write_bytes(marker, bytes, size);
142 }
143
144 size_t get_max_message_size() override
145 {
146 return underlying_writer->get_max_message_size();
147 }
148
149 // Returns true if flush completed and there are no more pending messages.
150 // False means 0 or more pending messages were written, but some remain
152 {
153 while (!pending.empty())
154 {
155 const auto& next = pending.front();
156 if (!next.finished)
157 {
158 // If we reached an in-progress pending message, stop - we can't flush
159 // this or anything after it
160 break;
161 }
162
163 // Try to write this pending message to the underlying writer
164 const auto marker = underlying_writer->prepare(
165 next.m, next.buffer.size(), false, nullptr);
166
167 if (!marker.has_value())
168 {
169 // No space - stop flushing
170 break;
171 }
172
173 underlying_writer->write_bytes(
174 marker, next.buffer.data(), next.buffer.size());
175 underlying_writer->finish(marker);
176
177 // This pending message was successfully written - pop it and continue
178 pending.pop_front();
179 }
180
181 return pending.empty();
182 }
183 };
184
186 {
187 AbstractWriterFactory& factory_impl;
188
189 // Could be set, but needs custom hash() + operator<, so vector is simpler
190 using WriterSet = std::vector<std::weak_ptr<ringbuffer::NonBlockingWriter>>;
191
192 WriterSet writers_to_outside;
193 WriterSet writers_to_inside;
194
195 std::shared_ptr<ringbuffer::NonBlockingWriter> add_writer(
196 const std::shared_ptr<ringbuffer::AbstractWriter>& underlying,
197 WriterSet& writers)
198 {
199 auto new_writer = std::make_shared<NonBlockingWriter>(underlying);
200 writers.emplace_back(new_writer);
201 return new_writer;
202 }
203
204 bool flush_all(WriterSet& writers)
205 {
206 bool all_empty = true;
207
208 auto it = writers.begin();
209 while (it != writers.end())
210 {
211 auto shared_ptr = it->lock();
212 if (shared_ptr)
213 {
214 all_empty &= shared_ptr->try_flush_pending();
215 ++it;
216 }
217 else
218 {
219 it = writers.erase(it);
220 }
221 }
222
223 return all_empty;
224 }
225
226 public:
228 {}
229
230 std::shared_ptr<ringbuffer::NonBlockingWriter>
232 {
233 return add_writer(
234 factory_impl.create_writer_to_outside(), writers_to_outside);
235 }
236
238 {
239 return flush_all(writers_to_outside);
240 }
241
242 std::shared_ptr<ringbuffer::NonBlockingWriter>
244 {
245 return add_writer(
246 factory_impl.create_writer_to_inside(), writers_to_inside);
247 }
248
250 {
251 return flush_all(writers_to_inside);
252 }
253
254 std::shared_ptr<ringbuffer::AbstractWriter> create_writer_to_outside()
255 override
256 {
258 }
259
260 std::shared_ptr<ringbuffer::AbstractWriter> create_writer_to_inside()
261 override
262 {
264 }
265 };
266}
Definition ring_buffer_types.h:153
virtual WriterPtr create_writer_to_inside()=0
virtual WriterPtr create_writer_to_outside()=0
Definition ring_buffer_types.h:61
std::optional< size_t > WriteMarker
Definition ring_buffer_types.h:98
Definition non_blocking.h:186
bool flush_all_outbound()
Definition non_blocking.h:237
bool flush_all_inbound()
Definition non_blocking.h:249
NonBlockingWriterFactory(AbstractWriterFactory &impl)
Definition non_blocking.h:227
std::shared_ptr< ringbuffer::AbstractWriter > create_writer_to_inside() override
Definition non_blocking.h:260
std::shared_ptr< ringbuffer::AbstractWriter > create_writer_to_outside() override
Definition non_blocking.h:254
std::shared_ptr< ringbuffer::NonBlockingWriter > create_non_blocking_writer_to_inside()
Definition non_blocking.h:243
std::shared_ptr< ringbuffer::NonBlockingWriter > create_non_blocking_writer_to_outside()
Definition non_blocking.h:231
Definition non_blocking.h:23
virtual void finish(const WriteMarker &marker) override
Definition non_blocking.h:77
NonBlockingWriter(const WriterPtr &writer)
Definition non_blocking.h:45
size_t get_max_message_size() override
Definition non_blocking.h:144
bool try_flush_pending()
Definition non_blocking.h:151
virtual WriteMarker prepare(ringbuffer::Message m, size_t total_size, bool, size_t *identifier=nullptr) override
Definition non_blocking.h:47
virtual WriteMarker write_bytes(const WriteMarker &marker, const uint8_t *bytes, size_t size) override
Definition non_blocking.h:98
Definition non_blocking.h:14
std::shared_ptr< AbstractWriter > WriterPtr
Definition ring_buffer_types.h:150
uint32_t Message
Definition ring_buffer_types.h:19