28namespace fs = std::filesystem;
32 static constexpr size_t ledger_max_read_cache_files_default = 5;
34 static std::optional<std::string> get_file_name_with_idx(
35 const std::string& dir,
size_t idx,
bool allow_recovery_files)
37 std::optional<std::string> match = std::nullopt;
38 for (
auto const& f : fs::directory_iterator(dir))
42 auto f_name = f.path().filename();
44 is_ledger_file_name_ignored(f_name) ||
45 (!allow_recovery_files && is_ledger_file_name_recovery(f_name)))
51 std::optional<size_t> last_idx = std::nullopt;
54 start_idx = get_start_idx_from_file_name(f_name);
55 last_idx = get_last_idx_from_file_name(f_name);
57 catch (
const std::exception& e)
62 if (idx >= start_idx && last_idx.has_value() && idx <= last_idx.value())
81 using positions_offset_header_t = size_t;
82 static constexpr auto file_name_prefix =
"ledger";
94 std::vector<uint32_t> positions;
96 bool completed =
false;
97 bool committed =
false;
99 bool recovery =
false;
105 bool from_existing_file =
false;
109 LedgerFile(
const fs::path& dir,
size_t start_idx,
bool recovery =
false) :
111 file_name(fmt::format(
"{}_{}", file_name_prefix, start_idx)),
112 start_idx(start_idx),
118 fmt::format(
"{}{}", file_name.string(), ledger_recovery_file_suffix);
121 auto file_path = dir / file_name;
127 fmt::format(
"Creating ledger file - fopen({})", file_path));
129 file = fopen(file_path.c_str(),
"w+bx");
135 throw std::logic_error(fmt::format(
136 "Cannot create new ledger file {} in main ledger directory {} as "
141 throw std::logic_error(fmt::format(
142 "Unable to open ledger file {}: {}",
144 std::strerror(errno)));
148 fseeko(file,
sizeof(positions_offset_header_t), SEEK_SET);
149 total_len =
sizeof(positions_offset_header_t);
154 const std::string& dir,
155 const std::string& file_name_,
156 bool from_existing_file_ =
false) :
158 file_name(file_name_),
159 from_existing_file(from_existing_file_)
161 auto file_path = (fs::path(dir) / fs::path(file_name));
163 committed = is_ledger_file_name_committed(file_name);
164 start_idx = get_start_idx_from_file_name(file_name);
166 const auto*
const mode = committed ?
"rb" :
"r+b";
170 fmt::format(
"Opening ledger file - fopen({})", file_path));
172 file = fopen(file_path.c_str(), mode);
177 throw std::logic_error(fmt::format(
178 "Unable to open ledger file {}: {}",
180 std::strerror(errno)));
184 fseeko(file, 0, SEEK_END);
185 size_t total_file_size = ftello(file);
188 fseeko(file, 0, SEEK_SET);
189 positions_offset_header_t table_offset = 0;
192 fmt::format(
"Reading positions offset - fread({})", file_path));
194 fread(&table_offset,
sizeof(positions_offset_header_t), 1, file) != 1)
196 throw std::logic_error(fmt::format(
197 "Failed to read positions offset from ledger file {}", file_path));
201 if (committed && table_offset == 0)
203 throw std::logic_error(fmt::format(
204 "Committed ledger file {} cannot be read: invalid table offset (0)",
208 total_len =
sizeof(positions_offset_header_t);
210 if (from_existing_file)
218 if (table_offset != 0)
221 total_len = table_offset;
222 fseeko(file, table_offset, SEEK_SET);
224 if (table_offset > total_file_size)
226 throw std::logic_error(fmt::format(
227 "Invalid table offset {} greater than total file size {}",
233 (total_file_size - table_offset) /
sizeof(positions.at(0)));
237 "Reading positions table ({} entries) - fread({})",
243 sizeof(positions.at(0)),
245 file) != positions.size())
247 throw std::logic_error(fmt::format(
248 "Failed to read positions table from ledger file {}", file_path));
257 total_len =
sizeof(positions_offset_header_t);
258 auto len = total_file_size - total_len;
261 "Recovering entries from incomplete ledger file {} ({} bytes)",
266 size_t current_idx = start_idx;
267 while (len >= ccf::kv::serialised_entry_header_size)
271 &entry_header, ccf::kv::serialised_entry_header_size, 1, file) !=
275 "Failed to read entry header from ledger file {} at seqno {}",
281 len -= ccf::kv::serialised_entry_header_size;
283 const auto& entry_size = entry_header.
size;
284 if (len < entry_size)
287 "Malformed incomplete ledger file {} at seqno {} (expecting "
298 fseeko(file, entry_size, SEEK_CUR);
302 "Recovered one entry of size {} at seqno {}",
307 positions.push_back(total_len);
308 total_len += (ccf::kv::serialised_entry_header_size + entry_size);
319 fmt::format(
"Closing ledger file - fclose({})", file_name));
332 return start_idx + positions.size() - 1;
358 const uint8_t* data,
size_t size,
bool committable)
360 fseeko(file, total_len, SEEK_SET);
362 bool should_write =
true;
363 bool has_truncated =
false;
364 if (from_existing_file)
366 std::vector<uint8_t> entry(size);
367 bool read_mismatch =
false;
370 "Reading existing entry for comparison ({} bytes) - fread({})",
373 read_mismatch = fread(entry.data(), size, 1, file) != 1 ||
374 memcmp(entry.data(), data, size) != 0;
383 has_truncated =
true;
384 from_existing_file =
false;
388 should_write =
false;
396 "Writing ledger entry ({} bytes) - fwrite({})", size, file_name));
397 if (fwrite(data, size, 1, file) != 1)
399 throw std::logic_error(
"Failed to write entry to ledger");
407 fmt::format(
"Flushing ledger entry - fflush({})", file_name));
408 if (fflush(file) != 0)
410 throw std::logic_error(fmt::format(
411 "Failed to flush entry to ledger: {}",
412 std::strerror(errno)));
417 positions.push_back(total_len);
427 std::optional<size_t> max_size = std::nullopt)
const
429 if ((from < start_idx) || (to < from) || (to >
get_last_idx()))
440 (to ==
get_last_idx()) ? total_len : positions.at(to - start_idx + 1);
441 size = position_to - positions.at(from - start_idx);
443 if (!max_size.has_value() || size <= max_size.value())
452 "Single ledger entry at {} in file {} is too large for remaining "
453 "space (size {} > max {})",
460 size_t to_ = from + (to - from) / 2;
462 "Requesting ledger entries from {} to {} in file {} but size {} > "
463 "max size {}: now requesting up to {}",
477 size_t from,
size_t to, std::optional<size_t> max_size = std::nullopt)
479 if ((from < start_idx) || (to >
get_last_idx()) || (to < from))
482 "Cannot find entries: {} - {} in ledger file {}",
490 "Read entries from {} to {} in {} [max size: {}]",
494 max_size.value_or(0));
496 std::unique_lock<ccf::pal::Mutex> guard(file_lock);
502 std::vector<uint8_t> entries(size);
503 fseeko(file, positions.at(from - start_idx), SEEK_SET);
507 "Reading ledger entries {} to {} ({} bytes) - fread({})",
512 if (fread(entries.data(), size, 1, file) != 1)
514 throw std::logic_error(fmt::format(
515 "Failed to read entry range {} - {} from file {}",
525 bool truncate(
size_t idx,
bool remove_file_if_empty =
true)
528 committed || (idx < start_idx - 1) ||
534 if (remove_file_if_empty && idx == start_idx - 1)
539 "Removing ledger file on truncation - remove({})", file_name));
540 if (!fs::remove(dir / file_name))
542 throw std::logic_error(
543 fmt::format(
"Could not remove file {}", file_name));
547 "Removed ledger file {} on truncation at {}", file_name, idx);
552 fseeko(file, 0, SEEK_SET);
553 positions_offset_header_t table_offset = 0;
556 fmt::format(
"Resetting positions offset - fwrite({})", file_name));
557 if (fwrite(&table_offset,
sizeof(table_offset), 1, file) != 1)
559 throw std::logic_error(
"Failed to reset positions table offset");
566 total_len = positions.at(idx - start_idx + 1);
567 positions.resize(idx - start_idx + 1);
572 fmt::format(
"Flushing truncated ledger - fflush({})", file_name));
573 if (fflush(file) != 0)
575 throw std::logic_error(fmt::format(
576 "Failed to flush ledger file: {}",
577 std::strerror(errno)));
583 fmt::format(
"Truncating ledger file - ftruncate({})", file_name));
584 if (ftruncate(fileno(file), total_len) != 0)
586 throw std::logic_error(fmt::format(
587 "Failed to truncate ledger: {}",
588 std::strerror(errno)));
592 fseeko(file, total_len, SEEK_SET);
593 LOG_TRACE_FMT(
"Truncated ledger file {} at seqno {}", file_name, idx);
611 if (from_existing_file)
616 fseeko(file, total_len, SEEK_SET);
617 size_t table_offset = ftello(file);
621 "Writing positions table ({} entries) - fwrite({})",
626 reinterpret_cast<uint8_t*
>(positions.data()),
627 sizeof(positions.at(0)),
629 file) != positions.size())
631 throw std::logic_error(
"Failed to write positions table to ledger");
636 if (fseeko(file, 0, SEEK_SET) != 0)
638 throw std::logic_error(
"Failed to set file offset to 0");
643 "Writing positions table offset - fwrite({})", file_name));
644 if (fwrite(&table_offset,
sizeof(table_offset), 1, file) != 1)
646 throw std::logic_error(
647 "Failed to write positions table offset to ledger");
653 fmt::format(
"Completing ledger file - fflush({})", file_name));
654 if (fflush(file) != 0)
656 throw std::logic_error(fmt::format(
657 "Failed to flush ledger file: {}",
658 std::strerror(errno)));
667 bool rename(
const std::string& new_file_name)
669 auto file_path = dir / file_name;
670 auto new_file_path = dir / new_file_name;
675 "Renaming ledger file {} to {} - rename()",
678 files::rename(file_path, new_file_path);
680 catch (
const std::exception& e)
684 LOG_FAIL_FMT(
"Error renaming ledger file: {}", e.what());
686 file_name = new_file_name;
692 auto new_file_name = remove_recovery_suffix(file_name.c_str());
695 LOG_DEBUG_FMT(
"Open recovery ledger file {}", new_file_name);
712 fmt::format(
"Committing ledger file - fsync({})", file_name));
713 if (fsync(fileno(file)) != 0)
715 throw std::logic_error(fmt::format(
716 "Failed to flush ledger file: {}",
717 std::strerror(errno)));
721 auto committed_file_name = fmt::format(
726 ledger_committed_suffix);
730 committed_file_name =
731 fmt::format(
"{}{}", committed_file_name, ledger_recovery_file_suffix);
734 if (!
rename(committed_file_name))
754 const fs::path ledger_dir;
757 const std::vector<fs::path> read_ledger_dirs;
763 std::list<std::shared_ptr<LedgerFile>>
files;
766 const size_t max_read_cache_files;
767 std::list<std::shared_ptr<LedgerFile>> files_read_cache;
771 size_t committed_idx = 0;
773 size_t end_of_committed_files_idx = 0;
777 bool use_existing_files =
false;
780 std::optional<size_t> last_idx_on_init = std::nullopt;
787 std::optional<size_t> recovery_start_idx = std::nullopt;
789 [[nodiscard]]
auto get_it_contains_idx(
size_t idx)
const
796 auto f = std::upper_bound(
800 [](
size_t idx,
const std::shared_ptr<LedgerFile>& f) {
801 return (idx <= f->get_last_idx());
807 std::shared_ptr<LedgerFile> get_file_from_cache(
size_t idx)
815 std::unique_lock<ccf::pal::Mutex> guard(read_cache_lock);
818 for (
auto const& f : files_read_cache)
820 if (f->get_start_idx() <= idx && idx <= f->get_last_idx())
831 std::string ledger_dir_;
832 auto match = get_file_name_with_idx(ledger_dir, idx,
true);
833 if (match.has_value())
835 ledger_dir_ = ledger_dir;
839 for (
auto const& dir : read_ledger_dirs)
841 match = get_file_name_with_idx(dir, idx,
false);
842 if (match.has_value())
850 if (!match.has_value())
857 std::shared_ptr<LedgerFile> match_file =
nullptr;
860 match_file = std::make_shared<LedgerFile>(
861 ledger_dir_, *match);
863 catch (
const std::exception& e)
866 "Could not open ledger file {} to read seqno {}: {}",
874 std::unique_lock<ccf::pal::Mutex> guard(read_cache_lock);
876 files_read_cache.emplace_back(match_file);
877 if (files_read_cache.size() > max_read_cache_files)
879 files_read_cache.erase(files_read_cache.begin());
886 std::shared_ptr<LedgerFile> get_file_from_idx(
887 size_t idx,
bool read_cache_only =
false)
894 if (!read_cache_only)
897 auto f = std::upper_bound(
901 [](
size_t idx,
const std::shared_ptr<LedgerFile>& f) {
902 return idx >= f->get_start_idx();
905 if (f !=
files.rend())
912 return get_file_from_cache(idx);
915 [[nodiscard]] std::shared_ptr<LedgerFile> get_latest_file(
916 bool incomplete_only =
true)
const
922 const auto& last_file =
files.back();
923 if (incomplete_only && last_file->is_complete())
931 std::optional<LedgerReadResult> read_entries_range(
934 bool read_cache_only =
false,
935 std::optional<size_t> max_entries_size = std::nullopt)
937 std::unique_lock<ccf::pal::Mutex> guard(state_lock);
942 if ((from <= 0) || (to < from))
961 auto f_from = get_file_from_idx(idx, read_cache_only);
962 if (f_from ==
nullptr)
964 LOG_FAIL_FMT(
"Cannot find ledger file for seqno {}", idx);
967 auto to_ = std::min(f_from->get_last_idx(), to);
968 std::optional<size_t> max_size = std::nullopt;
969 if (max_entries_size.has_value())
971 max_size = max_entries_size.value() - rr.
data.size();
973 auto v = f_from->read_entries(idx, to_, max_size);
981 std::make_move_iterator(v->data.begin()),
982 std::make_move_iterator(v->data.end()));
983 if (v->end_idx != to_)
994 if (!rr.
data.empty())
1002 void ignore_ledger_file(
const std::string& file_name)
1004 if (is_ledger_file_name_ignored(file_name))
1009 auto ignored_file_name =
1010 fmt::format(
"{}{}", file_name, ledger_ignored_file_suffix);
1013 "Ignoring ledger file - rename({} to {})",
1015 ignored_file_name));
1016 files::rename(ledger_dir / file_name, ledger_dir / ignored_file_name);
1020 void delete_ledger_files_after_idx(
size_t idx)
1023 for (
auto const& f : fs::directory_iterator(ledger_dir))
1025 auto file_name = f.path().filename();
1026 auto start_idx = get_start_idx_from_file_name(file_name);
1027 if (start_idx > idx)
1030 "Deleting divergent ledger file - remove({})", file_name));
1031 if (!fs::remove(ledger_dir / file_name))
1033 throw std::logic_error(
1034 fmt::format(
"Could not remove file {}", file_name));
1037 "Forcing removal of ledger file {} as start idx {} > {}",
1045 std::shared_ptr<LedgerFile> get_existing_ledger_file_for_idx(
size_t idx)
1047 if (!use_existing_files)
1052 for (
auto const& f : fs::directory_iterator(ledger_dir))
1054 auto file_name = f.path().filename();
1056 idx == get_start_idx_from_file_name(file_name) &&
1057 !is_ledger_file_ignored(file_name))
1059 return std::make_shared<LedgerFile>(
1060 ledger_dir, file_name,
true );
1071 const fs::path& ledger_dir,
1073 size_t max_read_cache_files = ledger_max_read_cache_files_default,
1074 const std::vector<std::string>& read_ledger_dirs_ = {}) :
1075 to_enclave(writer_factory.create_writer_to_inside()),
1076 ledger_dir(ledger_dir),
1077 read_ledger_dirs(read_ledger_dirs_.begin(), read_ledger_dirs_.end()),
1078 max_read_cache_files(max_read_cache_files)
1081 for (
const auto& read_dir : read_ledger_dirs_)
1083 LOG_INFO_FMT(
"Recovering read-only ledger directory \"{}\"", read_dir);
1084 if (!fs::is_directory(read_dir))
1086 throw std::logic_error(
1087 fmt::format(
"{} read-only ledger is not a directory", read_dir));
1090 for (
auto const& f : fs::directory_iterator(read_dir))
1092 auto file_name = f.path().filename();
1093 auto last_idx_ = get_last_idx_from_file_name(file_name);
1095 !last_idx_.has_value() ||
1096 !is_ledger_file_name_committed(file_name) ||
1097 is_ledger_file_name_ignored(file_name))
1100 "Read-only ledger file {} is ignored as not committed",
1105 if (last_idx_.value() > last_idx)
1107 last_idx = last_idx_.value();
1108 committed_idx = last_idx;
1109 end_of_committed_files_idx = last_idx;
1113 "Recovering file from read-only ledger directory: {}", file_name);
1120 "Recovered read-only ledger directories up to {}, committed up to "
1126 if (fs::is_directory(ledger_dir))
1132 LOG_INFO_FMT(
"Recovering main ledger directory {}", ledger_dir);
1134 for (
auto const& f : fs::directory_iterator(ledger_dir))
1136 auto file_name = f.path().filename();
1138 if (is_ledger_file_ignored(file_name))
1141 "Ignoring ledger file {} in main ledger directory", file_name);
1143 ignore_ledger_file(file_name);
1148 const auto file_end_idx = get_last_idx_from_file_name(file_name);
1150 if (is_ledger_file_name_committed(file_name))
1152 if (!file_end_idx.has_value())
1155 "Unexpected file {} in {}: committed but not completed",
1161 if (file_end_idx.value() > committed_idx)
1163 committed_idx = file_end_idx.value();
1164 end_of_committed_files_idx = file_end_idx.value();
1171 if (file_end_idx.has_value() && file_end_idx.value() <= committed_idx)
1174 "Ignoring ledger file {} in main ledger directory - already "
1175 "discovered commit up to {} from read-only directories",
1179 ignore_ledger_file(file_name);
1184 std::shared_ptr<LedgerFile> ledger_file =
nullptr;
1187 ledger_file = std::make_shared<LedgerFile>(ledger_dir, file_name);
1191 if (ledger_file->truncate(ledger_file->get_last_idx()))
1199 catch (
const std::exception& e)
1202 "Error reading ledger file {}: {}", file_name, e.what());
1204 ignore_ledger_file(file_name);
1209 "Recovering file from main ledger directory: {}", file_name);
1210 files.emplace_back(std::move(ledger_file));
1216 "Main ledger directory {} is empty: no ledger file to "
1224 last_idx = committed_idx;
1229 "Main ledger directory {} contains {} restored (writeable) files",
1234 const std::shared_ptr<LedgerFile>& a,
1235 const std::shared_ptr<LedgerFile>& b) {
1236 return a->get_last_idx() < b->get_last_idx();
1239 const auto main_ledger_dir_last_idx =
1240 get_latest_file(
false)->get_last_idx();
1241 if (main_ledger_dir_last_idx > last_idx)
1243 last_idx = main_ledger_dir_last_idx;
1248 TimeBoundLogger log_if_slow(fmt::format(
1249 "Creating ledger directory - create_directory({})", ledger_dir));
1250 if (!fs::create_directory(ledger_dir))
1252 throw std::logic_error(fmt::format(
1253 "Error: Could not create ledger directory: {}", ledger_dir));
1258 "Recovered ledger entries up to {}, committed to {}",
1265 void init(
size_t idx,
size_t recovery_start_idx_ = 0)
1268 fmt::format(
"Initing ledger - seqno={}", idx));
1270 std::unique_lock<ccf::pal::Mutex> guard(state_lock);
1281 for (
auto const& f : fs::directory_iterator(ledger_dir))
1283 auto file_name = f.path().filename();
1285 is_ledger_file_name_committed(file_name) &&
1286 (get_start_idx_from_file_name(file_name) > idx))
1288 auto last_idx_file = get_last_idx_from_file_name(file_name);
1289 if (!last_idx_file.has_value())
1291 throw std::logic_error(fmt::format(
1292 "Committed ledger file {} does not include last idx in file name",
1297 "Remove committed suffix from ledger file {} after init at {}: "
1301 last_idx_file.value());
1305 fmt::format(
"Removing committed suffix - rename({})", file_name));
1307 ledger_dir / file_name,
1313 ledger_last_idx_delimiter,
1314 last_idx_file.value(),
1315 ledger_committed_suffix)));
1324 use_existing_files =
true;
1325 last_idx_on_init = last_idx;
1327 committed_idx = idx;
1328 if (recovery_start_idx_ > 0)
1332 recovery_start_idx = recovery_start_idx_;
1336 "Set last known/commit seqno to {}, recovery seqno to {}",
1338 recovery_start_idx_);
1348 std::unique_lock<ccf::pal::Mutex> guard(state_lock);
1350 for (
auto it =
files.begin(); it !=
files.end();)
1353 if (f->is_recovery())
1361 if (f->is_committed())
1363 it =
files.erase(it);
1370 recovery_start_idx.reset();
1375 std::unique_lock<ccf::pal::Mutex> guard(state_lock);
1382 std::unique_lock<ccf::pal::Mutex> guard(state_lock);
1384 recovery_start_idx = idx;
1390 fmt::format(
"Reading ledger entry at {}", idx));
1394 return read_entries_range(idx, idx);
1400 std::optional<size_t> max_entries_size = std::nullopt)
1403 fmt::format(
"Reading ledger entries from {} to {}", from, to));
1407 return read_entries_range(from, to,
false, max_entries_size);
1410 size_t write_entry(
const uint8_t* data,
size_t size,
bool committable)
1413 "Writing ledger entry - {} bytes, committable={}", size, committable));
1415 std::unique_lock<ccf::pal::Mutex> guard(state_lock);
1418 serialized::peek<ccf::kv::SerialisedEntryHeader>(data, size);
1423 "Forcing ledger chunk before entry as required by the entry header "
1426 auto file = get_latest_file();
1427 if (file !=
nullptr)
1430 LOG_DEBUG_FMT(
"Ledger chunk completed at {}", file->get_last_idx());
1434 bool force_chunk_after =
1436 if (force_chunk_after)
1440 throw std::logic_error(
1441 "Ledger chunks cannot end in a non-committable transaction");
1444 "Forcing ledger chunk after entry as required by the entry header "
1448 auto file = get_latest_file();
1449 if (file ==
nullptr)
1452 size_t start_idx = last_idx + 1;
1453 if (use_existing_files)
1457 file = get_existing_ledger_file_for_idx(start_idx);
1459 if (file ==
nullptr)
1461 bool is_recovery = recovery_start_idx.has_value() &&
1462 start_idx > recovery_start_idx.value();
1464 std::make_shared<LedgerFile>(ledger_dir, start_idx, is_recovery);
1466 files.emplace_back(file);
1468 auto [last_idx_, has_truncated] =
1469 file->write_entry(data, size, committable);
1470 last_idx = last_idx_;
1476 LOG_INFO_FMT(
"Found divergent ledger entry at {}", last_idx);
1477 delete_ledger_files_after_idx(last_idx);
1478 use_existing_files =
false;
1482 use_existing_files && last_idx_on_init.has_value() &&
1483 last_idx > last_idx_on_init.value())
1485 use_existing_files =
false;
1489 "Wrote entry at {} [committable: {}, force chunk after: {}]",
1494 if (committable && force_chunk_after)
1505 TimeBoundLogger log_if_slow(fmt::format(
"Truncating ledger at {}", idx));
1507 std::unique_lock<ccf::pal::Mutex> guard(state_lock);
1514 if (last_idx != 0 && (idx >= last_idx || idx < committed_idx))
1517 "Ignoring truncate to {} - last_idx: {}, committed_idx: {}",
1524 auto f_from = get_it_contains_idx(idx + 1);
1525 auto f_to = get_it_contains_idx(last_idx);
1528 auto f_end = (f_to ==
files.end()) ?
files.end() : std::next(f_to);
1533 bool is_first =
true;
1534 for (
auto it = f_from; it != f_end;)
1538 auto truncate_idx = is_first ? idx : (*it)->get_start_idx() - 1;
1540 if ((*it)->truncate(truncate_idx))
1542 it =
files.erase(it);
1556 fmt::format(
"Committing ledger entry {}", idx));
1558 std::unique_lock<ccf::pal::Mutex> guard(state_lock);
1562 if (idx <= committed_idx || idx > last_idx)
1567 auto f_from = (committed_idx == 0) ? get_it_contains_idx(1) :
1568 get_it_contains_idx(committed_idx);
1569 auto f_to = get_it_contains_idx(idx);
1572 auto f_end = (f_to ==
files.end()) ?
files.end() : std::next(f_to);
1574 for (
auto it = f_from; it != f_end;)
1578 const auto last_idx_in_file = (*it)->get_last_idx();
1579 auto commit_idx = (it == f_to) ? idx : last_idx_in_file;
1581 (*it)->commit(commit_idx) &&
1582 (it != f_to || (idx == last_idx_in_file)))
1584 end_of_committed_files_idx = last_idx_in_file;
1585 it =
files.erase(it);
1593 committed_idx = idx;
1598 std::unique_lock<ccf::pal::Mutex> guard(state_lock);
1600 return idx <= end_of_committed_files_idx;
1611 std::unique_lock<ccf::pal::Mutex> guard(state_lock);
1613 if (idx > end_of_committed_files_idx)
1616 "Index {} is beyond end of committed files at {}, cannot get "
1617 "committed ledger path",
1619 end_of_committed_files_idx);
1620 return std::nullopt;
1623 auto name = get_file_name_with_idx(
1624 ledger_dir, idx,
false );
1626 if (!name.has_value())
1629 "Could not find committed ledger file for index {} in {}",
1632 return std::nullopt;
1635 return ledger_dir / name.value();
1640 std::unique_lock<ccf::pal::Mutex> guard(state_lock);
1656 std::function<void(std::optional<LedgerReadResult>&&,
int)>;
1667 data->
read_result = data->ledger->read_entries_range(
1668 data->from_idx, data->to_idx,
true, data->max_size);
1675 data->
result_cb(std::move(data->read_result), status);
1684 std::optional<LedgerReadResult>&& read_result,
1687 if (read_result.has_value())
1690 ::consensus::ledger_entry_range,
1693 read_result->end_idx,
1700 ::consensus::ledger_no_entry_range,
1713 ::consensus::ledger_init,
1714 [
this](
const uint8_t* data,
size_t size) {
1715 auto idx = serialized::read<::consensus::Index>(data, size);
1716 auto recovery_start_index =
1717 serialized::read<::consensus::Index>(data, size);
1718 init(idx, recovery_start_index);
1723 ::consensus::ledger_append,
1724 [
this](
const uint8_t* data,
size_t size) {
1725 auto committable = serialized::read<bool>(data, size);
1731 ::consensus::ledger_truncate,
1732 [
this](
const uint8_t* data,
size_t size) {
1733 auto idx = serialized::read<::consensus::Index>(data, size);
1734 auto recovery_mode = serialized::read<bool>(data, size);
1744 ::consensus::ledger_commit,
1745 [
this](
const uint8_t* data,
size_t size) {
1746 auto idx = serialized::read<::consensus::Index>(data, size);
1751 disp, ::consensus::ledger_open, [
this](
const uint8_t*,
size_t) {
1757 ::consensus::ledger_get_range,
1758 [&](
const uint8_t* data,
size_t size) {
1759 auto [from_idx, to_idx, purpose] =
1760 ringbuffer::read_message<::consensus::ledger_get_range>(data, size);
1764 constexpr size_t write_ledger_range_response_metadata_size = 2048;
1765 auto max_entries_size = to_enclave->get_max_message_size() -
1766 write_ledger_range_response_metadata_size;
1773 auto* work_handle =
new uv_work_t;
1779 job->from_idx = from_idx;
1780 job->to_idx = to_idx;
1781 job->max_size = max_entries_size;
1782 job->result_cb = [
this,
1783 from_idx_ = from_idx,
1786 purpose](
auto&& read_result,
int ) {
1792 std::forward<
decltype(read_result)>(read_result),
1796 work_handle->data = job;
std::pair< size_t, bool > write_entry(const uint8_t *data, size_t size, bool committable)
Definition ledger.h:357
LedgerFile(const std::string &dir, const std::string &file_name_, bool from_existing_file_=false)
Definition ledger.h:153
LedgerFile(const fs::path &dir, size_t start_idx, bool recovery=false)
Definition ledger.h:109
bool truncate(size_t idx, bool remove_file_if_empty=true)
Definition ledger.h:525
std::optional< LedgerReadResult > read_entries(size_t from, size_t to, std::optional< size_t > max_size=std::nullopt)
Definition ledger.h:476
~LedgerFile()
Definition ledger.h:314
bool commit(size_t idx)
Definition ledger.h:698
bool is_committed() const
Definition ledger.h:340
void complete()
Definition ledger.h:597
bool is_recovery() const
Definition ledger.h:350
std::pair< size_t, size_t > entries_size(size_t from, size_t to, std::optional< size_t > max_size=std::nullopt) const
Definition ledger.h:424
bool rename(const std::string &new_file_name)
Definition ledger.h:667
bool is_complete() const
Definition ledger.h:345
void open()
Definition ledger.h:690
size_t get_last_idx() const
Definition ledger.h:330
size_t get_current_size() const
Definition ledger.h:335
size_t get_start_idx() const
Definition ledger.h:325
std::optional< LedgerReadResult > read_entry(size_t idx)
Definition ledger.h:1387
size_t get_init_idx()
Definition ledger.h:1638
Ledger(const fs::path &ledger_dir, ringbuffer::AbstractWriterFactory &writer_factory, size_t max_read_cache_files=ledger_max_read_cache_files_default, const std::vector< std::string > &read_ledger_dirs_={})
Definition ledger.h:1070
std::optional< LedgerReadResult > read_entries(size_t from, size_t to, std::optional< size_t > max_entries_size=std::nullopt)
Definition ledger.h:1397
size_t write_entry(const uint8_t *data, size_t size, bool committable)
Definition ledger.h:1410
void truncate(size_t idx)
Definition ledger.h:1503
static void on_ledger_get_async_complete(uv_work_t *req, int status)
Definition ledger.h:1671
void complete_recovery()
Definition ledger.h:1341
bool is_in_committed_file(size_t idx)
Definition ledger.h:1596
void register_message_handlers(messaging::Dispatcher< ringbuffer::Message > &disp)
Definition ledger.h:1708
static void on_ledger_get_async(uv_work_t *req)
Definition ledger.h:1663
Ledger(const Ledger &that)=delete
void write_ledger_get_range_response(size_t from_idx, size_t to_idx, std::optional< LedgerReadResult > &&read_result, ::consensus::LedgerRequestPurpose purpose)
Definition ledger.h:1681
void set_recovery_start_idx(size_t idx)
Definition ledger.h:1380
void init(size_t idx, size_t recovery_start_idx_=0)
Definition ledger.h:1265
void commit(size_t idx)
Definition ledger.h:1553
size_t get_last_idx()
Definition ledger.h:1373
std::optional< fs::path > committed_ledger_path_with_idx(size_t idx)
Definition ledger.h:1608
Definition messaging.h:38
Definition ring_buffer_types.h:157
#define LOG_INFO_FMT
Definition internal_logger.h:15
#define LOG_TRACE_FMT
Definition internal_logger.h:13
#define LOG_DEBUG_FMT
Definition internal_logger.h:14
#define LOG_FAIL_FMT
Definition internal_logger.h:16
#define DISPATCHER_SET_MESSAGE_HANDLER(DISP, MSG,...)
Definition messaging.h:292
@ FORCE_LEDGER_CHUNK_BEFORE
Definition serialised_entry_format.h:17
@ FORCE_LEDGER_CHUNK_AFTER
Definition serialised_entry_format.h:16
std::mutex Mutex
Definition locking.h:12
LedgerRequestPurpose
Definition ledger_enclave_types.h:14
std::shared_ptr< AbstractWriter > WriterPtr
Definition ring_buffer_types.h:154
#define RINGBUFFER_WRITE_MESSAGE(MSG,...)
Definition ring_buffer_types.h:259
std::vector< uint8_t > data
Definition ledger.h:74
size_t end_idx
Definition ledger.h:75
std::optional< LedgerReadResult > read_result
Definition ledger.h:1660
Ledger * ledger
Definition ledger.h:1648
size_t max_size
Definition ledger.h:1651
ResultCallback result_cb
Definition ledger.h:1657
size_t from_idx
Definition ledger.h:1649
size_t to_idx
Definition ledger.h:1650
std::function< void(std::optional< LedgerReadResult > &&, int)> ResultCallback
Definition ledger.h:1656
Definition time_bound_logger.h:14