CCF
Loading...
Searching...
No Matches
snapshots.h
Go to the documentation of this file.
1// Copyright (c) Microsoft Corporation. All rights reserved.
2// Licensed under the Apache 2.0 License.
3#pragma once
4
6#include "ccf/ds/nonstd.h"
9#include "ds/files.h"
10#include "time_bound_logger.h"
11
12#include <charconv>
13#include <filesystem>
14#include <fstream>
15#include <iostream>
16#include <optional>
17
18namespace fs = std::filesystem;
19
20namespace asynchost
21{
// Snapshot file names look like "snapshot_<seqno>_<evidence seqno>" while
// they are being written (e.g. "snapshot_100_105"), and have the committed
// suffix appended when they are committed (e.g. "snapshot_100_105.committed").
static constexpr const char* snapshot_file_prefix = "snapshot";
static constexpr const char* snapshot_idx_delimiter = "_";
static constexpr const char* snapshot_committed_suffix = ".committed";
25
26 static bool is_snapshot_file(const std::string& file_name)
27 {
28 return file_name.starts_with(snapshot_file_prefix);
29 }
30
31 static bool is_snapshot_file_committed(const std::string& file_name)
32 {
33 return file_name.find(snapshot_committed_suffix) != std::string::npos;
34 }
35
36 static size_t read_idx(const std::string& str)
37 {
38 size_t idx = 0;
39 auto end_ptr = str.data() + str.size();
40
41 auto res = std::from_chars(str.data(), end_ptr, idx);
42 if (res.ec != std::errc())
43 {
44 throw std::logic_error(
45 fmt::format("Could not read idx from string \"{}\": {}", str, res.ec));
46 }
47 else if (res.ptr != end_ptr)
48 {
49 throw std::logic_error(fmt::format(
50 "Trailing characters in \"{}\" cannot be converted to idx: \"{}\"",
51 str,
52 std::string(res.ptr, end_ptr)));
53 }
54 return idx;
55 }
56
57 static std::optional<size_t> get_evidence_commit_idx_from_file_name(
58 const std::string& file_name)
59 {
60 // Only returns an evidence commit index for 1.x committed snapshots.
61 // 1.x committed snapshots file names are of the form:
62 // "snapshot_X_Y.committed_Z" while 2.x+ ones are of the form:
63 // "snapshot_X_Y.committed"
64 auto pos = file_name.find(snapshot_committed_suffix);
65 if (pos == std::string::npos)
66 {
67 throw std::logic_error(
68 fmt::format("Snapshot file \"{}\" is not committed", file_name));
69 }
70
71 pos = file_name.find(snapshot_idx_delimiter, pos);
72 if (pos == std::string::npos)
73 {
74 // 2.x+ snapshot
75 return std::nullopt;
76 }
77
78 return read_idx(file_name.substr(pos + 1));
79 }
80
81 static size_t get_snapshot_idx_from_file_name(const std::string& file_name)
82 {
83 if (!is_snapshot_file(file_name))
84 {
85 throw std::logic_error(
86 fmt::format("File \"{}\" is not a valid snapshot file", file_name));
87 }
88
89 auto idx_pos = file_name.find_first_of(snapshot_idx_delimiter);
90 if (idx_pos == std::string::npos)
91 {
92 throw std::logic_error(fmt::format(
93 "Snapshot file name {} does not contain snapshot seqno", file_name));
94 }
95
96 auto evidence_idx_pos =
97 file_name.find_first_of(snapshot_idx_delimiter, idx_pos + 1);
98 if (evidence_idx_pos == std::string::npos)
99 {
100 throw std::logic_error(fmt::format(
101 "Snapshot file \"{}\" does not contain evidence index", file_name));
102 }
103
104 return read_idx(
105 file_name.substr(idx_pos + 1, evidence_idx_pos - idx_pos - 1));
106 }
107
108 static size_t get_snapshot_evidence_idx_from_file_name(
109 const std::string& file_name)
110 {
111 if (!is_snapshot_file(file_name))
112 {
113 throw std::logic_error(
114 fmt::format("File \"{}\" is not a valid snapshot file", file_name));
115 }
116
117 auto idx_pos = file_name.find_first_of(snapshot_idx_delimiter);
118 if (idx_pos == std::string::npos)
119 {
120 throw std::logic_error(
121 fmt::format("Snapshot file \"{}\" does not contain index", file_name));
122 }
123
124 auto evidence_idx_pos =
125 file_name.find_first_of(snapshot_idx_delimiter, idx_pos + 1);
126 if (evidence_idx_pos == std::string::npos)
127 {
128 throw std::logic_error(fmt::format(
129 "Snapshot file \"{}\" does not contain evidence index", file_name));
130 }
131
132 // Note: Snapshot file may not be committed
133 size_t end_str = std::string::npos;
134 auto commit_suffix_pos =
135 file_name.find_first_of(snapshot_committed_suffix, evidence_idx_pos + 1);
136 if (commit_suffix_pos != std::string::npos)
137 {
138 end_str = commit_suffix_pos - evidence_idx_pos - 1;
139 }
140
141 return read_idx(file_name.substr(evidence_idx_pos + 1, end_str));
142 }
143
145 const fs::path& directory, size_t& latest_committed_snapshot_idx)
146 {
147 std::optional<fs::path> latest_committed_snapshot_file_name = std::nullopt;
148
149 for (auto& f : fs::directory_iterator(directory))
150 {
151 auto file_name = f.path().filename();
152 if (!is_snapshot_file(file_name))
153 {
154 LOG_INFO_FMT("Ignoring non-snapshot file {}", file_name);
155 continue;
156 }
157
158 if (!is_snapshot_file_committed(file_name))
159 {
160 LOG_INFO_FMT("Ignoring non-committed snapshot file {}", file_name);
161 continue;
162 }
163
164 if (fs::exists(f.path()) && fs::is_empty(f.path()))
165 {
166 LOG_INFO_FMT("Ignoring empty snapshot file {}", file_name);
167 continue;
168 }
169
170 auto snapshot_idx = get_snapshot_idx_from_file_name(file_name);
171 if (snapshot_idx > latest_committed_snapshot_idx)
172 {
173 latest_committed_snapshot_file_name = file_name;
174 latest_committed_snapshot_idx = snapshot_idx;
175 }
176 }
177
178 return latest_committed_snapshot_file_name;
179 }
180
182 {
183 private:
184 ringbuffer::WriterPtr to_enclave;
185
186 const fs::path snapshot_dir;
187 const std::optional<fs::path> read_snapshot_dir = std::nullopt;
188
189 struct PendingSnapshot
190 {
191 ::consensus::Index evidence_idx;
192 std::shared_ptr<std::vector<uint8_t>> snapshot;
193 };
194 std::map<size_t, PendingSnapshot> pending_snapshots;
195
196 public:
198 const std::string& snapshot_dir_,
199 ringbuffer::AbstractWriterFactory& writer_factory,
200 const std::optional<std::string>& read_snapshot_dir_ = std::nullopt) :
201 to_enclave(writer_factory.create_writer_to_inside()),
202 snapshot_dir(snapshot_dir_),
203 read_snapshot_dir(read_snapshot_dir_)
204 {
205 if (fs::is_directory(snapshot_dir))
206 {
208 "Snapshots will be stored in existing directory: {}", snapshot_dir);
209 }
210 else if (!fs::create_directory(snapshot_dir))
211 {
212 throw std::logic_error(
213 fmt::format("Could not create snapshot directory: {}", snapshot_dir));
214 }
215
216 if (
217 read_snapshot_dir.has_value() &&
218 !fs::is_directory(read_snapshot_dir.value()))
219 {
220 throw std::logic_error(fmt::format(
221 "{} read-only snapshot is not a directory",
222 read_snapshot_dir.value()));
223 }
224 }
225
226 fs::path get_main_directory() const
227 {
228 return snapshot_dir;
229 }
230
231 std::shared_ptr<std::vector<uint8_t>> add_pending_snapshot(
233 ::consensus::Index evidence_idx,
234 size_t requested_size)
235 {
236 auto snapshot = std::make_shared<std::vector<uint8_t>>(requested_size);
237 pending_snapshots.emplace(idx, PendingSnapshot{evidence_idx, snapshot});
238
240 "Added pending snapshot {} [{} bytes]", idx, requested_size);
241
242 return snapshot;
243 }
244
// Evaluates the system-call expression `x` once and, if it returned -1,
// throws std::runtime_error naming strerror(errno), the snapshot `name` and
// the source text of `x`. Only a -1 return is treated as failure: a short
// (partial) write() is not detected here.
#define THROW_ON_ERROR(x, name) \
  do \
  { \
    const auto ret_ = (x); \
    if (ret_ != -1) \
    { \
      break; \
    } \
    throw std::runtime_error(fmt::format( \
      "Error ({}) writing snapshot {} in " #x, strerror(errno), name)); \
  } while (0)
255
257 {
258 // Inputs, populated at construction
259 const std::filesystem::path dir;
260 const std::string tmp_file_name;
261 const int snapshot_fd;
262
263 // Outputs, populated by callback
264 std::string committed_file_name = {};
265 };
266
267 static void on_snapshot_sync_and_rename(uv_work_t* req)
268 {
269// don't init / deinit in sync
270#ifndef TEST_MODE_EXECUTE_SYNC_INLINE
272#endif
273 auto data = static_cast<AsyncSnapshotSyncAndRename*>(req->data);
274
275 {
276 asynchost::TimeBoundLogger log_if_slow(
277 fmt::format("Committing snapshot - fsync({})", data->tmp_file_name));
278 fsync(data->snapshot_fd);
279 }
280
281 close(data->snapshot_fd);
282
283 // e.g. snapshot_100_105.committed
284 data->committed_file_name =
285 fmt::format("{}{}", data->tmp_file_name, snapshot_committed_suffix);
286 const auto full_committed_path = data->dir / data->committed_file_name;
287
288 const auto full_tmp_path = data->dir / data->tmp_file_name;
289 files::rename(full_tmp_path, full_committed_path);
290
291 // read and log the hash of the written snapshot
292 auto raw = files::slurp(full_committed_path);
294 "Written snapshot to {} (size: {} bytes, sha256: {} )",
295 data->committed_file_name,
296 raw.size(),
298
299#ifndef TEST_MODE_EXECUTE_SYNC_INLINE
301#endif
302 }
303
304 static void on_snapshot_sync_and_rename_complete(uv_work_t* req, int status)
305 {
306 auto data = static_cast<AsyncSnapshotSyncAndRename*>(req->data);
307
309 "Renamed temporary snapshot {} to {}",
310 data->tmp_file_name,
311 data->committed_file_name);
312
313 delete data;
314 delete req;
315 }
316
318 ::consensus::Index snapshot_idx,
319 const uint8_t* receipt_data,
320 size_t receipt_size)
321 {
322 TimeBoundLogger log_if_slow(
323 fmt::format("Committing snapshot - snapshot_idx={}", snapshot_idx));
324
325 try
326 {
327 for (auto it = pending_snapshots.begin(); it != pending_snapshots.end();
328 it++)
329 {
330 if (snapshot_idx == it->first)
331 {
332 // e.g. snapshot_100_105
333 auto file_name = fmt::format(
334 "{}{}{}{}{}",
335 snapshot_file_prefix,
336 snapshot_idx_delimiter,
337 it->first,
338 snapshot_idx_delimiter,
339 it->second.evidence_idx);
340 auto full_snapshot_path = snapshot_dir / file_name;
341
342 int snapshot_fd = open(
343 full_snapshot_path.c_str(), O_CREAT | O_EXCL | O_WRONLY, 0664);
344 if (snapshot_fd == -1)
345 {
346 if (errno == EEXIST)
347 {
348 // In the case that a file with this name already exists, keep
349 // existing file and drop pending snapshot
351 "Cannot write snapshot as file already exists: {}",
352 file_name);
353 }
354 else
355 {
357 "Cannot write snapshot: error ({}) opening file {}",
358 errno,
359 file_name);
360 }
361 }
362 else
363 {
364 const auto& snapshot = it->second.snapshot;
365
367 write(snapshot_fd, snapshot->data(), snapshot->size()),
368 file_name);
370 write(snapshot_fd, receipt_data, receipt_size), file_name);
371
373 "New snapshot file written to {} [{} bytes] (unsynced)",
374 file_name,
375 snapshot->size() + receipt_size);
376
377 // Call fsync and rename on a worker-thread via uv async, as they
378 // may be slow
379 uv_work_t* work_handle = new uv_work_t;
380
381 {
382 auto* data = new AsyncSnapshotSyncAndRename{
383 .dir = snapshot_dir,
384 .tmp_file_name = file_name,
385 .snapshot_fd = snapshot_fd};
386
387 work_handle->data = data;
388 }
389
390#ifdef TEST_MODE_EXECUTE_SYNC_INLINE
391 on_snapshot_sync_and_rename(work_handle);
393#else
394 uv_queue_work(
395 uv_default_loop(),
396 work_handle,
399#endif
400 }
401
402 auto sha = ccf::crypto::Sha256Hash(*it->second.snapshot);
404 "Writing snapshot to {} (sha256: {})", full_snapshot_path, sha);
405
406 pending_snapshots.erase(it);
407
408 return;
409 }
410 }
411
412 LOG_FAIL_FMT("Could not find snapshot to commit at {}", snapshot_idx);
413 }
414 catch (std::exception& e)
415 {
417 "Exception while attempting to commit snapshot at {}: {}",
418 snapshot_idx,
419 e.what());
420 }
421 }
422#undef THROW_ON_ERROR
423
424 std::optional<std::pair<fs::path, fs::path>>
426 {
427 // Keep track of latest snapshot file in both directories
428 size_t latest_idx = 0;
429
430 std::optional<fs::path> read_only_latest_committed_snapshot =
431 std::nullopt;
432 if (read_snapshot_dir.has_value())
433 {
434 read_only_latest_committed_snapshot =
436 read_snapshot_dir.value(), latest_idx);
437 }
438
439 auto main_latest_committed_snapshot =
440 find_latest_committed_snapshot_in_directory(snapshot_dir, latest_idx);
441
442 if (main_latest_committed_snapshot.has_value())
443 {
444 return std::make_pair(
445 snapshot_dir, main_latest_committed_snapshot.value());
446 }
447 else if (read_only_latest_committed_snapshot.has_value())
448 {
449 return std::make_pair(
450 read_snapshot_dir.value(),
451 read_only_latest_committed_snapshot.value());
452 }
453
454 return std::nullopt;
455 }
456
459 {
461 disp,
462 ::consensus::snapshot_allocate,
463 [this](const uint8_t* data, size_t size) {
464 auto idx = serialized::read<::consensus::Index>(data, size);
465 auto evidence_idx = serialized::read<::consensus::Index>(data, size);
466 auto requested_size = serialized::read<size_t>(data, size);
467 auto generation_count = serialized::read<uint32_t>(data, size);
468
469 auto snapshot =
470 add_pending_snapshot(idx, evidence_idx, requested_size);
471
473 ::consensus::snapshot_allocated,
474 to_enclave,
475 std::span<uint8_t>{snapshot->data(), snapshot->size()},
476 generation_count);
477 });
478
480 disp,
481 ::consensus::snapshot_commit,
482 [this](const uint8_t* data, size_t size) {
483 auto snapshot_idx = serialized::read<::consensus::Index>(data, size);
484 commit_snapshot(snapshot_idx, data, size);
485 });
486 }
487 };
488}
Definition snapshots.h:182
static void on_snapshot_sync_and_rename_complete(uv_work_t *req, int status)
Definition snapshots.h:304
std::optional< std::pair< fs::path, fs::path > > find_latest_committed_snapshot()
Definition snapshots.h:425
SnapshotManager(const std::string &snapshot_dir_, ringbuffer::AbstractWriterFactory &writer_factory, const std::optional< std::string > &read_snapshot_dir_=std::nullopt)
Definition snapshots.h:197
void register_message_handlers(messaging::Dispatcher< ringbuffer::Message > &disp)
Definition snapshots.h:457
fs::path get_main_directory() const
Definition snapshots.h:226
static void on_snapshot_sync_and_rename(uv_work_t *req)
Definition snapshots.h:267
void commit_snapshot(::consensus::Index snapshot_idx, const uint8_t *receipt_data, size_t receipt_size)
Definition snapshots.h:317
std::shared_ptr< std::vector< uint8_t > > add_pending_snapshot(::consensus::Index idx, ::consensus::Index evidence_idx, size_t requested_size)
Definition snapshots.h:231
Definition sha256_hash.h:16
std::string hex_str() const
Definition sha256_hash.cpp:61
Definition messaging.h:38
Definition ring_buffer_types.h:153
#define LOG_INFO_FMT
Definition logger.h:395
#define LOG_DEBUG_FMT
Definition logger.h:380
#define LOG_FAIL_FMT
Definition logger.h:396
#define DISPATCHER_SET_MESSAGE_HANDLER(DISP, MSG,...)
Definition messaging.h:316
Definition after_io.h:8
std::optional< fs::path > find_latest_committed_snapshot_in_directory(const fs::path &directory, size_t &latest_committed_snapshot_idx)
Definition snapshots.h:144
void openssl_sha256_init()
Definition hash.cpp:43
void openssl_sha256_shutdown()
Definition hash.cpp:69
uint64_t Index
Definition ledger_enclave_types.h:11
std::vector< uint8_t > slurp(const std::string &file, bool optional=false)
Tries to read a file as byte vector.
Definition files.h:43
void rename(const fs::path &src, const fs::path &dst)
Definition files.h:141
std::shared_ptr< AbstractWriter > WriterPtr
Definition ring_buffer_types.h:150
#define RINGBUFFER_WRITE_MESSAGE(MSG,...)
Definition ring_buffer_types.h:255
#define THROW_ON_ERROR(x, name)
Definition snapshots.h:245
std::string committed_file_name
Definition snapshots.h:264
const std::filesystem::path dir
Definition snapshots.h:259
const int snapshot_fd
Definition snapshots.h:261
const std::string tmp_file_name
Definition snapshots.h:260
Definition time_bound_logger.h:14