27namespace fs = std::filesystem;
31 static constexpr size_t ledger_max_read_cache_files_default = 5;
33 static constexpr auto ledger_committed_suffix =
"committed";
34 static constexpr auto ledger_start_idx_delimiter =
"_";
35 static constexpr auto ledger_last_idx_delimiter =
"-";
36 static constexpr auto ledger_recovery_file_suffix =
"recovery";
37 static constexpr auto ledger_ignored_file_suffix =
"ignored";
39 static inline size_t get_start_idx_from_file_name(
40 const std::string& file_name)
42 auto pos = file_name.find(ledger_start_idx_delimiter);
43 if (pos == std::string::npos)
45 throw std::logic_error(fmt::format(
46 "Ledger file name {} does not contain a start seqno", file_name));
49 return std::stol(file_name.substr(pos + 1));
52 static inline std::optional<size_t> get_last_idx_from_file_name(
53 const std::string& file_name)
55 auto pos = file_name.find(ledger_last_idx_delimiter);
56 if (pos == std::string::npos)
62 return std::stol(file_name.substr(pos + 1));
65 static inline bool is_ledger_file_name_committed(
const std::string& file_name)
67 return file_name.ends_with(ledger_committed_suffix);
70 static inline bool is_ledger_file_name_recovery(
const std::string& file_name)
72 return file_name.ends_with(ledger_recovery_file_suffix);
75 static inline bool is_ledger_file_name_ignored(
const std::string& file_name)
77 return file_name.ends_with(ledger_ignored_file_suffix);
80 static inline bool is_ledger_file_ignored(
const std::string& file_name)
83 return is_ledger_file_name_recovery(file_name) ||
84 is_ledger_file_name_ignored(file_name);
87 static inline fs::path remove_suffix(
88 std::string_view file_name,
const std::string& suffix)
90 if (file_name.ends_with(suffix))
92 file_name.remove_suffix(suffix.size());
97 static inline fs::path remove_recovery_suffix(std::string_view file_name)
100 file_name, fmt::format(
".{}", ledger_recovery_file_suffix));
103 static std::optional<std::string> get_file_name_with_idx(
104 const std::string& dir,
size_t idx,
bool allow_recovery_files)
106 std::optional<std::string> match = std::nullopt;
107 for (
auto const& f : fs::directory_iterator(dir))
111 auto f_name = f.path().filename();
113 is_ledger_file_name_ignored(f_name) ||
114 (!allow_recovery_files && is_ledger_file_name_recovery(f_name)))
119 size_t start_idx = 0;
120 std::optional<size_t> last_idx = std::nullopt;
123 start_idx = get_start_idx_from_file_name(f_name);
124 last_idx = get_last_idx_from_file_name(f_name);
126 catch (
const std::exception& e)
131 if (idx >= start_idx && last_idx.has_value() && idx <= last_idx.value())
150 using positions_offset_header_t = size_t;
151 static constexpr auto file_name_prefix =
"ledger";
158 FILE* file =
nullptr;
161 size_t start_idx = 1;
162 size_t total_len = 0;
163 std::vector<uint32_t> positions;
165 bool completed =
false;
166 bool committed =
false;
168 bool recovery =
false;
174 bool from_existing_file =
false;
178 LedgerFile(
const fs::path& dir,
size_t start_idx,
bool recovery =
false) :
180 file_name(fmt::format(
"{}_{}", file_name_prefix, start_idx)),
181 start_idx(start_idx),
187 fmt::format(
"{}.{}", file_name.string(), ledger_recovery_file_suffix);
190 auto file_path = dir / file_name;
191 if (fs::exists(file_path))
193 throw std::logic_error(fmt::format(
194 "Cannot create new ledger file {} in main ledger directory {} as it "
199 file = fopen(file_path.c_str(),
"w+b");
202 throw std::logic_error(fmt::format(
203 "Unable to open ledger file {}: {}", file_path, strerror(errno)));
207 fseeko(file,
sizeof(positions_offset_header_t), SEEK_SET);
208 total_len =
sizeof(positions_offset_header_t);
213 const std::string& dir,
214 const std::string& file_name_,
215 bool from_existing_file_ =
false) :
217 file_name(file_name_),
219 from_existing_file(from_existing_file_)
221 auto file_path = (fs::path(dir) / fs::path(file_name));
222 file = fopen(file_path.c_str(),
"r+b");
225 throw std::logic_error(fmt::format(
226 "Unable to open ledger file {}: {}", file_path, strerror(errno)));
229 committed = is_ledger_file_name_committed(file_name);
230 start_idx = get_start_idx_from_file_name(file_name);
233 fseeko(file, 0, SEEK_END);
234 size_t total_file_size = ftello(file);
237 fseeko(file, 0, SEEK_SET);
238 positions_offset_header_t table_offset = 0;
239 if (fread(&table_offset,
sizeof(positions_offset_header_t), 1, file) != 1)
241 throw std::logic_error(fmt::format(
242 "Failed to read positions offset from ledger file {}", file_path));
245 total_len =
sizeof(positions_offset_header_t);
247 if (from_existing_file)
255 if (table_offset != 0)
258 total_len = table_offset;
259 fseeko(file, table_offset, SEEK_SET);
261 if (table_offset > total_file_size)
263 throw std::logic_error(fmt::format(
264 "Invalid table offset {} greater than total file size {}",
270 (total_file_size - table_offset) /
sizeof(positions.at(0)));
275 sizeof(positions.at(0)),
277 file) != positions.size())
279 throw std::logic_error(fmt::format(
280 "Failed to read positions table from ledger file {}", file_path));
288 total_len =
sizeof(positions_offset_header_t);
289 auto len = total_file_size - total_len;
292 size_t current_idx = start_idx;
293 while (len >= ccf::kv::serialised_entry_header_size)
297 &entry_header, ccf::kv::serialised_entry_header_size, 1, file) !=
301 "Failed to read entry header from ledger file {} at seqno {}",
307 len -= ccf::kv::serialised_entry_header_size;
309 const auto& entry_size = entry_header.
size;
310 if (len < entry_size)
313 "Malformed incomplete ledger file {} at seqno {} (expecting "
324 fseeko(file, entry_size, SEEK_CUR);
328 "Recovered one entry of size {} at seqno {}",
333 positions.push_back(total_len);
334 total_len += (ccf::kv::serialised_entry_header_size + entry_size);
355 return start_idx + positions.size() - 1;
381 const uint8_t* data,
size_t size,
bool committable)
383 fseeko(file, total_len, SEEK_SET);
385 bool should_write =
true;
386 bool has_truncated =
false;
387 if (from_existing_file)
389 std::vector<uint8_t> entry(size);
391 fread(entry.data(), size, 1, file) != 1 ||
392 memcmp(entry.data(), data, size) != 0)
399 has_truncated =
true;
400 from_existing_file =
false;
404 should_write =
false;
410 if (fwrite(data, size, 1, file) != 1)
412 throw std::logic_error(
"Failed to write entry to ledger");
416 if (committable && fflush(file) != 0)
418 throw std::logic_error(fmt::format(
419 "Failed to flush entry to ledger: {}", strerror(errno)));
423 positions.push_back(total_len);
433 std::optional<size_t> max_size = std::nullopt)
const
435 if ((from < start_idx) || (to < from) || (to >
get_last_idx()))
446 (to ==
get_last_idx()) ? total_len : positions.at(to - start_idx + 1);
447 size = position_to - positions.at(from - start_idx);
449 if (!max_size.has_value() || size <= max_size.value())
459 "Single ledger entry at {} in file {} is too large for remaining "
460 "space (size {} > max {})",
467 size_t to_ = from + (to - from) / 2;
469 "Requesting ledger entries from {} to {} in file {} but size {} > "
470 "max size {}: now requesting up to {}",
485 size_t from,
size_t to, std::optional<size_t> max_size = std::nullopt)
487 if ((from < start_idx) || (to >
get_last_idx()) || (to < from))
490 "Cannot find entries: {} - {} in ledger file {}",
498 "Read entries from {} to {} in {} [max size: {}]",
502 max_size.value_or(0));
504 std::unique_lock<ccf::pal::Mutex> guard(file_lock);
510 std::vector<uint8_t> entries(size);
511 fseeko(file, positions.at(from - start_idx), SEEK_SET);
513 if (fread(entries.data(), size, 1, file) != 1)
515 throw std::logic_error(fmt::format(
516 "Failed to read entry range {} - {} from file {}",
525 bool truncate(
size_t idx,
bool remove_file_if_empty =
true)
528 committed || (idx < start_idx - 1) ||
534 if (remove_file_if_empty && idx == start_idx - 1)
537 if (!fs::remove(dir / file_name))
539 throw std::logic_error(
540 fmt::format(
"Could not remove file {}", file_name));
543 "Removed ledger file {} on truncation at {}", file_name, idx);
548 fseeko(file, 0, SEEK_SET);
549 positions_offset_header_t table_offset = 0;
550 if (fwrite(&table_offset,
sizeof(table_offset), 1, file) != 1)
552 throw std::logic_error(
"Failed to reset positions table offset");
558 total_len = positions.at(idx - start_idx + 1);
559 positions.resize(idx - start_idx + 1);
562 if (fflush(file) != 0)
564 throw std::logic_error(
565 fmt::format(
"Failed to flush ledger file: {}", strerror(errno)));
568 if (ftruncate(fileno(file), total_len))
570 throw std::logic_error(
571 fmt::format(
"Failed to truncate ledger: {}", strerror(errno)));
574 fseeko(file, total_len, SEEK_SET);
575 LOG_TRACE_FMT(
"Truncated ledger file {} at seqno {}", file_name, idx);
586 fseeko(file, total_len, SEEK_SET);
587 size_t table_offset = ftello(file);
591 reinterpret_cast<uint8_t*
>(positions.data()),
592 sizeof(positions.at(0)),
594 file) != positions.size())
596 throw std::logic_error(
"Failed to write positions table to ledger");
600 if (fseeko(file, 0, SEEK_SET) != 0)
602 throw std::logic_error(
"Failed to set file offset to 0");
605 if (fwrite(&table_offset,
sizeof(table_offset), 1, file) != 1)
607 throw std::logic_error(
"Failed to write positions table to ledger");
610 if (fflush(file) != 0)
612 throw std::logic_error(
613 fmt::format(
"Failed to flush ledger file: {}", strerror(errno)));
621 bool rename(
const std::string& new_file_name)
623 auto file_path = dir / file_name;
624 auto new_file_path = dir / new_file_name;
630 catch (
const std::exception& e)
634 LOG_FAIL_FMT(
"Error renaming ledger file: {}", e.what());
636 file_name = new_file_name;
642 auto new_file_name = remove_recovery_suffix(file_name.c_str());
645 LOG_DEBUG_FMT(
"Open recovery ledger file {}", new_file_name);
656 if (fflush(file) != 0)
658 throw std::logic_error(
659 fmt::format(
"Failed to flush ledger file: {}", strerror(errno)));
662 auto committed_file_name = fmt::format(
667 ledger_committed_suffix);
671 committed_file_name = fmt::format(
672 "{}.{}", committed_file_name, ledger_recovery_file_suffix);
675 if (!
rename(committed_file_name))
692 static constexpr size_t max_chunk_threshold_size =
693 std::numeric_limits<uint32_t>::max();
698 const fs::path ledger_dir;
701 std::vector<fs::path> read_ledger_dirs;
705 std::list<std::shared_ptr<LedgerFile>>
files;
708 size_t max_read_cache_files;
709 std::list<std::shared_ptr<LedgerFile>> files_read_cache;
712 const size_t chunk_threshold;
714 size_t committed_idx = 0;
716 size_t end_of_committed_files_idx = 0;
720 bool use_existing_files =
false;
723 std::optional<size_t> last_idx_on_init = std::nullopt;
727 std::optional<size_t> recovery_start_idx = std::nullopt;
729 auto get_it_contains_idx(
size_t idx)
const
736 auto f = std::upper_bound(
740 [](
size_t idx,
const std::shared_ptr<LedgerFile>& f) {
741 return (idx <= f->get_last_idx());
747 std::shared_ptr<LedgerFile> get_file_from_cache(
size_t idx)
755 std::unique_lock<ccf::pal::Mutex> guard(read_cache_lock);
758 for (
auto const& f : files_read_cache)
760 if (f->get_start_idx() <= idx && idx <= f->get_last_idx())
771 std::string ledger_dir_;
772 auto match = get_file_name_with_idx(ledger_dir, idx,
true);
773 if (match.has_value())
775 ledger_dir_ = ledger_dir;
779 for (
auto const& dir : read_ledger_dirs)
781 match = get_file_name_with_idx(dir, idx,
false);
782 if (match.has_value())
790 if (!match.has_value())
797 std::shared_ptr<LedgerFile> match_file =
nullptr;
800 match_file = std::make_shared<LedgerFile>(ledger_dir_, match.value());
802 catch (
const std::exception& e)
805 "Could not open ledger file {} to read seqno {}", match.value(), idx);
810 std::unique_lock<ccf::pal::Mutex> guard(read_cache_lock);
812 files_read_cache.emplace_back(match_file);
813 if (files_read_cache.size() > max_read_cache_files)
815 files_read_cache.erase(files_read_cache.begin());
822 std::shared_ptr<LedgerFile> get_file_from_idx(
823 size_t idx,
bool read_cache_only =
false)
830 if (!read_cache_only)
833 auto f = std::upper_bound(
837 [](
size_t idx,
const std::shared_ptr<LedgerFile>& f) {
838 return idx >= f->get_start_idx();
841 if (f !=
files.rend())
848 return get_file_from_cache(idx);
851 std::shared_ptr<LedgerFile> get_latest_file(
852 bool incomplete_only =
true)
const
858 const auto& last_file =
files.back();
859 if (incomplete_only && last_file->is_complete())
867 std::optional<LedgerReadResult> read_entries_range(
870 bool read_cache_only =
false,
871 std::optional<size_t> max_entries_size = std::nullopt)
876 if ((from <= 0) || (to < from))
895 auto f_from = get_file_from_idx(idx, read_cache_only);
896 if (f_from ==
nullptr)
898 LOG_FAIL_FMT(
"Cannot find ledger file for seqno {}", idx);
901 auto to_ = std::min(f_from->get_last_idx(), to);
902 std::optional<size_t> max_size = std::nullopt;
903 if (max_entries_size.has_value())
905 max_size = max_entries_size.value() - rr.
data.size();
907 auto v = f_from->read_entries(idx, to_, max_size);
915 std::make_move_iterator(v->data.begin()),
916 std::make_move_iterator(v->data.end()));
917 if (v->end_idx != to_)
928 if (!rr.
data.empty())
938 void ignore_ledger_file(
const std::string& file_name)
940 if (is_ledger_file_name_ignored(file_name))
945 auto ignored_file_name =
946 fmt::format(
"{}.{}", file_name, ledger_ignored_file_suffix);
947 files::rename(ledger_dir / file_name, ledger_dir / ignored_file_name);
950 void delete_ledger_files_after_idx(
size_t idx)
953 for (
auto const& f : fs::directory_iterator(ledger_dir))
955 auto file_name = f.path().filename();
956 auto start_idx = get_start_idx_from_file_name(file_name);
959 if (!fs::remove(ledger_dir / file_name))
961 throw std::logic_error(
962 fmt::format(
"Could not remove file {}", file_name));
965 "Forcing removal of ledger file {} as start idx {} > {}",
973 std::shared_ptr<LedgerFile> get_existing_ledger_file_for_idx(
size_t idx)
975 if (!use_existing_files)
980 for (
auto const& f : fs::directory_iterator(ledger_dir))
982 auto file_name = f.path().filename();
984 idx == get_start_idx_from_file_name(file_name) &&
985 !is_ledger_file_ignored(file_name))
987 return std::make_shared<LedgerFile>(
988 ledger_dir, file_name,
true );
999 const fs::path& ledger_dir,
1001 size_t chunk_threshold,
1002 size_t max_read_cache_files = ledger_max_read_cache_files_default,
1003 const std::vector<std::string>& read_ledger_dirs_ = {}) :
1004 to_enclave(writer_factory.create_writer_to_inside()),
1005 ledger_dir(ledger_dir),
1006 max_read_cache_files(max_read_cache_files),
1007 chunk_threshold(chunk_threshold)
1009 if (chunk_threshold == 0 || chunk_threshold > max_chunk_threshold_size)
1011 throw std::logic_error(fmt::format(
1012 "Error: Ledger chunk threshold should be between 1-{}",
1013 max_chunk_threshold_size));
1017 for (
const auto& read_dir : read_ledger_dirs_)
1019 LOG_INFO_FMT(
"Recovering read-only ledger directory \"{}\"", read_dir);
1020 if (!fs::is_directory(read_dir))
1022 throw std::logic_error(
1023 fmt::format(
"{} read-only ledger is not a directory", read_dir));
1026 read_ledger_dirs.emplace_back(read_dir);
1028 for (
auto const& f : fs::directory_iterator(read_dir))
1030 auto file_name = f.path().filename();
1031 auto last_idx_ = get_last_idx_from_file_name(file_name);
1033 !last_idx_.has_value() ||
1034 !is_ledger_file_name_committed(file_name) ||
1035 is_ledger_file_name_ignored(file_name))
1038 "Read-only ledger file {} is ignored as not committed",
1043 if (last_idx_.value() > last_idx)
1045 last_idx = last_idx_.value();
1046 committed_idx = last_idx;
1047 end_of_committed_files_idx = last_idx;
1051 "Recovering file from read-only ledger directory: {}", file_name);
1058 "Recovered read-only ledger directories up to {}, committed up to "
1064 if (fs::is_directory(ledger_dir))
1067 LOG_INFO_FMT(
"Recovering main ledger directory {}", ledger_dir);
1069 for (
auto const& f : fs::directory_iterator(ledger_dir))
1071 auto file_name = f.path().filename();
1073 if (is_ledger_file_ignored(file_name))
1076 "Ignoring ledger file {} in main ledger directory", file_name);
1078 ignore_ledger_file(file_name);
1083 std::shared_ptr<LedgerFile> ledger_file =
nullptr;
1086 ledger_file = std::make_shared<LedgerFile>(ledger_dir, file_name);
1090 if (ledger_file->truncate(ledger_file->get_last_idx()))
1098 catch (
const std::exception& e)
1101 "Error reading ledger file {}: {}", file_name, e.what());
1103 ignore_ledger_file(file_name);
1108 "Recovering file from main ledger directory: {}", file_name);
1109 files.emplace_back(std::move(ledger_file));
1115 "Main ledger directory {} is empty: no ledger file to "
1122 const std::shared_ptr<LedgerFile>& a,
1123 const std::shared_ptr<LedgerFile>& b) {
1124 return a->get_last_idx() < b->get_last_idx();
1127 auto main_ledger_dir_last_idx = get_latest_file(
false)->get_last_idx();
1128 if (main_ledger_dir_last_idx > last_idx)
1130 last_idx = main_ledger_dir_last_idx;
1134 for (
auto f =
files.begin(); f !=
files.end();)
1136 if ((*f)->is_committed())
1138 const auto f_last_idx = (*f)->get_last_idx();
1139 if (f_last_idx > committed_idx)
1141 committed_idx = f_last_idx;
1142 end_of_committed_files_idx = f_last_idx;
1154 if (!fs::create_directory(ledger_dir))
1156 throw std::logic_error(fmt::format(
1157 "Error: Could not create ledger directory: {}", ledger_dir));
1162 "Recovered ledger entries up to {}, committed to {}",
1169 void init(
size_t idx,
size_t recovery_start_idx_ = 0)
1172 fmt::format(
"Initing ledger - seqno={}", idx));
1181 for (
auto const& f : fs::directory_iterator(ledger_dir))
1183 auto file_name = f.path().filename();
1185 is_ledger_file_name_committed(file_name) &&
1186 (get_start_idx_from_file_name(file_name) > idx))
1188 auto last_idx_file = get_last_idx_from_file_name(file_name);
1189 if (!last_idx_file.has_value())
1191 throw std::logic_error(fmt::format(
1192 "Committed ledger file {} does not include last idx in file name",
1197 "Remove committed suffix from ledger file {} after init at {}: "
1201 last_idx_file.value());
1204 ledger_dir / file_name,
1210 ledger_last_idx_delimiter,
1211 last_idx_file.value(),
1212 ledger_committed_suffix)));
1220 use_existing_files =
true;
1221 last_idx_on_init = last_idx;
1223 committed_idx = idx;
1224 if (recovery_start_idx_ > 0)
1228 recovery_start_idx = recovery_start_idx_;
1232 "Set last known/commit seqno to {}, recovery seqno to {}",
1234 recovery_start_idx_);
1244 for (
auto it =
files.begin(); it !=
files.end();)
1247 if (f->is_recovery())
1255 if (f->is_committed())
1257 it =
files.erase(it);
1264 recovery_start_idx.reset();
1274 recovery_start_idx = idx;
1280 fmt::format(
"Reading ledger entry at {}", idx));
1282 return read_entries_range(idx, idx);
1288 std::optional<size_t> max_entries_size = std::nullopt)
1291 fmt::format(
"Reading ledger entries from {} to {}", from, to));
1293 return read_entries_range(from, to,
false, max_entries_size);
1296 size_t write_entry(
const uint8_t* data,
size_t size,
bool committable)
1299 "Writing ledger entry - {} bytes, committable={}", size, committable));
1302 serialized::peek<ccf::kv::SerialisedEntryHeader>(data, size);
1307 "Forcing ledger chunk before entry as required by the entry header "
1310 auto file = get_latest_file();
1311 if (file !=
nullptr)
1314 LOG_DEBUG_FMT(
"Ledger chunk completed at {}", file->get_last_idx());
1318 bool force_chunk_after =
1320 if (force_chunk_after)
1324 throw std::logic_error(
1325 "Ledger chunks cannot end in a non-committable transaction");
1328 "Forcing ledger chunk after entry as required by the entry header "
1332 auto file = get_latest_file();
1333 if (file ==
nullptr)
1336 size_t start_idx = last_idx + 1;
1337 if (use_existing_files)
1341 file = get_existing_ledger_file_for_idx(start_idx);
1343 if (file ==
nullptr)
1345 bool is_recovery = recovery_start_idx.has_value() &&
1346 start_idx > recovery_start_idx.value();
1348 std::make_shared<LedgerFile>(ledger_dir, start_idx, is_recovery);
1350 files.emplace_back(file);
1352 auto [last_idx_, has_truncated] =
1353 file->write_entry(data, size, committable);
1354 last_idx = last_idx_;
1360 LOG_INFO_FMT(
"Found divergent ledger entry at {}", last_idx);
1361 delete_ledger_files_after_idx(last_idx);
1362 use_existing_files =
false;
1366 use_existing_files && last_idx_on_init.has_value() &&
1367 last_idx > last_idx_on_init.value())
1369 use_existing_files =
false;
1373 "Wrote entry at {} [committable: {}, force chunk after: {}]",
1380 (force_chunk_after || file->get_current_size() >= chunk_threshold))
1391 TimeBoundLogger log_if_slow(fmt::format(
"Truncating ledger at {}", idx));
1398 if (last_idx != 0 && (idx >= last_idx || idx < committed_idx))
1401 "Ignoring truncate to {} - last_idx: {}, committed_idx: {}",
1408 auto f_from = get_it_contains_idx(idx + 1);
1409 auto f_to = get_it_contains_idx(last_idx);
1410 auto f_end = std::next(f_to);
1412 for (
auto it = f_from; it != f_end;)
1416 auto truncate_idx = (it == f_from) ? idx : (*it)->get_start_idx() - 1;
1417 if ((*it)->truncate(truncate_idx))
1419 it =
files.erase(it);
1433 fmt::format(
"Committing ledger entry {}", idx));
1437 if (idx <= committed_idx || idx > last_idx)
1442 auto f_from = (committed_idx == 0) ? get_it_contains_idx(1) :
1443 get_it_contains_idx(committed_idx);
1444 auto f_to = get_it_contains_idx(idx);
1445 auto f_end = std::next(f_to);
1447 for (
auto it = f_from; it != f_end;)
1451 const auto last_idx_in_file = (*it)->get_last_idx();
1452 auto commit_idx = (it == f_to) ? idx : last_idx_in_file;
1454 (*it)->commit(commit_idx) &&
1455 (it != f_to || (idx == last_idx_in_file)))
1457 end_of_committed_files_idx = last_idx_in_file;
1458 it =
files.erase(it);
1466 committed_idx = idx;
1471 return idx <= end_of_committed_files_idx;
1485 std::function<void(std::optional<LedgerReadResult>&&,
int)>;
1496 data->
read_result = data->ledger->read_entries_range(
1497 data->from_idx, data->to_idx,
true, data->max_size);
1504 data->
result_cb(std::move(data->read_result), status);
1513 std::optional<LedgerReadResult>&& read_result,
1516 if (read_result.has_value())
1519 ::consensus::ledger_entry_range,
1522 read_result->end_idx,
1529 ::consensus::ledger_no_entry_range,
1542 ::consensus::ledger_init,
1543 [
this](
const uint8_t* data,
size_t size) {
1544 auto idx = serialized::read<::consensus::Index>(data, size);
1545 auto recovery_start_index =
1546 serialized::read<::consensus::Index>(data, size);
1547 init(idx, recovery_start_index);
1552 ::consensus::ledger_append,
1553 [
this](
const uint8_t* data,
size_t size) {
1554 auto committable = serialized::read<bool>(data, size);
1560 ::consensus::ledger_truncate,
1561 [
this](
const uint8_t* data,
size_t size) {
1562 auto idx = serialized::read<::consensus::Index>(data, size);
1563 auto recovery_mode = serialized::read<bool>(data, size);
1573 ::consensus::ledger_commit,
1574 [
this](
const uint8_t* data,
size_t size) {
1575 auto idx = serialized::read<::consensus::Index>(data, size);
1580 disp, ::consensus::ledger_open, [
this](
const uint8_t*,
size_t) {
1586 ::consensus::ledger_get_range,
1587 [&](
const uint8_t* data,
size_t size) {
1588 auto [from_idx, to_idx, purpose] =
1589 ringbuffer::read_message<::consensus::ledger_get_range>(data, size);
1593 constexpr size_t write_ledger_range_response_metadata_size = 2048;
1594 auto max_entries_size = to_enclave->get_max_message_size() -
1595 write_ledger_range_response_metadata_size;
1601 uv_work_t* work_handle =
new uv_work_t;
1606 job->from_idx = from_idx;
1607 job->to_idx = to_idx;
1608 job->max_size = max_entries_size;
1610 [
this, from_idx = from_idx, to_idx = to_idx, purpose = purpose](
1611 auto&& read_result,
int status) {
1615 from_idx, to_idx, std::move(read_result), purpose);
1618 work_handle->data = job;
std::pair< size_t, bool > write_entry(const uint8_t *data, size_t size, bool committable)
Definition ledger.h:380
LedgerFile(const std::string &dir, const std::string &file_name_, bool from_existing_file_=false)
Definition ledger.h:212
LedgerFile(const fs::path &dir, size_t start_idx, bool recovery=false)
Definition ledger.h:178
bool truncate(size_t idx, bool remove_file_if_empty=true)
Definition ledger.h:525
std::optional< LedgerReadResult > read_entries(size_t from, size_t to, std::optional< size_t > max_size=std::nullopt)
Definition ledger.h:484
~LedgerFile()
Definition ledger.h:340
bool commit(size_t idx)
Definition ledger.h:648
bool is_committed() const
Definition ledger.h:363
void complete()
Definition ledger.h:579
bool is_recovery() const
Definition ledger.h:373
std::pair< size_t, size_t > entries_size(size_t from, size_t to, std::optional< size_t > max_size=std::nullopt) const
Definition ledger.h:430
bool rename(const std::string &new_file_name)
Definition ledger.h:621
bool is_complete() const
Definition ledger.h:368
void open()
Definition ledger.h:640
size_t get_last_idx() const
Definition ledger.h:353
size_t get_current_size() const
Definition ledger.h:358
size_t get_start_idx() const
Definition ledger.h:348
std::optional< LedgerReadResult > read_entry(size_t idx)
Definition ledger.h:1277
Ledger(const fs::path &ledger_dir, ringbuffer::AbstractWriterFactory &writer_factory, size_t chunk_threshold, size_t max_read_cache_files=ledger_max_read_cache_files_default, const std::vector< std::string > &read_ledger_dirs_={})
Definition ledger.h:998
std::optional< LedgerReadResult > read_entries(size_t from, size_t to, std::optional< size_t > max_entries_size=std::nullopt)
Definition ledger.h:1285
size_t write_entry(const uint8_t *data, size_t size, bool committable)
Definition ledger.h:1296
void truncate(size_t idx)
Definition ledger.h:1389
static void on_ledger_get_async_complete(uv_work_t *req, int status)
Definition ledger.h:1500
void complete_recovery()
Definition ledger.h:1237
bool is_in_committed_file(size_t idx)
Definition ledger.h:1469
void register_message_handlers(messaging::Dispatcher< ringbuffer::Message > &disp)
Definition ledger.h:1537
static void on_ledger_get_async(uv_work_t *req)
Definition ledger.h:1492
Ledger(const Ledger &that)=delete
void write_ledger_get_range_response(size_t from_idx, size_t to_idx, std::optional< LedgerReadResult > &&read_result, ::consensus::LedgerRequestPurpose purpose)
Definition ledger.h:1510
void set_recovery_start_idx(size_t idx)
Definition ledger.h:1272
void init(size_t idx, size_t recovery_start_idx_=0)
Definition ledger.h:1169
size_t get_last_idx() const
Definition ledger.h:1267
void commit(size_t idx)
Definition ledger.h:1430
Definition messaging.h:38
Definition ring_buffer_types.h:153
#define LOG_INFO_FMT
Definition logger.h:395
#define LOG_TRACE_FMT
Definition logger.h:378
#define LOG_DEBUG_FMT
Definition logger.h:380
#define LOG_FAIL_FMT
Definition logger.h:396
#define DISPATCHER_SET_MESSAGE_HANDLER(DISP, MSG,...)
Definition messaging.h:316
@ FORCE_LEDGER_CHUNK_BEFORE
Definition serialised_entry_format.h:17
@ FORCE_LEDGER_CHUNK_AFTER
Definition serialised_entry_format.h:16
std::mutex Mutex
Definition locking.h:17
LedgerRequestPurpose
Definition ledger_enclave_types.h:14
void rename(const fs::path &src, const fs::path &dst)
Definition files.h:141
std::shared_ptr< AbstractWriter > WriterPtr
Definition ring_buffer_types.h:150
#define RINGBUFFER_WRITE_MESSAGE(MSG,...)
Definition ring_buffer_types.h:255
std::vector< uint8_t > data
Definition ledger.h:143
size_t end_idx
Definition ledger.h:144
std::optional< LedgerReadResult > read_result
Definition ledger.h:1489
Ledger * ledger
Definition ledger.h:1477
size_t max_size
Definition ledger.h:1480
ResultCallback result_cb
Definition ledger.h:1486
size_t from_idx
Definition ledger.h:1478
size_t to_idx
Definition ledger.h:1479
std::function< void(std::optional< LedgerReadResult > &&, int)> ResultCallback
Definition ledger.h:1485
Definition time_bound_logger.h:14