20#include <system_error>
27 namespace files_cleanup
// Size in bytes (64 * 1024) of the buffer that is allocated for reading a
// file chunk by chunk while hashing it.
37 static constexpr size_t HASH_READ_CHUNK_SIZE =
size_t{64} * 1024;
// Collects (start index, path) pairs for the committed ledger files that sit
// directly inside a directory, ordered by ascending start index.
// NOTE(review): the function-name/parameter line is missing from this
// listing; `dir` below is presumably the directory parameter - confirm.
41 inline std::vector<std::pair<size_t, std::filesystem::path>>
44 namespace fs = std::filesystem;
45 std::vector<std::pair<size_t, fs::path>> result;
// fs::directory_iterator visits direct children only (no recursion).
47 for (
const auto& entry : fs::directory_iterator(dir))
// Anything that is not a regular file is filtered out.
49 if (!entry.is_regular_file())
54 auto file_name = entry.path().filename().string();
// Files whose name is not recognised as a committed ledger file are
// filtered out as well.
56 if (!is_ledger_file_name_committed(file_name))
// The start index is parsed out of the file name and stored with the path.
63 auto start_idx = get_start_idx_from_file_name(file_name);
64 result.emplace_back(start_idx, entry.path());
// A std::exception raised for a file is reported with the file name; the
// message states that the file is skipped.
66 catch (
const std::exception& e)
69 "Skipping ledger file {} during cleanup: {}", file_name, e.what());
// Order by the first pair element (start index), lowest first.
74 std::sort(result.begin(), result.end(), [](
const auto& a,
const auto& b) {
75 return a.first < b.first;
// Computes a Sha256Hash over the contents of the file at `path`, reading the
// file in chunks of HASH_READ_CHUNK_SIZE bytes.
// NOTE(review): the failure paths are not visible in this listing; presumably
// an empty optional is returned when the file cannot be opened or read -
// confirm.
83 inline std::optional<ccf::crypto::Sha256Hash>
hash_file(
84 const std::filesystem::path& path)
89 fmt::format(
"Hashing file - ifstream open({})", path));
// The stream is opened with std::ios::binary.
90 f.open(path, std::ios::binary);
// One buffer is allocated up front and reused for every read.
98 std::vector<uint8_t> buf(HASH_READ_CHUNK_SIZE);
101 fmt::format(
"Hashing file - read loop({})", path));
// Each iteration attempts to read buf.size() bytes; only the f.gcount() bytes
// actually read are passed to the hasher.
102 while (f.read(
reinterpret_cast<char*
>(buf.data()), buf.size()) ||
105 hasher->update_hash({buf.data(),
static_cast<size_t>(f.gcount())});
// The digest accumulated over all chunks is the result.
118 return hasher->finalise();
// Checks whether a local ledger chunk has a same-named file with an equal
// digest in one of the read-only directories.
//   local_path     - path of the local ledger chunk to check.
//   read_only_dirs - directories searched for a file with the same file name.
// NOTE(review): the function header, the log macro names and every return
// statement are missing from this listing, so the value returned on each
// branch cannot be documented from here.
122 const std::filesystem::path& local_path,
123 const std::vector<std::filesystem::path>& read_only_dirs)
125 namespace fs = std::filesystem;
// When no local hash is available, the checks below work out why: the file
// may have vanished, may no longer be a regular file, or may be unreadable.
128 if (!local_hash.has_value())
// Error-code overloads are used for the filesystem queries.
134 const auto exists = fs::exists(local_path, ec);
138 "Failed to query existence of ledger chunk {}: {}. "
139 "Skipping deletion.",
140 local_path.filename(),
147 "Ledger chunk {} no longer exists, skipping",
148 local_path.filename());
153 const auto is_reg = fs::is_regular_file(local_path, ec);
157 "Failed to query type of ledger chunk {}: {}. "
158 "Skipping deletion.",
159 local_path.filename(),
166 "Ledger chunk {} is no longer a regular file, skipping",
167 local_path.filename());
172 "Ledger chunk {} exists but could not be read, skipping deletion",
173 local_path.filename());
// Candidates in the read-only directories are looked up by file name only.
177 auto file_name = local_path.filename();
179 for (
const auto& ro_dir : read_only_dirs)
181 auto candidate = ro_dir / file_name;
// This condition holds when the candidate is missing, is not a regular file,
// or either query reported an error.
184 !fs::exists(candidate, ec) || ec ||
185 !fs::is_regular_file(candidate, ec) || ec)
193 if (!ro_hash.has_value())
196 "Ledger chunk {} in read-only directory {} could not be read",
// Equal digests mean the read-only copy matches the local chunk.
201 if (local_hash.value() == ro_hash.value())
// Otherwise both digests are logged in hex form.
207 "Ledger chunk {} found in read-only directory {} but digest "
208 "does not match (local: {}, read-only: {}). Skipping deletion.",
211 local_hash.value().hex_str(),
212 ro_hash.value().hex_str());
// A std::exception raised while handling a read-only candidate is reported;
// the message states that deletion is skipped.
214 catch (
const std::exception& e)
217 "Failed to read ledger chunk {} from read-only directory {}: "
218 "{}. Skipping deletion.",
// Lists the committed snapshots of one directory as (size_t, path) pairs,
// wrapped in an optional.
// NOTE(review): the function-name line, the listing call and the return
// statements are missing from this listing; presumably an empty optional is
// returned from the catch handlers - confirm.
231 inline std::optional<std::vector<std::pair<size_t, std::filesystem::path>>>
// The single directory is wrapped in a vector.
// NOTE(review): presumably so it can be passed to
// find_committed_snapshots_in_directories - confirm.
234 std::vector<std::filesystem::path> directories{dir};
// Filesystem errors raised while listing are reported with the directory.
239 catch (
const std::filesystem::filesystem_error& e)
242 "Failed to list committed snapshots in {}: {}", dir, e.what());
// Any other std::exception is reported separately as unexpected.
244 catch (
const std::exception& e)
247 "Unexpected error while listing committed snapshots in {}: {}",
// For a non-empty list, yields the first pair element of the FIRST entry.
// NOTE(review): the function header and the empty-list return are missing
// from this listing. Taking front() as the highest seqno assumes the list is
// ordered highest-first - confirm against the code that produces the list.
257 const std::vector<std::pair<size_t, std::filesystem::path>>&
260 if (!committed_snapshots.empty())
263 return committed_snapshots.front().first;
// Deletes committed snapshots beyond the retention limit, leaving the first
// `max_retained` entries of the list untouched.
// NOTE(review): the function header and the log macro names are missing from
// this listing.
269 const std::vector<std::pair<size_t, std::filesystem::path>>&
274 "Cleaning snapshots", std::chrono::seconds(1));
// Work is only done when there are more snapshots than may be retained.
276 if (committed_snapshots.size() > max_retained)
// Reverse iteration starts at the last entry and stops `max_retained`
// positions before rend(), so the first `max_retained` entries of the vector
// are never visited.
280 for (
auto it = committed_snapshots.rbegin();
281 it != committed_snapshots.rend() - max_retained;
284 const auto& path = it->second;
286 "Deleting old snapshot {} (retaining {})",
292 "Deleting old snapshot - remove({})", path.filename()));
// Error-code overload: a failure is reported through `ec`.
293 std::filesystem::remove(path, ec);
298 "Failed to delete old snapshot {}: {}",
// Deletes old committed ledger chunks from the main ledger directory while
// retaining `max_retained` of them.
//   main_dir           - directory holding the local ledger chunks.
//   read_only_dirs     - directories expected to hold copies of the chunks.
//   snapshot_watermark - optional seqno; chunks whose end seqno is >= this
//                        value are kept (see the log message further down).
// NOTE(review): the function header, the `max_retained` parameter line, the
// log macro names and the calls between the visible lines are missing from
// this listing.
307 const std::filesystem::path& main_dir,
308 const std::vector<std::filesystem::path>& read_only_dirs,
310 std::optional<size_t> snapshot_watermark = std::nullopt)
314 "Cleaning ledger chunks from {}, watermark={}",
316 snapshot_watermark.has_value() ?
317 std::to_string(snapshot_watermark.value()) :
319 std::chrono::seconds(1));
321 std::vector<std::pair<size_t, std::filesystem::path>> committed;
// Listing failures are reported: filesystem errors first, then any other
// std::exception as unexpected.
326 catch (
const std::filesystem::filesystem_error& e)
329 "Failed to list committed ledger chunks in {}: {}",
334 catch (
const std::exception& e)
337 "Unexpected error while listing committed ledger chunks in {}: {}",
// Guard for the case where no chunk exceeds the retention limit.
// NOTE(review): presumably returns early, which also keeps the subtraction
// below from wrapping around - confirm.
343 if (committed.size() <= max_retained)
348 if (snapshot_watermark.has_value())
351 "Ledger chunk cleanup: snapshot watermark is {}",
352 snapshot_watermark.value());
// The first `to_delete` entries of `committed` are the deletion candidates.
357 size_t to_delete = committed.size() - max_retained;
358 for (
size_t i = 0; i < to_delete; ++i)
360 const auto& path = committed[i].second;
// With a watermark set, the chunk's last index is parsed from its file name
// and compared against the watermark.
365 if (snapshot_watermark.has_value())
367 auto end_idx = get_last_idx_from_file_name(path.filename().string());
369 end_idx.has_value() &&
370 end_idx.value() >= snapshot_watermark.value())
373 "Keeping ledger chunk {} (end seqno {} >= snapshot "
377 snapshot_watermark.value());
// A chunk without a matching copy in a read-only directory is kept.
392 "Keeping ledger chunk {} because no matching copy was found "
393 "in any read-only ledger directory",
399 "Deleting old committed ledger chunk {} (retaining {})",
405 "Deleting old ledger chunk - remove({})", path.filename()));
// Error-code overload: a failure is reported through `ec`.
406 std::filesystem::remove(path, ec);
// NOTE(review): std::filesystem::remove is documented to return false,
// without reporting an error, when the path does not exist; this branch may
// therefore never be taken - confirm.
410 if (ec == std::errc::no_such_file_or_directory)
413 "Ledger chunk {} was already removed", path.filename());
418 "Failed to delete committed ledger chunk {}: {}",
// Configuration captured by the constructor and copied into each queued
// CleanupWork item by on_timer().
431 std::filesystem::path snapshots_dir;
// When set, snapshot cleanup runs with this retention limit; the constructor
// rejects values below 1.
432 std::optional<size_t> max_snapshots;
435 std::filesystem::path ledger_dir;
436 std::vector<std::filesystem::path> read_only_ledger_dirs;
// When set, ledger chunk cleanup runs with this retention limit; the
// constructor then requires at least one read-only ledger directory.
437 std::optional<size_t> max_committed_ledger_chunks;
// Flag shared with queued work items: on_timer() flips it from false to true
// before queuing, and it is stored back to false in on_cleanup_work_done()
// or when queuing fails. Starts out false.
442 std::shared_ptr<std::atomic<bool>> cleanup_in_progress =
443 std::make_shared<std::atomic<bool>>(
false);
// Fields of the work item that on_timer() allocates; each one is initialised
// from the same-named FilesCleanupImpl member.
// NOTE(review): the enclosing struct header is missing from this listing.
447 std::filesystem::path snapshots_dir;
448 std::optional<size_t> max_snapshots;
450 std::filesystem::path ledger_dir;
451 std::vector<std::filesystem::path> read_only_ledger_dirs;
452 std::optional<size_t> max_committed_ledger_chunks;
// Same shared flag as the owning object; cleared in on_cleanup_work_done().
454 std::shared_ptr<std::atomic<bool>> cleanup_in_progress;
// Work callback handed to uv_queue_work() by on_timer(); performs the
// snapshot and ledger chunk cleanup described by the CleanupWork item stored
// in req->data.
// NOTE(review): the calls that produce committed_snapshots_opt and
// snapshot_watermark, and the cleanup calls themselves, are missing from this
// listing.
457 static void on_cleanup_work(uv_work_t* req)
459 auto* work =
static_cast<CleanupWork*
>(req->data);
464 auto committed_snapshots_opt =
// Without a snapshot listing, all cleanup is skipped (see the log message).
467 if (!committed_snapshots_opt.has_value())
472 "Skipping all file cleanup because committed snapshot listing "
477 auto& committed_snapshots = committed_snapshots_opt.value();
// Snapshot cleanup only runs when a snapshot limit is configured.
479 if (work->max_snapshots.has_value())
482 committed_snapshots, work->max_snapshots.value());
// Ledger chunk cleanup only runs when a chunk limit is configured.
484 if (work->max_committed_ledger_chunks.has_value())
486 auto snapshot_watermark =
490 work->read_only_ledger_dirs,
491 work->max_committed_ledger_chunks.value(),
// Completion callback handed to uv_queue_work() by on_timer(); the status
// argument is unnamed and unused. Clears the in-progress flag so that a later
// on_timer() call can queue new work.
// NOTE(review): releasing `work` and `req` is not visible in this listing -
// confirm both are freed here.
496 static void on_cleanup_work_done(uv_work_t* req,
int )
498 auto* work =
static_cast<CleanupWork*
>(req->data);
499 work->cleanup_in_progress->store(
false);
// Constructor: stores the cleanup configuration and validates it.
//   snapshots_dir_               - snapshots directory.
//   max_snapshots_               - optional snapshot limit; must be >= 1
//                                  when set.
//   ledger_dir_                  - main ledger directory.
//   read_only_ledger_dirs_       - read-only ledger directories.
//   max_committed_ledger_chunks_ - optional chunk limit; needs at least one
//                                  read-only ledger directory when set.
// Throws std::logic_error when either validation fails.
507 const std::string& snapshots_dir_,
508 std::optional<size_t> max_snapshots_,
509 const std::string& ledger_dir_,
510 const std::vector<std::string>& read_only_ledger_dirs_,
511 std::optional<size_t> max_committed_ledger_chunks_) :
512 snapshots_dir(snapshots_dir_),
513 max_snapshots(max_snapshots_),
514 ledger_dir(ledger_dir_),
515 max_committed_ledger_chunks(max_committed_ledger_chunks_)
// Each directory string is converted to a std::filesystem::path.
517 for (
const auto& d : read_only_ledger_dirs_)
519 read_only_ledger_dirs.emplace_back(d);
// A configured snapshot limit below 1 is rejected.
522 if (max_snapshots.has_value() && max_snapshots.value() < 1)
524 throw std::logic_error(fmt::format(
525 "files_cleanup.max_snapshots must be at least 1, got {}",
526 max_snapshots.value()));
// A chunk limit without any read-only directory is rejected; the message
// explains that chunks are only deleted after a copy has been verified.
529 max_committed_ledger_chunks.has_value() &&
530 read_only_ledger_dirs.empty())
532 throw std::logic_error(
533 "files_cleanup.max_committed_ledger_chunks requires at least one "
534 "ledger.read_only_directories entry. Committed ledger chunks are "
535 "only deleted after verifying an identical copy exists in a "
536 "read-only directory.");
// Timer entry point: queues one cleanup work item on the default libuv loop
// unless a previous one is still marked as in progress.
// NOTE(review): the function header and several lines (e.g. where req->data
// is set and where rc is tested) are missing from this listing.
542 bool expected =
false;
// Atomically flips the flag from false to true; if it was already true the
// exchange fails and this run is skipped (see the log message).
543 if (!cleanup_in_progress->compare_exchange_strong(expected,
true))
546 "Skipping files cleanup: previous cleanup task is still running");
// The work item gets its own copy of the configuration plus the shared flag.
551 auto* work =
new CleanupWork{
552 .snapshots_dir = snapshots_dir,
553 .max_snapshots = max_snapshots,
554 .ledger_dir = ledger_dir,
555 .read_only_ledger_dirs = read_only_ledger_dirs,
556 .max_committed_ledger_chunks = max_committed_ledger_chunks,
557 .cleanup_in_progress = cleanup_in_progress};
559 auto* req =
new uv_work_t;
// on_cleanup_work is the work callback, on_cleanup_work_done the completion
// callback.
561 int rc = uv_queue_work(
562 uv_default_loop(), req, &on_cleanup_work, &on_cleanup_work_done);
// On a queuing failure the flag is reset so that later timer ticks can retry.
// NOTE(review): releasing `work` and `req` on this path is not visible in
// this listing - confirm they are freed.
565 LOG_FAIL_FMT(
"Failed to queue files cleanup work: {}", uv_strerror(rc));
566 cleanup_in_progress->store(
false);
Definition files_cleanup_timer.h:428
FilesCleanupImpl(const std::string &snapshots_dir_, std::optional< size_t > max_snapshots_, const std::string &ledger_dir_, const std::vector< std::string > &read_only_ledger_dirs_, std::optional< size_t > max_committed_ledger_chunks_)
Definition files_cleanup_timer.h:506
void on_timer()
Definition files_cleanup_timer.h:540
#define LOG_INFO_FMT
Definition internal_logger.h:15
#define LOG_DEBUG_FMT
Definition internal_logger.h:14
#define LOG_FAIL_FMT
Definition internal_logger.h:16
void cleanup_old_ledger_chunks(const std::filesystem::path &main_dir, const std::vector< std::filesystem::path > &read_only_dirs, size_t max_retained, std::optional< size_t > snapshot_watermark=std::nullopt)
Definition files_cleanup_timer.h:306
DigestCheckResult check_digest_against_read_only_dirs(const std::filesystem::path &local_path, const std::vector< std::filesystem::path > &read_only_dirs)
Definition files_cleanup_timer.h:121
void cleanup_old_snapshots(const std::vector< std::pair< size_t, std::filesystem::path > > &committed_snapshots, size_t max_retained)
Definition files_cleanup_timer.h:268
std::optional< std::vector< std::pair< size_t, std::filesystem::path > > > find_committed_snapshots(const std::filesystem::path &dir)
Definition files_cleanup_timer.h:232
std::optional< size_t > highest_committed_snapshot_seqno(const std::vector< std::pair< size_t, std::filesystem::path > > &committed_snapshots)
Definition files_cleanup_timer.h:256
std::vector< std::pair< size_t, std::filesystem::path > > find_committed_ledger_chunks(const std::filesystem::path &dir)
Definition files_cleanup_timer.h:42
std::optional< ccf::crypto::Sha256Hash > hash_file(const std::filesystem::path &path)
Definition files_cleanup_timer.h:83
DigestCheckResult
Definition files_cleanup_timer.h:32
std::shared_ptr< ISha256Hash > make_incremental_sha256()
Definition hash.cpp:46
std::vector< std::pair< size_t, fs::path > > find_committed_snapshots_in_directories(const std::vector< fs::path > &directories, std::optional< size_t > minimum_idx=std::nullopt)
Definition filenames.h:167
Definition time_bound_logger.h:14