CCF
Loading...
Searching...
No Matches
snapshot_manager.h
Go to the documentation of this file.
1// Copyright (c) Microsoft Corporation. All rights reserved.
2// Licensed under the Apache 2.0 License.
3#pragma once
4
5#include "ccf/ds/nonstd.h"
9
10#include <charconv>
11#include <filesystem>
12#include <fstream>
13#include <iostream>
14#include <optional>
15
16namespace fs = std::filesystem;
17
18namespace snapshots
19{
21 {
22 private:
23 ringbuffer::WriterPtr to_enclave;
24
25 const fs::path snapshot_dir;
26 const std::optional<fs::path> read_snapshot_dir = std::nullopt;
27 struct PendingSnapshot
28 {
29 ::consensus::Index evidence_idx;
30 std::shared_ptr<std::vector<uint8_t>> snapshot;
31 };
32 std::map<size_t, PendingSnapshot> pending_snapshots;
33
34 public:
36 const std::string& snapshot_dir_,
38 const std::optional<std::string>& read_snapshot_dir_ = std::nullopt) :
39 to_enclave(writer_factory.create_writer_to_inside()),
40 snapshot_dir(snapshot_dir_),
41 read_snapshot_dir(read_snapshot_dir_)
42 {
43 if (fs::is_directory(snapshot_dir))
44 {
46 "Snapshots will be stored in existing directory: {}", snapshot_dir);
47 }
48 else
49 {
50 asynchost::TimeBoundLogger log_if_slow(fmt::format(
51 "Creating snapshot directory - create_directory({})", snapshot_dir));
52 if (!fs::create_directory(snapshot_dir))
53 {
54 throw std::logic_error(fmt::format(
55 "Could not create snapshot directory: {}", snapshot_dir));
56 }
57 }
58
59 if (
60 read_snapshot_dir.has_value() &&
61 !fs::is_directory(read_snapshot_dir.value()))
62 {
63 throw std::logic_error(fmt::format(
64 "{} read-only snapshot is not a directory",
65 read_snapshot_dir.value()));
66 }
67 }
68
71
72 [[nodiscard]] fs::path get_main_directory() const
73 {
74 return snapshot_dir;
75 }
76
77 std::shared_ptr<std::vector<uint8_t>> add_pending_snapshot(
79 ::consensus::Index evidence_idx,
80 size_t requested_size)
81 {
82 auto snapshot = std::make_shared<std::vector<uint8_t>>(requested_size);
83 pending_snapshots.emplace(idx, PendingSnapshot{evidence_idx, snapshot});
84
86 "Added pending snapshot {} [{} bytes]", idx, requested_size);
87
88 return snapshot;
89 }
90
// Evaluates x exactly once and throws std::runtime_error if the result is -1.
// The message contains strerror(errno), the given name and the source text
// of x. Used below for calls which report failure by returning -1.
#define THROW_ON_ERROR(x, name) \
  do \
  { \
    if (auto rc = x; rc == -1) \
    { \
      throw std::runtime_error( \
        fmt::format(/* NOLINTNEXTLINE(concurrency-mt-unsafe) */ \
                    "Error ({}) writing snapshot {} in " #x, \
                    strerror(errno), \
                    name)); \
    } \
  } while (0)
104
// State handed to the two uv work callbacks through uv_work_t::data.
// Allocated in commit_snapshot() and freed in
// on_snapshot_sync_and_rename_complete().
struct AsyncSnapshotSyncAndRename
{
  // Inputs, populated at construction
  const std::filesystem::path dir;
  const std::string tmp_file_name;
  const int snapshot_fd;

  // Outputs, populated by callback
  std::string committed_file_name;
};
115
116 static void on_snapshot_sync_and_rename(uv_work_t* req)
117 {
118 auto* data = static_cast<AsyncSnapshotSyncAndRename*>(req->data);
119
120 {
121 asynchost::TimeBoundLogger log_if_slow(
122 fmt::format("Committing snapshot - fsync({})", data->tmp_file_name));
123 fsync(data->snapshot_fd); // NOLINT(concurrency-mt-unsafe)
124 }
125
126 {
127 asynchost::TimeBoundLogger log_if_slow(fmt::format(
128 "Closing snapshot file - close({})", data->tmp_file_name));
129 close(data->snapshot_fd); // NOLINT(concurrency-mt-unsafe)
130 }
131
132 // e.g. snapshot_100_105.committed
133 data->committed_file_name =
134 fmt::format("{}{}", data->tmp_file_name, snapshot_committed_suffix);
135 const auto full_committed_path = data->dir / data->committed_file_name;
136
137 const auto full_tmp_path = data->dir / data->tmp_file_name;
138 {
139 asynchost::TimeBoundLogger log_if_slow(fmt::format(
140 "Renaming snapshot to committed - rename({})", data->tmp_file_name));
141 files::rename(full_tmp_path, full_committed_path);
142 }
143 }
144
146 uv_work_t* req, int /*status*/)
147 {
148 auto* data = static_cast<AsyncSnapshotSyncAndRename*>(req->data);
149
151 "Renamed temporary snapshot {} to {}",
152 data->tmp_file_name,
153 data->committed_file_name);
154
155 delete data; // NOLINT(cppcoreguidelines-owning-memory)
156 delete req; // NOLINT(cppcoreguidelines-owning-memory)
157 }
158
160 ::consensus::Index snapshot_idx,
161 const uint8_t* receipt_data,
162 size_t receipt_size)
163 {
164 asynchost::TimeBoundLogger log_if_slow(
165 fmt::format("Committing snapshot - snapshot_idx={}", snapshot_idx));
166
167 try
168 {
169 for (auto it = pending_snapshots.begin(); it != pending_snapshots.end();
170 it++)
171 {
172 if (snapshot_idx == it->first)
173 {
174 // e.g. snapshot_100_105
175 auto file_name = fmt::format(
176 "{}{}{}{}{}",
177 snapshot_file_prefix,
178 snapshot_idx_delimiter,
179 it->first,
180 snapshot_idx_delimiter,
181 it->second.evidence_idx);
182 auto full_snapshot_path = snapshot_dir / file_name;
183
184 int snapshot_fd = -1;
185 {
186 asynchost::TimeBoundLogger log_open_if_slow(
187 fmt::format("Opening snapshot file - open({})", file_name));
188 snapshot_fd = open(
189 full_snapshot_path.c_str(), O_CREAT | O_EXCL | O_WRONLY, 0664);
190 }
191 if (snapshot_fd == -1)
192 {
193 if (errno == EEXIST)
194 {
195 // In the case that a file with this name already exists, keep
196 // existing file and drop pending snapshot
198 "Cannot write snapshot as file already exists: {}",
199 file_name);
200 }
201 else
202 {
204 "Cannot write snapshot: error ({}) opening file {}",
205 errno,
206 file_name);
207 }
208 }
209 else
210 {
211 const auto& snapshot = it->second.snapshot;
212
213 {
214 asynchost::TimeBoundLogger log_write_if_slow(fmt::format(
215 "Writing snapshot data ({} bytes) - write({})",
216 snapshot->size(),
217 file_name));
218 // NOLINTNEXTLINE(concurrency-mt-unsafe)
220 write(snapshot_fd, snapshot->data(), snapshot->size()),
221 file_name);
222 }
223 {
224 asynchost::TimeBoundLogger log_write_if_slow(fmt::format(
225 "Writing snapshot receipt ({} bytes) - write({})",
226 receipt_size,
227 file_name));
228 // NOLINTNEXTLINE(concurrency-mt-unsafe)
230 write(snapshot_fd, receipt_data, receipt_size), file_name);
231 }
232
234 "New snapshot file written to {} [{} bytes] (unsynced)",
235 file_name,
236 snapshot->size() + receipt_size);
237
238 // Call fsync and rename on a worker-thread via uv async, as they
239 // may be slow
240 // NOLINTNEXTLINE(cppcoreguidelines-owning-memory)
241 auto* work_handle = new uv_work_t;
242
243 {
244 // NOLINTNEXTLINE(cppcoreguidelines-owning-memory)
245 auto* data = new AsyncSnapshotSyncAndRename{
246 .dir = snapshot_dir,
247 .tmp_file_name = file_name,
248 .snapshot_fd = snapshot_fd,
249 .committed_file_name = {}};
250
251 work_handle->data = data;
252 }
253
254#ifdef TEST_MODE_EXECUTE_SYNC_INLINE
255 on_snapshot_sync_and_rename(work_handle);
257#else
258 uv_queue_work(
259 uv_default_loop(),
260 work_handle,
263#endif
264 }
265
266 pending_snapshots.erase(it);
267
268 return;
269 }
270 }
271
272 LOG_FAIL_FMT("Could not find snapshot to commit at {}", snapshot_idx);
273 }
274 catch (std::exception& e)
275 {
277 "Exception while attempting to commit snapshot at {}: {}",
278 snapshot_idx,
279 e.what());
280 }
281 }
282#undef THROW_ON_ERROR
283
284 std::optional<fs::path> find_latest_committed_snapshot()
285 {
286 std::vector<fs::path> directories;
287 directories.push_back(snapshot_dir);
288 if (read_snapshot_dir.has_value())
289 {
290 directories.push_back(read_snapshot_dir.value());
291 }
293 }
294
297 {
299 disp,
300 ::consensus::snapshot_allocate,
301 [this](const uint8_t* data, size_t size) {
302 auto idx = serialized::read<::consensus::Index>(data, size);
303 auto evidence_idx = serialized::read<::consensus::Index>(data, size);
304 auto requested_size = serialized::read<size_t>(data, size);
305 auto generation_count = serialized::read<uint32_t>(data, size);
306
307 auto snapshot =
308 add_pending_snapshot(idx, evidence_idx, requested_size);
309
311 ::consensus::snapshot_allocated,
312 to_enclave,
313 std::span<uint8_t>{snapshot->data(), snapshot->size()},
314 generation_count);
315 });
316
318 disp,
319 ::consensus::snapshot_commit,
320 [this](const uint8_t* data, size_t size) {
321 auto snapshot_idx = serialized::read<::consensus::Index>(data, size);
322 commit_snapshot(snapshot_idx, data, size);
323 });
324 }
325 };
326}
Definition messaging.h:38
Definition ring_buffer_types.h:157
Definition snapshot_manager.h:21
fs::path get_main_directory() const
Definition snapshot_manager.h:72
SnapshotManager(const std::string &snapshot_dir_, ringbuffer::AbstractWriterFactory &writer_factory, const std::optional< std::string > &read_snapshot_dir_=std::nullopt)
Definition snapshot_manager.h:35
void commit_snapshot(::consensus::Index snapshot_idx, const uint8_t *receipt_data, size_t receipt_size)
Definition snapshot_manager.h:159
SnapshotManager & operator=(const SnapshotManager &)=delete
static void on_snapshot_sync_and_rename_complete(uv_work_t *req, int)
Definition snapshot_manager.h:145
SnapshotManager(const SnapshotManager &)=delete
static void on_snapshot_sync_and_rename(uv_work_t *req)
Definition snapshot_manager.h:116
std::optional< fs::path > find_latest_committed_snapshot()
Definition snapshot_manager.h:284
void register_message_handlers(messaging::Dispatcher< ringbuffer::Message > &disp)
Definition snapshot_manager.h:295
std::shared_ptr< std::vector< uint8_t > > add_pending_snapshot(::consensus::Index idx, ::consensus::Index evidence_idx, size_t requested_size)
Definition snapshot_manager.h:77
#define LOG_INFO_FMT
Definition internal_logger.h:15
#define LOG_DEBUG_FMT
Definition internal_logger.h:14
#define LOG_FAIL_FMT
Definition internal_logger.h:16
#define DISPATCHER_SET_MESSAGE_HANDLER(DISP, MSG,...)
Definition messaging.h:292
uint64_t Index
Definition ledger_enclave_types.h:11
std::shared_ptr< AbstractWriter > WriterPtr
Definition ring_buffer_types.h:154
Definition fetch.h:63
std::optional< fs::path > find_latest_committed_snapshot_in_directories(const std::vector< fs::path > &directories, std::optional< size_t > minimum_idx=std::nullopt)
Definition filenames.h:224
#define RINGBUFFER_WRITE_MESSAGE(MSG,...)
Definition ring_buffer_types.h:259
#define THROW_ON_ERROR(x, name)
Definition snapshot_manager.h:91
Definition time_bound_logger.h:14
const std::filesystem::path dir
Definition snapshot_manager.h:108
const int snapshot_fd
Definition snapshot_manager.h:110
const std::string tmp_file_name
Definition snapshot_manager.h:109
std::string committed_file_name
Definition snapshot_manager.h:113