CCF
Loading...
Searching...
No Matches
ledger.h
Go to the documentation of this file.
1// Copyright (c) Microsoft Corporation. All rights reserved.
2// Licensed under the Apache 2.0 License.
3#pragma once
4
5#include "ccf/ds/nonstd.h"
6#include "ccf/pal/locking.h"
8#include "ds/files.h"
10#include "ds/messaging.h"
11#include "ds/serialized.h"
12#include "kv/kv_types.h"
14#include "ledger_filenames.h"
15#include "time_bound_logger.h"
16
17#include <cstdint>
18#include <cstdio>
19#include <filesystem>
20#include <list>
21#include <map>
22#include <string>
23#include <sys/types.h>
24#include <unistd.h>
25#include <uv.h>
26#include <vector>
27
28namespace fs = std::filesystem;
29
30namespace asynchost
31{
// Default for Ledger's `max_read_cache_files` constructor argument: the
// maximum number of ledger files kept open in the read cache before the
// oldest cached file is evicted.
32 static constexpr size_t ledger_max_read_cache_files_default = 5;
33
34 static std::optional<std::string> get_file_name_with_idx(
35 const std::string& dir, size_t idx, bool allow_recovery_files)
36 {
37 std::optional<std::string> match = std::nullopt;
38 for (auto const& f : fs::directory_iterator(dir))
39 {
40 // If any file, based on its name, contains idx. Only committed
41 // (i.e. those with a last idx) are considered here.
42 auto f_name = f.path().filename();
43 if (
44 is_ledger_file_name_ignored(f_name) ||
45 (!allow_recovery_files && is_ledger_file_name_recovery(f_name)))
46 {
47 continue;
48 }
49
50 size_t start_idx = 0;
51 std::optional<size_t> last_idx = std::nullopt;
52 try
53 {
54 start_idx = get_start_idx_from_file_name(f_name);
55 last_idx = get_last_idx_from_file_name(f_name);
56 }
57 catch (const std::exception& e)
58 {
59 // Ignoring invalid ledger file
60 continue;
61 }
62 if (idx >= start_idx && last_idx.has_value() && idx <= last_idx.value())
63 {
64 match = f_name;
65 break;
66 }
67 }
68
69 return match;
70 }
71
73 {
// Raw bytes of the ledger entries that were read, concatenated in ledger
// order.
74 std::vector<uint8_t> data;
// Index (seqno) of the last entry included in `data`. This may be lower
// than the requested end when a size limit shortened the read.
75 size_t end_idx{};
76 };
77
79 {
80 private:
// Type of the fixed-size header at offset 0 of every ledger file. It holds
// the byte offset of the positions table, or 0 while the file is not
// complete (see complete() and truncate()).
81 using positions_offset_header_t = size_t;
// Files are named "<prefix>_<start_idx>" while open, and
// "<prefix>_<start_idx>-<last_idx><committed suffix>" once committed.
82 static constexpr auto file_name_prefix = "ledger";
83
// Directory containing the file, and the file's current name within it.
84 const fs::path dir;
85 fs::path file_name;
86
87 // This uses C stdio instead of fstream because an fstream
88 // cannot be truncated.
89 FILE* file = nullptr;
// In the code visible here, only read_entries() takes this lock. The write
// paths (write_entry/truncate/complete) do not; presumably the caller
// serialises writes and reads - TODO confirm.
90 ccf::pal::Mutex file_lock;
91
// Index (seqno) of the first entry stored in this file.
92 size_t start_idx = 1;
93 size_t total_len = 0; // Points to end of last written entry
// positions[i] is the byte offset in the file of entry (start_idx + i).
// NOTE(review): stored as uint32_t while total_len is size_t, so offsets
// are silently narrowed on push_back. This presumably relies on ledger
// files staying well below 4 GiB - TODO confirm.
94 std::vector<uint32_t> positions;
95
// completed: the positions table has been written to (or read from) the
// end of the file.
// committed: the file name carries the committed suffix; such files are
// opened read-only.
96 bool completed = false;
97 bool committed = false;
98
// Set for files created with the recovery suffix; cleared by open().
99 bool recovery = false;
100
101 // This flag is set when an existing ledger is recovered and started (init)
102 // from an old idx. In this case, further ledger files (i.e. those which
103 // contain entries later than init idx), remain on disk and new entries are
104 // checked against the existing ones, until a divergence is found.
105 bool from_existing_file = false;
106
107 public:
108 // Used when creating a new (empty) ledger file
109 LedgerFile(const fs::path& dir, size_t start_idx, bool recovery = false) :
110 dir(dir),
111 file_name(fmt::format("{}_{}", file_name_prefix, start_idx)),
112 start_idx(start_idx),
113 recovery(recovery)
114 {
115 if (recovery)
116 {
117 file_name =
118 fmt::format("{}{}", file_name.string(), ledger_recovery_file_suffix);
119 }
120
121 auto file_path = dir / file_name;
122
123 // Use exclusive-create mode ("x") to atomically fail if the file
124 // already exists, avoiding a separate fs::exists() stat call.
125 {
126 TimeBoundLogger log_if_slow(
127 fmt::format("Creating ledger file - fopen({})", file_path));
128 // NOLINTNEXTLINE(cppcoreguidelines-owning-memory)
129 file = fopen(file_path.c_str(), "w+bx");
130 }
131 if (file == nullptr)
132 {
133 if (errno == EEXIST)
134 {
135 throw std::logic_error(fmt::format(
136 "Cannot create new ledger file {} in main ledger directory {} as "
137 "it already exists",
138 file_name,
139 dir));
140 }
141 throw std::logic_error(fmt::format(
142 "Unable to open ledger file {}: {}",
143 file_path,
144 std::strerror(errno))); // NOLINT(concurrency-mt-unsafe)
145 }
146
147 // Header reserved for the offset to the position table
148 fseeko(file, sizeof(positions_offset_header_t), SEEK_SET);
149 total_len = sizeof(positions_offset_header_t);
150 }
151
// Opens an existing ledger file `file_name_` located in `dir`.
// - Committed files (by name) are opened read-only ("rb"); others are
//   opened read/write ("r+b").
// - If `from_existing_file_` is set, no entries are loaded. positions stays
//   empty and total_len points just past the header, so entries are later
//   re-written (and compared) through write_entry().
// - Otherwise, a non-zero header offset means the positions table is read
//   from the end of the file and the file is marked complete.
// - A zero offset means the entries are scanned one by one to rebuild
//   positions. The scan stops silently at the first unreadable or truncated
//   entry, leaving total_len at the end of the last valid entry.
// Throws std::logic_error if the file cannot be opened, the header cannot
// be read, a committed file has a zero table offset, the table offset is
// past the end of the file, or the table cannot be read.
152 // Used when recovering an existing ledger file
154 const std::string& dir,
155 const std::string& file_name_,
156 bool from_existing_file_ = false) :
157 dir(dir),
158 file_name(file_name_),
159 from_existing_file(from_existing_file_)
160 {
161 auto file_path = (fs::path(dir) / fs::path(file_name));
162
// Note: `recovery` is not derived from the name here; it keeps its
// default (false) even for files carrying the recovery suffix.
163 committed = is_ledger_file_name_committed(file_name);
164 start_idx = get_start_idx_from_file_name(file_name);
165
166 const auto* const mode = committed ? "rb" : "r+b";
167
168 {
169 TimeBoundLogger log_if_slow(
170 fmt::format("Opening ledger file - fopen({})", file_path));
171 // NOLINTNEXTLINE(cppcoreguidelines-owning-memory)
172 file = fopen(file_path.c_str(), mode);
173 }
174
175 if (file == nullptr)
176 {
177 throw std::logic_error(fmt::format(
178 "Unable to open ledger file {}: {}",
179 file_path,
180 std::strerror(errno))); // NOLINT(concurrency-mt-unsafe)
181 }
182
183 // First, get full size of file
// NOTE(review): the fseeko/ftello results are not checked; a failing
// ftello() (-1) would become a huge size_t here.
184 fseeko(file, 0, SEEK_END);
185 size_t total_file_size = ftello(file);
186
187 // Second, read offset to header table
// NOTE(review): the creating constructor only seeks past the header and
// never writes it. A file that was created but never written to is
// therefore 0 bytes long, and this read throws for it - confirm intended.
188 fseeko(file, 0, SEEK_SET);
189 positions_offset_header_t table_offset = 0;
190 {
191 TimeBoundLogger log_if_slow(
192 fmt::format("Reading positions offset - fread({})", file_path));
193 if (
194 fread(&table_offset, sizeof(positions_offset_header_t), 1, file) != 1)
195 {
196 throw std::logic_error(fmt::format(
197 "Failed to read positions offset from ledger file {}", file_path));
198 }
199 }
200
// A committed file must always be complete (i.e. carry a positions table).
201 if (committed && table_offset == 0)
202 {
203 throw std::logic_error(fmt::format(
204 "Committed ledger file {} cannot be read: invalid table offset (0)",
205 file_path));
206 }
207
208 total_len = sizeof(positions_offset_header_t);
209
210 if (from_existing_file)
211 {
212 // When recovering a file from persistence, do not recover entries to
213 // start with as these are expected to be written again at a later
214 // point.
215 return;
216 }
217
218 if (table_offset != 0)
219 {
220 // If the chunk was completed, read positions table from file directly
// The table spans from table_offset to the end of the file; any trailing
// bytes that do not form a whole element are ignored by the division.
221 total_len = table_offset;
222 fseeko(file, table_offset, SEEK_SET);
223
224 if (table_offset > total_file_size)
225 {
226 throw std::logic_error(fmt::format(
227 "Invalid table offset {} greater than total file size {}",
228 table_offset,
229 total_file_size));
230 }
231
232 positions.resize(
233 (total_file_size - table_offset) / sizeof(positions.at(0)));
234
235 {
236 TimeBoundLogger log_if_slow(fmt::format(
237 "Reading positions table ({} entries) - fread({})",
238 positions.size(),
239 file_path));
240 if (
241 fread(
242 positions.data(),
243 sizeof(positions.at(0)),
244 positions.size(),
245 file) != positions.size())
246 {
247 throw std::logic_error(fmt::format(
248 "Failed to read positions table from ledger file {}", file_path));
249 }
250 }
251 completed = true;
252 }
253 else
254 {
255 // If the chunk was not completed, read all entries to reconstruct
256 // positions table
// The stream is positioned just past the header (after the fread above),
// which matches total_len.
257 total_len = sizeof(positions_offset_header_t);
258 auto len = total_file_size - total_len;
259
260 TimeBoundLogger log_if_slow(fmt::format(
261 "Recovering entries from incomplete ledger file {} ({} bytes)",
262 file_path,
263 len));
264
265 ccf::kv::SerialisedEntryHeader entry_header = {};
266 size_t current_idx = start_idx;
// Each entry on disk is a serialised header followed by `entry_header.size`
// payload bytes (presumably - TODO confirm against SerialisedEntryHeader).
267 while (len >= ccf::kv::serialised_entry_header_size)
268 {
269 if (
270 fread(
271 &entry_header, ccf::kv::serialised_entry_header_size, 1, file) !=
272 1)
273 {
275 "Failed to read entry header from ledger file {} at seqno {}",
276 file_path,
277 current_idx);
278 return;
279 }
280
281 len -= ccf::kv::serialised_entry_header_size;
282
283 const auto& entry_size = entry_header.size;
// Partially-written trailing entry: stop here, keeping only complete
// entries. The caller may truncate the leftover bytes afterwards.
284 if (len < entry_size)
285 {
287 "Malformed incomplete ledger file {} at seqno {} (expecting "
288 "entry of size "
289 "{}, remaining {})",
290 file_path,
291 current_idx,
292 entry_size,
293 len);
294
295 return;
296 }
297
// Skip over the payload; only its offset is recorded.
298 fseeko(file, entry_size, SEEK_CUR);
299 len -= entry_size;
300
302 "Recovered one entry of size {} at seqno {}",
303 entry_size,
304 current_idx);
305
306 current_idx++;
307 positions.push_back(total_len);
308 total_len += (ccf::kv::serialised_entry_header_size + entry_size);
309 }
310 completed = false;
311 }
312 }
313
// Destructor body: closes the underlying FILE* if one was opened. fclose
// also flushes any data still buffered by stdio. Its return value is
// deliberately ignored because a destructor must not throw.
// NOTE(review): no copy/move operations are declared. This relies on a
// member (e.g. ccf::pal::Mutex) being non-copyable so the same FILE* can
// never be closed twice - confirm.
315 {
316 if (file != nullptr)
317 {
318 TimeBoundLogger log_if_slow(
319 fmt::format("Closing ledger file - fclose({})", file_name));
320 std::ignore =
321 fclose(file); // NOLINT(cppcoreguidelines-owning-memory,cert-err33-c)
322 }
323 }
324
325 [[nodiscard]] size_t get_start_idx() const
326 {
327 return start_idx;
328 }
329
330 [[nodiscard]] size_t get_last_idx() const
331 {
332 return start_idx + positions.size() - 1;
333 }
334
335 [[nodiscard]] size_t get_current_size() const
336 {
337 return total_len;
338 }
339
340 [[nodiscard]] bool is_committed() const
341 {
342 return committed;
343 }
344
345 [[nodiscard]] bool is_complete() const
346 {
347 return completed;
348 }
349
350 [[nodiscard]] bool is_recovery() const
351 {
352 return recovery;
353 }
354
355 // Returns idx of new entry, and boolean to indicate that file was truncated
356 // before writing the entry
357 std::pair<size_t, bool> write_entry(
358 const uint8_t* data, size_t size, bool committable)
359 {
360 fseeko(file, total_len, SEEK_SET);
361
362 bool should_write = true;
363 bool has_truncated = false;
364 if (from_existing_file)
365 {
366 std::vector<uint8_t> entry(size);
367 bool read_mismatch = false;
368 {
369 TimeBoundLogger log_if_slow(fmt::format(
370 "Reading existing entry for comparison ({} bytes) - fread({})",
371 size,
372 file_name));
373 read_mismatch = fread(entry.data(), size, 1, file) != 1 ||
374 memcmp(entry.data(), data, size) != 0;
375 }
376 if (read_mismatch)
377 {
378 // Divergence between existing and new entry. Truncate this file,
379 // write the new entry and notify the caller for further cleanup.
380 // Note that even if the truncation results in an empty file, we keep
381 // it on disk as a new entry is about to be written.
382 truncate(get_last_idx(), false /* remove_file_if_empty */);
383 has_truncated = true;
384 from_existing_file = false;
385 }
386 else
387 {
388 should_write = false;
389 }
390 }
391
392 if (should_write)
393 {
394 {
395 TimeBoundLogger log_if_slow(fmt::format(
396 "Writing ledger entry ({} bytes) - fwrite({})", size, file_name));
397 if (fwrite(data, size, 1, file) != 1)
398 {
399 throw std::logic_error("Failed to write entry to ledger");
400 }
401 }
402
403 // Committable entries get flushed straight away
404 if (committable)
405 {
406 TimeBoundLogger log_if_slow(
407 fmt::format("Flushing ledger entry - fflush({})", file_name));
408 if (fflush(file) != 0)
409 {
410 throw std::logic_error(fmt::format(
411 "Failed to flush entry to ledger: {}",
412 std::strerror(errno))); // NOLINT(concurrency-mt-unsafe)
413 }
414 }
415 }
416
417 positions.push_back(total_len);
418 total_len += size;
419
420 return std::make_pair(get_last_idx(), has_truncated);
421 }
422
423 // Return pair containing entries size and index of last entry included
// Computes the number of bytes occupied by entries [from, to] in this file.
// Returns {size_in_bytes, last_included_idx}. Returns {0, 0} if the range
// is not valid for this file, or if even the single entry `from` exceeds
// max_size.
// When max_size is set and the range does not fit, `to` is repeatedly
// moved halfway towards `from` until it fits. The result therefore fits
// within max_size but is not necessarily the largest prefix that would.
424 [[nodiscard]] std::pair<size_t, size_t> entries_size(
425 size_t from,
426 size_t to,
427 std::optional<size_t> max_size = std::nullopt) const
428 {
429 if ((from < start_idx) || (to < from) || (to > get_last_idx()))
430 {
431 return {0, 0};
432 }
433
434 size_t size = 0;
435
436 // If max_size is set, return entries that fit within it (best effort).
437 while (true)
438 {
// The end of entry `to` is the start of the next entry, or total_len for
// the last entry in the file.
439 auto position_to =
440 (to == get_last_idx()) ? total_len : positions.at(to - start_idx + 1);
441 size = position_to - positions.at(from - start_idx);
442
443 if (!max_size.has_value() || size <= max_size.value())
444 {
445 break;
446 }
447
448 if (from == to)
449 {
450 // Request one entry that is too large: no entries are found
452 "Single ledger entry at {} in file {} is too large for remaining "
453 "space (size {} > max {})",
454 from,
455 file_name,
456 size,
457 max_size.value());
458 return {0, 0};
459 }
// Too large: halve the range towards `from` and try again.
460 size_t to_ = from + (to - from) / 2;
462 "Requesting ledger entries from {} to {} in file {} but size {} > "
463 "max size {}: now requesting up to {}",
464 from,
465 to,
466 file_name,
467 size,
468 max_size.value(),
469 to_);
470 to = to_;
471 }
472
473 return {size, to};
474 }
475
// Reads the raw bytes of entries [from, to] from this file, possibly
// shortened so that they fit within max_size (see entries_size()).
// Returns std::nullopt if the range is not held by this file, or if nothing
// fits in max_size. Otherwise returns the bytes together with the index of
// the last entry actually included.
// Throws std::logic_error if the read fails.
476 std::optional<LedgerReadResult> read_entries(
477 size_t from, size_t to, std::optional<size_t> max_size = std::nullopt)
478 {
479 if ((from < start_idx) || (to > get_last_idx()) || (to < from))
480 {
482 "Cannot find entries: {} - {} in ledger file {}",
483 from,
484 to,
485 file_name);
486 return std::nullopt;
487 }
488
490 "Read entries from {} to {} in {} [max size: {}]",
491 from,
492 to,
493 file_name,
494 max_size.value_or(0));
495
// Seek + read on the shared FILE* must not interleave with other readers.
496 std::unique_lock<ccf::pal::Mutex> guard(file_lock);
497 auto [size, to_] = entries_size(from, to, max_size);
498 if (size == 0)
499 {
500 return std::nullopt;
501 }
502 std::vector<uint8_t> entries(size);
503 fseeko(file, positions.at(from - start_idx), SEEK_SET);
504
505 {
506 TimeBoundLogger log_if_slow(fmt::format(
507 "Reading ledger entries {} to {} ({} bytes) - fread({})",
508 from,
509 to_,
510 size,
511 file_name));
512 if (fread(entries.data(), size, 1, file) != 1)
513 {
514 throw std::logic_error(fmt::format(
515 "Failed to read entry range {} - {} from file {}",
516 from,
517 to,
518 file_name));
519 }
520 }
521
// NOTE(review): `entries` is an lvalue here, so it is copied into the
// result; std::move(entries) would avoid the copy.
522 return LedgerReadResult{entries, to_};
523 }
524
// Truncates the file so that `idx` becomes its last entry.
//
// Returns true only when the file was deleted from disk. That happens when
// idx == start_idx - 1 and remove_file_if_empty is set. Returns false in
// all other cases, including when the request is refused:
// - the file is committed;
// - idx < start_idx - 1;
// - the file is complete and idx >= its last index.
//
// Any truncation (even at the current last index) resets the header's
// positions-table offset to 0, so the file is no longer complete. It also
// cuts the physical file at total_len, removing any trailing bytes.
// Throws std::logic_error on I/O failure.
525 bool truncate(size_t idx, bool remove_file_if_empty = true)
526 {
527 if (
528 committed || (idx < start_idx - 1) ||
529 (completed && idx >= get_last_idx()))
530 {
531 return false;
532 }
533
534 if (remove_file_if_empty && idx == start_idx - 1)
535 {
536 // Truncating everything triggers file deletion
// NOTE(review): the FILE* stays open and the in-memory state is unchanged
// after removal; presumably the caller discards this object - confirm.
537 {
538 TimeBoundLogger log_if_slow(fmt::format(
539 "Removing ledger file on truncation - remove({})", file_name));
540 if (!fs::remove(dir / file_name))
541 {
542 throw std::logic_error(
543 fmt::format("Could not remove file {}", file_name));
544 }
545 }
547 "Removed ledger file {} on truncation at {}", file_name, idx);
548 return true;
549 }
550
551 // Reset positions offset header
552 fseeko(file, 0, SEEK_SET);
553 positions_offset_header_t table_offset = 0;
554 {
555 TimeBoundLogger log_if_slow(
556 fmt::format("Resetting positions offset - fwrite({})", file_name));
557 if (fwrite(&table_offset, sizeof(table_offset), 1, file) != 1)
558 {
559 throw std::logic_error("Failed to reset positions table offset");
560 }
561 }
562
563 completed = false;
// NOTE(review): if idx > get_last_idx() on an incomplete file, the at()
// below throws std::out_of_range after the header has already been reset.
// Callers presumably never pass such an idx - confirm.
564 if (idx != get_last_idx())
565 {
566 total_len = positions.at(idx - start_idx + 1);
567 positions.resize(idx - start_idx + 1);
568 }
569
// Flush stdio buffers before truncating through the raw file descriptor.
570 {
571 TimeBoundLogger log_if_slow(
572 fmt::format("Flushing truncated ledger - fflush({})", file_name));
573 if (fflush(file) != 0)
574 {
575 throw std::logic_error(fmt::format(
576 "Failed to flush ledger file: {}",
577 std::strerror(errno))); // NOLINT(concurrency-mt-unsafe)
578 }
579 }
580
581 {
582 TimeBoundLogger log_if_slow(
583 fmt::format("Truncating ledger file - ftruncate({})", file_name));
584 if (ftruncate(fileno(file), total_len) != 0)
585 {
586 throw std::logic_error(fmt::format(
587 "Failed to truncate ledger: {}",
588 std::strerror(errno))); // NOLINT(concurrency-mt-unsafe)
589 }
590 }
591
// Leave the stream positioned where the next entry will be appended.
592 fseeko(file, total_len, SEEK_SET);
593 LOG_TRACE_FMT("Truncated ledger file {} at seqno {}", file_name, idx);
594 return false;
595 }
596
597 void complete()
598 {
599 if (completed)
600 {
601 return;
602 }
603 // It may happen (e.g. during recovery) that the incomplete ledger gets
604 // truncated on the primary, so we have to make sure that whenever we
605 // complete the file it doesn't contain anything past the last_idx, which
606 // can happen on the follower unless explicitly truncated before
607 // completion. This is only necessary when the file was recovered from an
608 // existing file on disk (from_existing_file is true). For fresh files,
609 // total_len always matches the physical file size, so avoid a potentially
610 // expensive truncate.
611 if (from_existing_file)
612 {
613 truncate(get_last_idx(), /* remove_file_if_empty = */ false);
614 }
615
616 fseeko(file, total_len, SEEK_SET);
617 size_t table_offset = ftello(file);
618
619 {
620 TimeBoundLogger log_if_slow(fmt::format(
621 "Writing positions table ({} entries) - fwrite({})",
622 positions.size(),
623 file_name));
624 if (
625 fwrite(
626 reinterpret_cast<uint8_t*>(positions.data()),
627 sizeof(positions.at(0)),
628 positions.size(),
629 file) != positions.size())
630 {
631 throw std::logic_error("Failed to write positions table to ledger");
632 }
633 }
634
635 // Write positions table offset at start of file
636 if (fseeko(file, 0, SEEK_SET) != 0)
637 {
638 throw std::logic_error("Failed to set file offset to 0");
639 }
640
641 {
642 TimeBoundLogger log_if_slow(fmt::format(
643 "Writing positions table offset - fwrite({})", file_name));
644 if (fwrite(&table_offset, sizeof(table_offset), 1, file) != 1)
645 {
646 throw std::logic_error(
647 "Failed to write positions table offset to ledger");
648 }
649 }
650
651 {
652 TimeBoundLogger log_if_slow(
653 fmt::format("Completing ledger file - fflush({})", file_name));
654 if (fflush(file) != 0)
655 {
656 throw std::logic_error(fmt::format(
657 "Failed to flush ledger file: {}",
658 std::strerror(errno))); // NOLINT(concurrency-mt-unsafe)
659 }
660 }
661
662 LOG_TRACE_FMT("Completed ledger file {}", file_name);
663
664 completed = true;
665 }
666
667 bool rename(const std::string& new_file_name)
668 {
669 auto file_path = dir / file_name;
670 auto new_file_path = dir / new_file_name;
671
672 try
673 {
674 TimeBoundLogger log_if_slow(fmt::format(
675 "Renaming ledger file {} to {} - rename()",
676 file_name,
677 new_file_name));
678 files::rename(file_path, new_file_path);
679 }
680 catch (const std::exception& e)
681 {
682 // If the file cannot be renamed (e.g. file was removed), report an
683 // error and continue
684 LOG_FAIL_FMT("Error renaming ledger file: {}", e.what());
685 }
686 file_name = new_file_name;
687 return true;
688 }
689
690 void open()
691 {
692 auto new_file_name = remove_recovery_suffix(file_name.c_str());
693 rename(new_file_name);
694 recovery = false;
695 LOG_DEBUG_FMT("Open recovery ledger file {}", new_file_name);
696 }
697
698 bool commit(size_t idx)
699 {
700 if (!completed || committed || (idx != get_last_idx()))
701 {
702 // No effect if commit idx is not last idx
703 return false;
704 }
705
706 // Files that are completed and committed are fsync'ed under lock
707 // (acquired in LedgerFiles::commit()) to ensure that any file returned by
708 // committed_ledger_path_with_idx() is complete and can be safely read and
709 // served to other nodes.
710 {
711 TimeBoundLogger log_if_slow(
712 fmt::format("Committing ledger file - fsync({})", file_name));
713 if (fsync(fileno(file)) != 0)
714 {
715 throw std::logic_error(fmt::format(
716 "Failed to flush ledger file: {}",
717 std::strerror(errno))); // NOLINT(concurrency-mt-unsafe)
718 }
719 }
720
721 auto committed_file_name = fmt::format(
722 "{}_{}-{}{}",
723 file_name_prefix,
724 start_idx,
725 get_last_idx(),
726 ledger_committed_suffix);
727
728 if (recovery)
729 {
730 committed_file_name =
731 fmt::format("{}{}", committed_file_name, ledger_recovery_file_suffix);
732 }
733
734 if (!rename(committed_file_name))
735 {
736 return false;
737 }
738
739 committed = true;
740 LOG_DEBUG_FMT("Committed ledger file {}", file_name);
741
742 // Committed recovery files stay in the list of active files until the
743 // ledger is open
744 return !recovery;
745 }
746 };
747
748 class Ledger
749 {
750 private:
// Ringbuffer writer towards the enclave, created in the constructor from
// writer_factory.create_writer_to_inside().
751 ringbuffer::WriterPtr to_enclave;
752
753 // Main ledger directory (write and read)
754 const fs::path ledger_dir;
755
756 // Ledger directories (read-only)
757 const std::vector<fs::path> read_ledger_dirs;
758
// Taken by read_entries_range() and init() in the code visible here.
759 ccf::pal::Mutex state_lock;
760
761 // Keeps track of all ledger files open for writing.
762 // The current ledger file is always the last one.
763 std::list<std::shared_ptr<LedgerFile>> files;
764
765 // Cache of ledger files for reading: at most max_read_cache_files files,
// evicted oldest-inserted first, guarded by read_cache_lock.
766 const size_t max_read_cache_files;
767 std::list<std::shared_ptr<LedgerFile>> files_read_cache;
768 ccf::pal::Mutex read_cache_lock;
769
// Highest known ledger index and highest committed index.
770 size_t last_idx = 0;
771 size_t committed_idx = 0;
772
// Last index covered by committed files found on disk at construction.
773 size_t end_of_committed_files_idx = 0;
774
775 // Indicates if the ledger has been initialised at a specific idx and
776 // may still be replaying existing entries.
777 bool use_existing_files = false;
778 // Used to remember the last recovered idx on init so that
779 // use_existing_files can be disabled once this idx is passed
780 std::optional<size_t> last_idx_on_init = std::nullopt;
781
782 // Used to remember the index from which replication started
783 size_t init_idx = 0;
784
785 // Set during recovery to mark files as temporary until the recovery is
786 // complete
787 std::optional<size_t> recovery_start_idx = std::nullopt;
788
789 [[nodiscard]] auto get_it_contains_idx(size_t idx) const
790 {
791 if (idx == 0)
792 {
793 return files.end();
794 }
795
796 auto f = std::upper_bound(
797 files.begin(),
798 files.end(),
799 idx,
800 [](size_t idx, const std::shared_ptr<LedgerFile>& f) {
801 return (idx <= f->get_last_idx());
802 });
803
804 return f;
805 }
806
// Returns a read-only LedgerFile holding idx, or nullptr.
// The read cache is searched first. On a miss, the main ledger directory is
// scanned (recovery files allowed), followed by each read-only directory
// (recovery files not allowed). The first matching committed-range file is
// opened and appended to the read cache. If the cache then exceeds
// max_read_cache_files, the oldest inserted file is evicted. Cache hits do
// not reorder the cache (FIFO, not LRU).
// Files that fail to open are logged and nullptr is returned.
807 std::shared_ptr<LedgerFile> get_file_from_cache(size_t idx)
808 {
809 if (idx == 0)
810 {
811 return nullptr;
812 }
813
814 {
815 std::unique_lock<ccf::pal::Mutex> guard(read_cache_lock);
816
817 // First, try to find file from read cache
818 for (auto const& f : files_read_cache)
819 {
820 if (f->get_start_idx() <= idx && idx <= f->get_last_idx())
821 {
822 return f;
823 }
824 }
825 }
826
// NOTE(review): read_cache_lock is released between the lookup above and
// the insertion below. Two concurrent misses for the same idx could both
// open the file and insert duplicates - confirm whether that can happen.
827 // If the file is not in the cache, find the file from the ledger
828 // directories, inspecting the main ledger directory first
829 // Note: reading recovery chunks from main ledger directory is
830 // acceptable and in fact required to complete private recovery.
831 std::string ledger_dir_;
832 auto match = get_file_name_with_idx(ledger_dir, idx, true);
833 if (match.has_value())
834 {
835 ledger_dir_ = ledger_dir;
836 }
837 else
838 {
839 for (auto const& dir : read_ledger_dirs)
840 {
841 match = get_file_name_with_idx(dir, idx, false);
842 if (match.has_value())
843 {
844 ledger_dir_ = dir;
845 break;
846 }
847 }
848 }
849
850 if (!match.has_value())
851 {
852 return nullptr;
853 }
854
855 // Emplace file in the max-sized read cache, replacing the oldest entry if
856 // the read cache is full
857 std::shared_ptr<LedgerFile> match_file = nullptr;
858 try
859 {
860 match_file = std::make_shared<LedgerFile>(
861 ledger_dir_, *match); // NOLINT(bugprone-unchecked-optional-access)
862 }
863 catch (const std::exception& e)
864 {
866 "Could not open ledger file {} to read seqno {}: {}",
867 match.value(),
868 idx,
869 e.what());
870 return nullptr;
871 }
872
873 {
874 std::unique_lock<ccf::pal::Mutex> guard(read_cache_lock);
875
// With max_read_cache_files == 0 the file is evicted immediately, but it
// is still returned to the caller.
876 files_read_cache.emplace_back(match_file);
877 if (files_read_cache.size() > max_read_cache_files)
878 {
879 files_read_cache.erase(files_read_cache.begin());
880 }
881 }
882
883 return match_file;
884 }
885
886 std::shared_ptr<LedgerFile> get_file_from_idx(
887 size_t idx, bool read_cache_only = false)
888 {
889 if (idx == 0)
890 {
891 return nullptr;
892 }
893
894 if (!read_cache_only)
895 {
896 // First, check if the file is in the list of files open for writing
897 auto f = std::upper_bound(
898 files.rbegin(),
899 files.rend(),
900 idx,
901 [](size_t idx, const std::shared_ptr<LedgerFile>& f) {
902 return idx >= f->get_start_idx();
903 });
904
905 if (f != files.rend())
906 {
907 return *f;
908 }
909 }
910
911 // Otherwise, return file from read cache
912 return get_file_from_cache(idx);
913 }
914
915 [[nodiscard]] std::shared_ptr<LedgerFile> get_latest_file(
916 bool incomplete_only = true) const
917 {
918 if (files.empty())
919 {
920 return nullptr;
921 }
922 const auto& last_file = files.back();
923 if (incomplete_only && last_file->is_complete())
924 {
925 return nullptr;
926 }
927
928 return last_file;
929 }
930
// Reads entries [from, to] across as many ledger files as needed, under
// state_lock. `to` is clamped to last_idx. With max_entries_size set, reading
// stops at the first file that cannot return its full sub-range, so the
// result is always contiguous.
// Returns std::nullopt in these cases:
// - from is 0 or to < from;
// - no bytes were read;
// - a file holding one of the requested indices cannot be found. In this
//   case, bytes already read from earlier files are discarded.
// A read_entries() failure on a later file instead ends the loop and keeps
// the partial result.
931 std::optional<LedgerReadResult> read_entries_range(
932 size_t from,
933 size_t to,
934 bool read_cache_only = false,
935 std::optional<size_t> max_entries_size = std::nullopt)
936 {
937 std::unique_lock<ccf::pal::Mutex> guard(state_lock);
938
939 // Note: if max_entries_size is set, this returns contiguous ledger
940 // entries on a best effort basis, so that the returned entries fit in
941 // max_entries_size but without maximising the number of entries returned.
942 if ((from <= 0) || (to < from))
943 {
944 return std::nullopt;
945 }
946
947 // During recovery or other low-knowledge batch operations, we might
948 // request entries past the end of the ledger - truncate to the true end
949 // here.
950 if (to > last_idx)
951 {
952 to = last_idx;
953 }
954
956 rr.end_idx = to;
957
958 size_t idx = from;
959 while (idx <= to)
960 {
961 auto f_from = get_file_from_idx(idx, read_cache_only);
962 if (f_from == nullptr)
963 {
964 LOG_FAIL_FMT("Cannot find ledger file for seqno {}", idx);
965 return std::nullopt;
966 }
967 auto to_ = std::min(f_from->get_last_idx(), to);
// Remaining byte budget. Each previous read fit within the budget, so this
// subtraction does not underflow.
968 std::optional<size_t> max_size = std::nullopt;
969 if (max_entries_size.has_value())
970 {
971 max_size = max_entries_size.value() - rr.data.size();
972 }
973 auto v = f_from->read_entries(idx, to_, max_size);
974 if (!v.has_value())
975 {
976 break;
977 }
978 rr.end_idx = v->end_idx;
979 rr.data.insert(
980 rr.data.end(),
981 std::make_move_iterator(v->data.begin()),
982 std::make_move_iterator(v->data.end()));
983 if (v->end_idx != to_)
984 {
985 // If all the entries requested from a file are not returned (i.e.
986 // because the requested entries are larger than max_entries_size),
987 // return immediately to avoid returning non-contiguous entries from a
988 // subsequent ledger file.
989 break;
990 }
991 idx = to_ + 1;
992 }
993
994 if (!rr.data.empty())
995 {
996 return rr;
997 }
998
999 return std::nullopt;
1000 }
1001
1002 void ignore_ledger_file(const std::string& file_name)
1003 {
1004 if (is_ledger_file_name_ignored(file_name))
1005 {
1006 return;
1007 }
1008
1009 auto ignored_file_name =
1010 fmt::format("{}{}", file_name, ledger_ignored_file_suffix);
1011 {
1012 TimeBoundLogger log_if_slow(fmt::format(
1013 "Ignoring ledger file - rename({} to {})",
1014 file_name,
1015 ignored_file_name));
1016 files::rename(ledger_dir / file_name, ledger_dir / ignored_file_name);
1017 }
1018 }
1019
// Removes from the main ledger directory every file whose start index
// (parsed from its name) is greater than idx. Ignored files are included,
// since no ignored-name check is made.
// Throws std::logic_error if a removal fails.
// NOTE(review): unlike get_file_name_with_idx(), name-parsing failures are
// not caught here - confirm how get_start_idx_from_file_name() handles
// unrelated files. In-memory state (`files`, read cache) is not updated
// here; presumably callers handle that - TODO confirm.
1020 void delete_ledger_files_after_idx(size_t idx)
1021 {
1022 // Use with caution! Delete all ledger files later than idx
1023 for (auto const& f : fs::directory_iterator(ledger_dir))
1024 {
1025 auto file_name = f.path().filename();
1026 auto start_idx = get_start_idx_from_file_name(file_name);
1027 if (start_idx > idx)
1028 {
1029 TimeBoundLogger log_if_slow(fmt::format(
1030 "Deleting divergent ledger file - remove({})", file_name));
1031 if (!fs::remove(ledger_dir / file_name))
1032 {
1033 throw std::logic_error(
1034 fmt::format("Could not remove file {}", file_name));
1035 }
1037 "Forcing removal of ledger file {} as start idx {} > {}",
1038 file_name,
1039 start_idx,
1040 idx);
1041 }
1042 }
1043 }
1044
1045 std::shared_ptr<LedgerFile> get_existing_ledger_file_for_idx(size_t idx)
1046 {
1047 if (!use_existing_files)
1048 {
1049 return nullptr;
1050 }
1051
1052 for (auto const& f : fs::directory_iterator(ledger_dir))
1053 {
1054 auto file_name = f.path().filename();
1055 if (
1056 idx == get_start_idx_from_file_name(file_name) &&
1057 !is_ledger_file_ignored(file_name))
1058 {
1059 return std::make_shared<LedgerFile>(
1060 ledger_dir, file_name, true /* from_existing_file */);
1061
1062 break;
1063 }
1064 }
1065
1066 return nullptr;
1067 }
1068
1069 public:
// Builds the host-side ledger state from disk.
// 1. Each read-only directory in read_ledger_dirs_ must exist, otherwise
//    std::logic_error is thrown. The names of its committed, non-ignored
//    files set last_idx, committed_idx and end_of_committed_files_idx to
//    the highest last index found. Files are not opened at this stage.
// 2. If ledger_dir exists, its files are classified:
//    - ignored files are passed to ignore_ledger_file();
//    - committed files only advance committed_idx and
//      end_of_committed_files_idx;
//    - other files whose name already ends at or before committed_idx are
//      marked ignored;
//    - the remaining files are opened, truncated to their last readable
//      entry, and kept in `files`, sorted by last index. Files that fail
//      to open are marked ignored.
// 3. Otherwise, ledger_dir is created (std::logic_error on failure).
1071 const fs::path& ledger_dir,
1072 ringbuffer::AbstractWriterFactory& writer_factory,
1073 size_t max_read_cache_files = ledger_max_read_cache_files_default,
1074 const std::vector<std::string>& read_ledger_dirs_ = {}) :
1075 to_enclave(writer_factory.create_writer_to_inside()),
1076 ledger_dir(ledger_dir),
1077 read_ledger_dirs(read_ledger_dirs_.begin(), read_ledger_dirs_.end()),
1078 max_read_cache_files(max_read_cache_files)
1079 {
1080 // Recover last idx from read-only ledger directories
1081 for (const auto& read_dir : read_ledger_dirs_)
1082 {
1083 LOG_INFO_FMT("Recovering read-only ledger directory \"{}\"", read_dir);
1084 if (!fs::is_directory(read_dir))
1085 {
1086 throw std::logic_error(
1087 fmt::format("{} read-only ledger is not a directory", read_dir));
1088 }
1089
1090 for (auto const& f : fs::directory_iterator(read_dir))
1091 {
1092 auto file_name = f.path().filename();
1093 auto last_idx_ = get_last_idx_from_file_name(file_name);
1094 if (
1095 !last_idx_.has_value() ||
1096 !is_ledger_file_name_committed(file_name) ||
1097 is_ledger_file_name_ignored(file_name))
1098 {
1100 "Read-only ledger file {} is ignored as not committed",
1101 file_name);
1102 continue;
1103 }
1104
// Files in read-only directories are committed, so last_idx and
// committed_idx advance together.
1105 if (last_idx_.value() > last_idx)
1106 {
1107 last_idx = last_idx_.value();
1108 committed_idx = last_idx;
1109 end_of_committed_files_idx = last_idx;
1110 }
1111
1113 "Recovering file from read-only ledger directory: {}", file_name);
1114 }
1115 }
1116
1117 if (last_idx > 0)
1118 {
1120 "Recovered read-only ledger directories up to {}, committed up to "
1121 "{} ",
1122 last_idx,
1123 committed_idx);
1124 }
1125
1126 if (fs::is_directory(ledger_dir))
1127 {
1128 // If the ledger directory exists, populate this->files with the
1129 // writeable files from it. These must have no suffix, and must not
1130 // end-before the current committed_idx found from the read-only
1131 // directories
1132 LOG_INFO_FMT("Recovering main ledger directory {}", ledger_dir);
1133
1134 for (auto const& f : fs::directory_iterator(ledger_dir))
1135 {
1136 auto file_name = f.path().filename();
1137
1138 if (is_ledger_file_ignored(file_name))
1139 {
1141 "Ignoring ledger file {} in main ledger directory", file_name);
1142
1143 ignore_ledger_file(file_name);
1144
1145 continue;
1146 }
1147
1148 const auto file_end_idx = get_last_idx_from_file_name(file_name);
1149
1150 if (is_ledger_file_name_committed(file_name))
1151 {
1152 if (!file_end_idx.has_value())
1153 {
1155 "Unexpected file {} in {}: committed but not completed",
1156 file_name,
1157 ledger_dir);
1158 }
1159 else
1160 {
1161 if (file_end_idx.value() > committed_idx)
1162 {
1163 committed_idx = file_end_idx.value();
1164 end_of_committed_files_idx = file_end_idx.value();
1165 }
1166 }
1167
1168 continue;
1169 }
1170
// NOTE(review): committed_idx can still grow later in this same loop, when
// committed files in ledger_dir are reached. fs::directory_iterator order
// is unspecified, so this check may run against a smaller committed_idx
// than the final one, depending on iteration order - confirm acceptable.
1171 if (file_end_idx.has_value() && file_end_idx.value() <= committed_idx)
1172 {
1174 "Ignoring ledger file {} in main ledger directory - already "
1175 "discovered commit up to {} from read-only directories",
1176 file_name,
1177 committed_idx);
1178
1179 ignore_ledger_file(file_name);
1180
1181 continue;
1182 }
1183
1184 std::shared_ptr<LedgerFile> ledger_file = nullptr;
1185 try
1186 {
1187 ledger_file = std::make_shared<LedgerFile>(ledger_dir, file_name);
1188
1189 // Truncate file to latest recovered index to cleanup entries that
1190 // may have been corrupted (no-op if file isn't corrupted)
1191 if (ledger_file->truncate(ledger_file->get_last_idx()))
1192 {
1193 // If truncation of corrupted entries removes file, file is not
1194 // recovered
1195 LOG_FAIL_FMT("Removed ledger file {}", file_name);
1196 continue;
1197 }
1198 }
1199 catch (const std::exception& e)
1200 {
1202 "Error reading ledger file {}: {}", file_name, e.what());
1203 // Ignore file if it cannot be recovered.
1204 ignore_ledger_file(file_name);
1205 continue;
1206 }
1207
1209 "Recovering file from main ledger directory: {}", file_name);
1210 files.emplace_back(std::move(ledger_file));
1211 }
1212
1213 if (files.empty())
1214 {
1216 "Main ledger directory {} is empty: no ledger file to "
1217 "recover",
1218 ledger_dir);
1219
1220 // If we had any uncommitted files, we wouldn't be in this path and
1221 // we'd populate last_idx below. In this branch, we need to ensure
1222 // last_idx is correctly initialised. Since there are no uncommitted
1223 // files, it must match the last committed_idx we've discovered.
// Note: this early return skips the final "Recovered ledger entries" log.
1224 last_idx = committed_idx;
1225 return;
1226 }
1227
1229 "Main ledger directory {} contains {} restored (writeable) files",
1230 ledger_dir,
1231 files.size());
1232
// Keep `files` ordered by last index, so the newest file is at the back.
1233 files.sort([](
1234 const std::shared_ptr<LedgerFile>& a,
1235 const std::shared_ptr<LedgerFile>& b) {
1236 return a->get_last_idx() < b->get_last_idx();
1237 });
1238
1239 const auto main_ledger_dir_last_idx =
1240 get_latest_file(false)->get_last_idx();
1241 if (main_ledger_dir_last_idx > last_idx)
1242 {
1243 last_idx = main_ledger_dir_last_idx;
1244 }
1245 }
1246 else
1247 {
1248 TimeBoundLogger log_if_slow(fmt::format(
1249 "Creating ledger directory - create_directory({})", ledger_dir));
1250 if (!fs::create_directory(ledger_dir))
1251 {
1252 throw std::logic_error(fmt::format(
1253 "Error: Could not create ledger directory: {}", ledger_dir));
1254 }
1255 }
1256
1258 "Recovered ledger entries up to {}, committed to {}",
1259 last_idx,
1260 committed_idx);
1261 }
1262
1263 Ledger(const Ledger& that) = delete;
1264
1265 void init(size_t idx, size_t recovery_start_idx_ = 0)
1266 {
1267 TimeBoundLogger log_if_slow(
1268 fmt::format("Initing ledger - seqno={}", idx));
1269
1270 std::unique_lock<ccf::pal::Mutex> guard(state_lock);
1271
1272 init_idx = idx;
1273
1274 // Used by backup nodes to initialise the ledger when starting from a
1275 // non-empty state, i.e. snapshot. It is assumed that idx is included in a
1276 // committed ledger file.
1277
1278 // To restart from a snapshot cleanly, in the main ledger directory,
1279 // mark all subsequent ledger as non-committed as their contents will be
1280 // replayed.
1281 for (auto const& f : fs::directory_iterator(ledger_dir))
1282 {
1283 auto file_name = f.path().filename();
1284 if (
1285 is_ledger_file_name_committed(file_name) &&
1286 (get_start_idx_from_file_name(file_name) > idx))
1287 {
1288 auto last_idx_file = get_last_idx_from_file_name(file_name);
1289 if (!last_idx_file.has_value())
1290 {
1291 throw std::logic_error(fmt::format(
1292 "Committed ledger file {} does not include last idx in file name",
1293 file_name));
1294 }
1295
1297 "Remove committed suffix from ledger file {} after init at {}: "
1298 "last_idx {}",
1299 file_name,
1300 idx,
1301 last_idx_file.value());
1302
1303 {
1304 TimeBoundLogger log_rename_if_slow(
1305 fmt::format("Removing committed suffix - rename({})", file_name));
1306 files::rename(
1307 ledger_dir / file_name,
1308 ledger_dir /
1309 remove_suffix(
1310 file_name.string(),
1311 fmt::format(
1312 "{}{}{}",
1313 ledger_last_idx_delimiter,
1314 last_idx_file.value(),
1315 ledger_committed_suffix)));
1316 }
1317 }
1318 }
1319
1320 // Close all open write files as the ledger should
1321 // restart cleanly, from a new chunk.
1322 files.clear();
1323
1324 use_existing_files = true;
1325 last_idx_on_init = last_idx;
1326 last_idx = idx;
1327 committed_idx = idx;
1328 if (recovery_start_idx_ > 0)
1329 {
1330 // Do not set recovery idx and create recovery chunks
1331 // if the ledger is initialised from 0 (i.e. genesis)
1332 recovery_start_idx = recovery_start_idx_;
1333 }
1334
1336 "Set last known/commit seqno to {}, recovery seqno to {}",
1337 idx,
1338 recovery_start_idx_);
1339 }
1340
1342 {
1343 // When the recovery is completed (i.e. service is open), temporary
1344 // recovery ledger chunks are renamed as they can now be recovered.
1345 // Note: this operation cannot be rolled back.
1346 LOG_INFO_FMT("Ledger complete recovery");
1347
1348 std::unique_lock<ccf::pal::Mutex> guard(state_lock);
1349
1350 for (auto it = files.begin(); it != files.end();)
1351 {
1352 auto& f = *it;
1353 if (f->is_recovery())
1354 {
1355 f->open();
1356
1357 // Recovery files are kept in the list of active files when committed
1358 // so that they can be renamed in a stable order when the service is
1359 // open. Once this is done, they can be removed from the list of
1360 // active files.
1361 if (f->is_committed())
1362 {
1363 it = files.erase(it);
1364 continue;
1365 }
1366 }
1367 ++it;
1368 }
1369
1370 recovery_start_idx.reset();
1371 }
1372
1373 [[nodiscard]] size_t get_last_idx()
1374 {
1375 std::unique_lock<ccf::pal::Mutex> guard(state_lock);
1376
1377 return last_idx;
1378 }
1379
1380 void set_recovery_start_idx(size_t idx)
1381 {
1382 std::unique_lock<ccf::pal::Mutex> guard(state_lock);
1383
1384 recovery_start_idx = idx;
1385 }
1386
1387 std::optional<LedgerReadResult> read_entry(size_t idx)
1388 {
1389 TimeBoundLogger log_if_slow(
1390 fmt::format("Reading ledger entry at {}", idx));
1391
1392 // Locking is done in read_entries_range
1393
1394 return read_entries_range(idx, idx);
1395 }
1396
1397 std::optional<LedgerReadResult> read_entries(
1398 size_t from,
1399 size_t to,
1400 std::optional<size_t> max_entries_size = std::nullopt)
1401 {
1402 TimeBoundLogger log_if_slow(
1403 fmt::format("Reading ledger entries from {} to {}", from, to));
1404
1405 // Locking is done in read_entries_range
1406
1407 return read_entries_range(from, to, false, max_entries_size);
1408 }
1409
1410 size_t write_entry(const uint8_t* data, size_t size, bool committable)
1411 {
1412 TimeBoundLogger log_if_slow(fmt::format(
1413 "Writing ledger entry - {} bytes, committable={}", size, committable));
1414
1415 std::unique_lock<ccf::pal::Mutex> guard(state_lock);
1416
1417 auto header =
1418 serialized::peek<ccf::kv::SerialisedEntryHeader>(data, size);
1419
1420 if ((header.flags & ccf::kv::EntryFlags::FORCE_LEDGER_CHUNK_BEFORE) != 0)
1421 {
1423 "Forcing ledger chunk before entry as required by the entry header "
1424 "flags");
1425
1426 auto file = get_latest_file();
1427 if (file != nullptr)
1428 {
1429 file->complete();
1430 LOG_DEBUG_FMT("Ledger chunk completed at {}", file->get_last_idx());
1431 }
1432 }
1433
1434 bool force_chunk_after =
1436 if (force_chunk_after)
1437 {
1438 if (!committable)
1439 {
1440 throw std::logic_error(
1441 "Ledger chunks cannot end in a non-committable transaction");
1442 }
1444 "Forcing ledger chunk after entry as required by the entry header "
1445 "flags");
1446 }
1447
1448 auto file = get_latest_file();
1449 if (file == nullptr)
1450 {
1451 // If no file is currently open for writing, create a new one
1452 size_t start_idx = last_idx + 1;
1453 if (use_existing_files)
1454 {
1455 // When recovering files from persistence, try to find one on disk
1456 // first
1457 file = get_existing_ledger_file_for_idx(start_idx);
1458 }
1459 if (file == nullptr)
1460 {
1461 bool is_recovery = recovery_start_idx.has_value() &&
1462 start_idx > recovery_start_idx.value();
1463 file =
1464 std::make_shared<LedgerFile>(ledger_dir, start_idx, is_recovery);
1465 }
1466 files.emplace_back(file);
1467 }
1468 auto [last_idx_, has_truncated] =
1469 file->write_entry(data, size, committable);
1470 last_idx = last_idx_;
1471
1472 if (has_truncated)
1473 {
1474 // If a divergence was detected when writing the entry, delete all
1475 // further ledger files to cleanly continue
1476 LOG_INFO_FMT("Found divergent ledger entry at {}", last_idx);
1477 delete_ledger_files_after_idx(last_idx);
1478 use_existing_files = false;
1479 }
1480
1481 if (
1482 use_existing_files && last_idx_on_init.has_value() &&
1483 last_idx > last_idx_on_init.value())
1484 {
1485 use_existing_files = false;
1486 }
1487
1489 "Wrote entry at {} [committable: {}, force chunk after: {}]",
1490 last_idx,
1491 committable,
1492 force_chunk_after);
1493
1494 if (committable && force_chunk_after)
1495 {
1496 file->complete();
1497 LOG_DEBUG_FMT("Ledger chunk completed at {}", last_idx);
1498 }
1499
1500 return last_idx;
1501 }
1502
1503 void truncate(size_t idx)
1504 {
1505 TimeBoundLogger log_if_slow(fmt::format("Truncating ledger at {}", idx));
1506
1507 std::unique_lock<ccf::pal::Mutex> guard(state_lock);
1508
1509 LOG_DEBUG_FMT("Ledger truncate: {}/{}", idx, last_idx);
1510
1511 // Conservative check to avoid truncating to future indices, or dropping
1512 // committed entries. If the ledger is being initialised from a snapshot
1513 // alone, the first truncation effectively sets the last index.
1514 if (last_idx != 0 && (idx >= last_idx || idx < committed_idx))
1515 {
1517 "Ignoring truncate to {} - last_idx: {}, committed_idx: {}",
1518 idx,
1519 last_idx,
1520 committed_idx);
1521 return;
1522 }
1523
1524 auto f_from = get_it_contains_idx(idx + 1);
1525 auto f_to = get_it_contains_idx(last_idx);
1526 // std::next(end()) is undefined behaviour, which libstdc++'s debug
1527 // iterators correctly detect; use end() directly when f_to is end().
1528 auto f_end = (f_to == files.end()) ? files.end() : std::next(f_to);
1529
1530 // Note: do not compare iterators against `f_from` inside the loop, as
1531 // it may be invalidated by `files.erase(it)` below. Use a flag for the
1532 // first iteration instead.
1533 bool is_first = true;
1534 for (auto it = f_from; it != f_end;)
1535 {
1536 // Truncate the first file to the truncation index while the more
1537 // recent files are deleted entirely
1538 auto truncate_idx = is_first ? idx : (*it)->get_start_idx() - 1;
1539 is_first = false;
1540 if ((*it)->truncate(truncate_idx))
1541 {
1542 it = files.erase(it);
1543 }
1544 else
1545 {
1546 it++;
1547 }
1548 }
1549
1550 last_idx = idx;
1551 }
1552
1553 void commit(size_t idx)
1554 {
1555 TimeBoundLogger log_if_slow(
1556 fmt::format("Committing ledger entry {}", idx));
1557
1558 std::unique_lock<ccf::pal::Mutex> guard(state_lock);
1559
1560 LOG_DEBUG_FMT("Ledger commit: {}/{}", idx, last_idx);
1561
1562 if (idx <= committed_idx || idx > last_idx)
1563 {
1564 return;
1565 }
1566
1567 auto f_from = (committed_idx == 0) ? get_it_contains_idx(1) :
1568 get_it_contains_idx(committed_idx);
1569 auto f_to = get_it_contains_idx(idx);
1570 // std::next(end()) is undefined behaviour, which libstdc++'s debug
1571 // iterators correctly detect; use end() directly when f_to is end().
1572 auto f_end = (f_to == files.end()) ? files.end() : std::next(f_to);
1573
1574 for (auto it = f_from; it != f_end;)
1575 {
1576 // Commit all previous file to their latest index while the latest
1577 // file is committed to the committed index
1578 const auto last_idx_in_file = (*it)->get_last_idx();
1579 auto commit_idx = (it == f_to) ? idx : last_idx_in_file;
1580 if (
1581 (*it)->commit(commit_idx) &&
1582 (it != f_to || (idx == last_idx_in_file)))
1583 {
1584 end_of_committed_files_idx = last_idx_in_file;
1585 it = files.erase(it);
1586 }
1587 else
1588 {
1589 it++;
1590 }
1591 }
1592
1593 committed_idx = idx;
1594 }
1595
1596 [[nodiscard]] bool is_in_committed_file(size_t idx)
1597 {
1598 std::unique_lock<ccf::pal::Mutex> guard(state_lock);
1599
1600 return idx <= end_of_committed_files_idx;
1601 }
1602
1608 [[nodiscard]] std::optional<fs::path> committed_ledger_path_with_idx(
1609 size_t idx)
1610 {
1611 std::unique_lock<ccf::pal::Mutex> guard(state_lock);
1612
1613 if (idx > end_of_committed_files_idx)
1614 {
1616 "Index {} is beyond end of committed files at {}, cannot get "
1617 "committed ledger path",
1618 idx,
1619 end_of_committed_files_idx);
1620 return std::nullopt;
1621 }
1622
1623 auto name = get_file_name_with_idx(
1624 ledger_dir, idx, false /* do not allow recovery files */);
1625
1626 if (!name.has_value())
1627 {
1629 "Could not find committed ledger file for index {} in {}",
1630 idx,
1631 ledger_dir);
1632 return std::nullopt;
1633 }
1634
1635 return ledger_dir / name.value();
1636 }
1637
1638 [[nodiscard]] size_t get_init_idx()
1639 {
1640 std::unique_lock<ccf::pal::Mutex> guard(state_lock);
1641
1642 return init_idx;
1643 }
1644
1646 {
1647 // Filled on construction
1649 size_t from_idx{};
1650 size_t to_idx{};
1651 size_t max_size{};
1652
1653 // First argument is ledger entries (or nullopt if not found)
1654 // Second argument is uv status code, which may indicate a cancellation
1656 std::function<void(std::optional<LedgerReadResult>&&, int)>;
1658
1659 // Final result
1660 std::optional<LedgerReadResult> read_result = std::nullopt;
1661 };
1662
1663 static void on_ledger_get_async(uv_work_t* req)
1664 {
1665 auto* data = static_cast<AsyncLedgerGet*>(req->data);
1666
1667 data->read_result = data->ledger->read_entries_range(
1668 data->from_idx, data->to_idx, true, data->max_size);
1669 }
1670
1671 static void on_ledger_get_async_complete(uv_work_t* req, int status)
1672 {
1673 auto* data = static_cast<AsyncLedgerGet*>(req->data);
1674
1675 data->result_cb(std::move(data->read_result), status);
1676
1677 delete data; // NOLINT(cppcoreguidelines-owning-memory)
1678 delete req; // NOLINT(cppcoreguidelines-owning-memory)
1679 }
1680
1682 size_t from_idx,
1683 size_t to_idx,
1684 std::optional<LedgerReadResult>&& read_result,
1686 {
1687 if (read_result.has_value())
1688 {
1690 ::consensus::ledger_entry_range,
1691 to_enclave,
1692 from_idx,
1693 read_result->end_idx,
1694 purpose,
1695 read_result->data);
1696 }
1697 else
1698 {
1700 ::consensus::ledger_no_entry_range,
1701 to_enclave,
1702 from_idx,
1703 to_idx,
1704 purpose);
1705 }
1706 }
1707
1710 {
1712 disp,
1713 ::consensus::ledger_init,
1714 [this](const uint8_t* data, size_t size) {
1715 auto idx = serialized::read<::consensus::Index>(data, size);
1716 auto recovery_start_index =
1717 serialized::read<::consensus::Index>(data, size);
1718 init(idx, recovery_start_index);
1719 });
1720
1722 disp,
1723 ::consensus::ledger_append,
1724 [this](const uint8_t* data, size_t size) {
1725 auto committable = serialized::read<bool>(data, size);
1726 write_entry(data, size, committable);
1727 });
1728
1730 disp,
1731 ::consensus::ledger_truncate,
1732 [this](const uint8_t* data, size_t size) {
1733 auto idx = serialized::read<::consensus::Index>(data, size);
1734 auto recovery_mode = serialized::read<bool>(data, size);
1735 truncate(idx);
1736 if (recovery_mode)
1737 {
1739 }
1740 });
1741
1743 disp,
1744 ::consensus::ledger_commit,
1745 [this](const uint8_t* data, size_t size) {
1746 auto idx = serialized::read<::consensus::Index>(data, size);
1747 commit(idx);
1748 });
1749
1751 disp, ::consensus::ledger_open, [this](const uint8_t*, size_t) {
1753 });
1754
1756 disp,
1757 ::consensus::ledger_get_range,
1758 [&](const uint8_t* data, size_t size) {
1759 auto [from_idx, to_idx, purpose] =
1760 ringbuffer::read_message<::consensus::ledger_get_range>(data, size);
1761
1762 // Ledger entries response has metadata so cap total entries size
1763 // accordingly
1764 constexpr size_t write_ledger_range_response_metadata_size = 2048;
1765 auto max_entries_size = to_enclave->get_max_message_size() -
1766 write_ledger_range_response_metadata_size;
1767
1768 if (is_in_committed_file(to_idx))
1769 {
1770 // Start an asynchronous job to do this, since it is committed and
1771 // can be accessed independently (and in parallel)
1772 // NOLINTNEXTLINE(cppcoreguidelines-owning-memory)
1773 auto* work_handle = new uv_work_t;
1774
1775 {
1776 // NOLINTNEXTLINE(cppcoreguidelines-owning-memory)
1777 auto* job = new AsyncLedgerGet;
1778 job->ledger = this;
1779 job->from_idx = from_idx;
1780 job->to_idx = to_idx;
1781 job->max_size = max_entries_size;
1782 job->result_cb = [this,
1783 from_idx_ = from_idx,
1784 to_idx_ = to_idx,
1785 purpose_ =
1786 purpose](auto&& read_result, int /*status*/) {
1787 // NB: Even if status is cancelled (and entry is empty), we
1788 // want to write this result back to the enclave
1790 from_idx_,
1791 to_idx_,
1792 std::forward<decltype(read_result)>(read_result),
1793 purpose_);
1794 };
1795
1796 work_handle->data = job;
1797 }
1798
1799 uv_queue_work(
1800 uv_default_loop(),
1801 work_handle,
1804 }
1805 else
1806 {
1807 // Read synchronously, since this accesses uncommitted state and
1808 // must accurately reflect changing files
1810 from_idx,
1811 to_idx,
1812 read_entries(from_idx, to_idx, max_entries_size),
1813 purpose);
1814 }
1815 });
1816 }
1817 };
1818}
Definition ledger.h:79
std::pair< size_t, bool > write_entry(const uint8_t *data, size_t size, bool committable)
Definition ledger.h:357
LedgerFile(const std::string &dir, const std::string &file_name_, bool from_existing_file_=false)
Definition ledger.h:153
LedgerFile(const fs::path &dir, size_t start_idx, bool recovery=false)
Definition ledger.h:109
bool truncate(size_t idx, bool remove_file_if_empty=true)
Definition ledger.h:525
std::optional< LedgerReadResult > read_entries(size_t from, size_t to, std::optional< size_t > max_size=std::nullopt)
Definition ledger.h:476
~LedgerFile()
Definition ledger.h:314
bool commit(size_t idx)
Definition ledger.h:698
bool is_committed() const
Definition ledger.h:340
void complete()
Definition ledger.h:597
bool is_recovery() const
Definition ledger.h:350
std::pair< size_t, size_t > entries_size(size_t from, size_t to, std::optional< size_t > max_size=std::nullopt) const
Definition ledger.h:424
bool rename(const std::string &new_file_name)
Definition ledger.h:667
bool is_complete() const
Definition ledger.h:345
void open()
Definition ledger.h:690
size_t get_last_idx() const
Definition ledger.h:330
size_t get_current_size() const
Definition ledger.h:335
size_t get_start_idx() const
Definition ledger.h:325
Definition ledger.h:749
std::optional< LedgerReadResult > read_entry(size_t idx)
Definition ledger.h:1387
size_t get_init_idx()
Definition ledger.h:1638
Ledger(const fs::path &ledger_dir, ringbuffer::AbstractWriterFactory &writer_factory, size_t max_read_cache_files=ledger_max_read_cache_files_default, const std::vector< std::string > &read_ledger_dirs_={})
Definition ledger.h:1070
std::optional< LedgerReadResult > read_entries(size_t from, size_t to, std::optional< size_t > max_entries_size=std::nullopt)
Definition ledger.h:1397
size_t write_entry(const uint8_t *data, size_t size, bool committable)
Definition ledger.h:1410
void truncate(size_t idx)
Definition ledger.h:1503
static void on_ledger_get_async_complete(uv_work_t *req, int status)
Definition ledger.h:1671
void complete_recovery()
Definition ledger.h:1341
bool is_in_committed_file(size_t idx)
Definition ledger.h:1596
void register_message_handlers(messaging::Dispatcher< ringbuffer::Message > &disp)
Definition ledger.h:1708
static void on_ledger_get_async(uv_work_t *req)
Definition ledger.h:1663
Ledger(const Ledger &that)=delete
void write_ledger_get_range_response(size_t from_idx, size_t to_idx, std::optional< LedgerReadResult > &&read_result, ::consensus::LedgerRequestPurpose purpose)
Definition ledger.h:1681
void set_recovery_start_idx(size_t idx)
Definition ledger.h:1380
void init(size_t idx, size_t recovery_start_idx_=0)
Definition ledger.h:1265
void commit(size_t idx)
Definition ledger.h:1553
size_t get_last_idx()
Definition ledger.h:1373
std::optional< fs::path > committed_ledger_path_with_idx(size_t idx)
Definition ledger.h:1608
Definition messaging.h:38
Definition ring_buffer_types.h:157
#define LOG_INFO_FMT
Definition internal_logger.h:15
#define LOG_TRACE_FMT
Definition internal_logger.h:13
#define LOG_DEBUG_FMT
Definition internal_logger.h:14
#define LOG_FAIL_FMT
Definition internal_logger.h:16
#define DISPATCHER_SET_MESSAGE_HANDLER(DISP, MSG,...)
Definition messaging.h:292
Definition after_io.h:8
@ FORCE_LEDGER_CHUNK_BEFORE
Definition serialised_entry_format.h:17
@ FORCE_LEDGER_CHUNK_AFTER
Definition serialised_entry_format.h:16
std::mutex Mutex
Definition locking.h:12
LedgerRequestPurpose
Definition ledger_enclave_types.h:14
Definition files.h:20
std::shared_ptr< AbstractWriter > WriterPtr
Definition ring_buffer_types.h:154
#define RINGBUFFER_WRITE_MESSAGE(MSG,...)
Definition ring_buffer_types.h:259
Definition ledger.h:73
std::vector< uint8_t > data
Definition ledger.h:74
size_t end_idx
Definition ledger.h:75
Definition ledger.h:1646
std::optional< LedgerReadResult > read_result
Definition ledger.h:1660
Ledger * ledger
Definition ledger.h:1648
size_t max_size
Definition ledger.h:1651
ResultCallback result_cb
Definition ledger.h:1657
size_t from_idx
Definition ledger.h:1649
size_t to_idx
Definition ledger.h:1650
std::function< void(std::optional< LedgerReadResult > &&, int)> ResultCallback
Definition ledger.h:1656
Definition time_bound_logger.h:14
Definition serialised_entry_format.h:21
uint64_t size
Definition serialised_entry_format.h:28