32 std::vector<uint8_t> buffer;
34 PendingMessage(
Message m_, std::vector<uint8_t>&& buffer_) :
42 std::deque<PendingMessage> pending;
51 size_t* identifier =
nullptr)
override
57 underlying_writer->prepare(m, total_size,
false, identifier);
59 if (marker.has_value())
67 pending.emplace_back(m, std::vector<uint8_t>(total_size));
69 auto& msg = pending.back();
70 msg.marker = (size_t)msg.buffer.data();
79 if (marker.has_value())
81 for (
auto& it : pending)
85 if ((
size_t)it.buffer.data() == marker.value())
95 underlying_writer->finish(marker);
99 const WriteMarker& marker,
const uint8_t* bytes,
size_t size)
override
101 if (marker.has_value())
103 for (
auto& it : pending)
105 const auto buffer_end = it.buffer.data() + it.buffer.size();
107 it.marker == marker.value() &&
108 marker.value() !=
reinterpret_cast<uint64_t
>(buffer_end))
112 auto dest = (uint8_t*)marker.value();
113 if (dest < it.buffer.data())
115 throw std::runtime_error(fmt::format(
116 "Invalid pending marker - writing before buffer: {} < {}",
118 (
size_t)it.buffer.data()));
121 if (dest + size > buffer_end)
123 throw std::runtime_error(fmt::format(
124 "Invalid pending marker - write extends beyond buffer: {} + {} "
128 (
size_t)buffer_end));
131 std::memcpy(dest, bytes, size);
133 it.marker = (size_t)dest;
141 return underlying_writer->write_bytes(marker, bytes, size);
146 return underlying_writer->get_max_message_size();
153 while (!pending.empty())
155 const auto& next = pending.front();
164 const auto marker = underlying_writer->prepare(
165 next.m, next.buffer.size(),
false,
nullptr);
167 if (!marker.has_value())
173 underlying_writer->write_bytes(
174 marker, next.buffer.data(), next.buffer.size());
175 underlying_writer->finish(marker);
181 return pending.empty();
190 using WriterSet = std::vector<std::weak_ptr<ringbuffer::NonBlockingWriter>>;
192 WriterSet writers_to_outside;
193 WriterSet writers_to_inside;
195 std::shared_ptr<ringbuffer::NonBlockingWriter> add_writer(
196 const std::shared_ptr<ringbuffer::AbstractWriter>& underlying,
199 auto new_writer = std::make_shared<NonBlockingWriter>(underlying);
200 writers.emplace_back(new_writer);
204 bool flush_all(WriterSet& writers)
206 bool all_empty =
true;
208 auto it = writers.begin();
209 while (it != writers.end())
211 auto shared_ptr = it->lock();
214 all_empty &= shared_ptr->try_flush_pending();
219 it = writers.erase(it);
230 std::shared_ptr<ringbuffer::NonBlockingWriter>
239 return flush_all(writers_to_outside);
242 std::shared_ptr<ringbuffer::NonBlockingWriter>
251 return flush_all(writers_to_inside);
Definition ring_buffer_types.h:153
virtual WriterPtr create_writer_to_inside()=0
virtual WriterPtr create_writer_to_outside()=0
Definition ring_buffer_types.h:61
std::optional< size_t > WriteMarker
Definition ring_buffer_types.h:98
Definition non_blocking.h:186
bool flush_all_outbound()
Definition non_blocking.h:237
bool flush_all_inbound()
Definition non_blocking.h:249
NonBlockingWriterFactory(AbstractWriterFactory &impl)
Definition non_blocking.h:227
std::shared_ptr< ringbuffer::AbstractWriter > create_writer_to_inside() override
Definition non_blocking.h:260
std::shared_ptr< ringbuffer::AbstractWriter > create_writer_to_outside() override
Definition non_blocking.h:254
std::shared_ptr< ringbuffer::NonBlockingWriter > create_non_blocking_writer_to_inside()
Definition non_blocking.h:243
std::shared_ptr< ringbuffer::NonBlockingWriter > create_non_blocking_writer_to_outside()
Definition non_blocking.h:231
Definition non_blocking.h:23
virtual void finish(const WriteMarker &marker) override
Definition non_blocking.h:77
NonBlockingWriter(const WriterPtr &writer)
Definition non_blocking.h:45
size_t get_max_message_size() override
Definition non_blocking.h:144
bool try_flush_pending()
Definition non_blocking.h:151
virtual WriteMarker prepare(ringbuffer::Message m, size_t total_size, bool, size_t *identifier=nullptr) override
Definition non_blocking.h:47
virtual WriteMarker write_bytes(const WriteMarker &marker, const uint8_t *bytes, size_t size) override
Definition non_blocking.h:98
Definition non_blocking.h:14
std::shared_ptr< AbstractWriter > WriterPtr
Definition ring_buffer_types.h:150
uint32_t Message
Definition ring_buffer_types.h:19