CCF
Loading...
Searching...
No Matches
files_cleanup_timer.h
Go to the documentation of this file.
1// Copyright (c) Microsoft Corporation. All rights reserved.
2// Licensed under the Apache 2.0 License.
3#pragma once
4
7#include "ledger_filenames.h"
9#include "time_bound_logger.h"
10#include "timer.h"
11
12#include <algorithm>
13#include <atomic>
14#include <cstdint>
15#include <filesystem>
16#include <fstream>
17#include <memory>
18#include <optional>
19#include <string>
20#include <system_error>
21#include <utility>
22#include <vector>
23
24namespace asynchost
25{
26 // Pure helper functions for file cleanup, extracted for testability.
27 namespace files_cleanup
28 {
// Outcome of check_digest_against_read_only_dirs(): distinguishes a verified
// digest match, the absence of a matching copy, and a concurrent deletion of
// the local file.
enum class DigestCheckResult : std::uint8_t
{
  match_found, // An identical copy exists in a read-only directory
  no_match, // File exists locally but no matching copy was found
  file_gone // Local file was concurrently deleted (benign)
};

// Size of the buffer hash_file() reads through when hashing a file, so large
// ledger chunks are never loaded into memory in one piece.
// `inline constexpr` gives the header-defined constant a single definition
// shared by every translation unit that includes this header.
inline constexpr size_t HASH_READ_CHUNK_SIZE = size_t{64} * 1024; // 64 KB
38
39 // Returns committed ledger chunks in the given directory, sorted ascending
40 // by start index. Each entry is (start_idx, path).
// Only regular files whose names pass is_ledger_file_name_committed() are
// returned. A committed-looking name whose start index cannot be parsed is
// logged and skipped instead of aborting the scan.
// Throws std::filesystem::filesystem_error if the directory cannot be
// iterated or an entry cannot be queried (the throwing overloads of
// directory_iterator / is_regular_file are used here);
// cleanup_old_ledger_chunks() catches this.
41 inline std::vector<std::pair<size_t, std::filesystem::path>>
42 find_committed_ledger_chunks(const std::filesystem::path& dir)
43 {
44 namespace fs = std::filesystem;
45 std::vector<std::pair<size_t, fs::path>> result;
46
47 for (const auto& entry : fs::directory_iterator(dir))
48 {
49 if (!entry.is_regular_file())
50 {
51 continue;
52 }
53
54 auto file_name = entry.path().filename().string();
55
56 if (!is_ledger_file_name_committed(file_name))
57 {
58 continue;
59 }
60
61 try
62 {
63 auto start_idx = get_start_idx_from_file_name(file_name);
64 result.emplace_back(start_idx, entry.path());
65 }
66 catch (const std::exception& e)
67 {
// NOTE(review): listing line 68, which opens this logging call, is missing
// from this rendering; confirm the log level against the real header.
69 "Skipping ledger file {} during cleanup: {}", file_name, e.what());
70 }
71 }
72
73 // Sort ascending by start index (oldest first)
74 std::sort(result.begin(), result.end(), [](const auto& a, const auto& b) {
75 return a.first < b.first;
76 });
77
78 return result;
79 }
80
81 // Compute SHA-256 digest of a file by reading it in chunks, without
82 // loading the entire file into memory.
// Reads HASH_READ_CHUNK_SIZE bytes at a time. Returns nullopt if the file
// cannot be opened, or if the stream reports badbit after the read loop;
// otherwise returns the finalised digest.
83 inline std::optional<ccf::crypto::Sha256Hash> hash_file(
84 const std::filesystem::path& path)
85 {
86 std::ifstream f;
// The open and the read loop are each wrapped in their own TimeBoundLogger
// scope with a distinct message (presumably so whichever step is slow is
// identified in the log; see time_bound_logger.h).
87 {
88 TimeBoundLogger log_if_slow(
89 fmt::format("Hashing file - ifstream open({})", path));
90 f.open(path, std::ios::binary);
91 }
92 if (!f)
93 {
94 return std::nullopt;
95 }
96
// NOTE(review): listing line 97 is missing from this rendering. `hasher` is
// used below via update_hash()/finalise(); presumably it is created with
// make_incremental_sha256() (listed in this file's symbol list). Confirm.
98 std::vector<uint8_t> buf(HASH_READ_CHUNK_SIZE);
99 {
100 TimeBoundLogger log_if_slow(
101 fmt::format("Hashing file - read loop({})", path));
// A short final read sets eof/failbit but still yields gcount() > 0 bytes;
// the `|| f.gcount() > 0` clause ensures that tail is hashed before the
// eof check below ends the loop.
102 while (f.read(reinterpret_cast<char*>(buf.data()), buf.size()) ||
103 f.gcount() > 0)
104 {
105 hasher->update_hash({buf.data(), static_cast<size_t>(f.gcount())});
106 if (f.eof())
107 {
108 break;
109 }
110 }
111 }
112
// Only a hard I/O error (badbit) invalidates the digest; eof/failbit are the
// normal end-of-file outcome of the loop above.
113 if (f.bad())
114 {
115 return std::nullopt;
116 }
117
118 return hasher->finalise();
119 }
120
// NOTE(review): listing line 121, which opens this definition, is missing
// from this rendering. Per this file's symbol list the signature is
//   DigestCheckResult check_digest_against_read_only_dirs(
//     const std::filesystem::path& local_path,
//     const std::vector<std::filesystem::path>& read_only_dirs)
//
// Looks for a file with the same name as `local_path` in each of
// `read_only_dirs` and compares SHA-256 digests computed by hash_file().
// The local file is hashed first. If that fails, non-throwing filesystem
// queries distinguish a concurrent deletion from a genuine read error.
// Read-only candidates that are missing or not regular files are skipped
// silently. Candidates that are unreadable, throw, or have a different
// digest are logged, and the next directory is tried.
// NOTE(review): the statements that close each early-exit branch (e.g.
// listing lines 142, 149, 203, 225) are missing from this rendering.
// Presumably they return DigestCheckResult values; confirm which.
// NOTE(review): the local file is hashed even when no read-only directory
// contains a candidate. Chunks retained for lack of a copy are therefore
// re-hashed on every cleanup pass; hashing lazily on the first candidate
// would avoid that.
122 const std::filesystem::path& local_path,
123 const std::vector<std::filesystem::path>& read_only_dirs)
124 {
125 namespace fs = std::filesystem;
126
127 auto local_hash = hash_file(local_path);
128 if (!local_hash.has_value())
129 {
130 // Distinguish between a concurrent deletion (benign) and a genuine
131 // read error on an existing file. Use non-throwing overloads to
132 // avoid exceptions from permission issues or broken mounts.
133 std::error_code ec;
134 const auto exists = fs::exists(local_path, ec);
135 if (ec)
136 {
137 "Failed to query existence of ledger chunk {}: {}. "
138 "Skipping deletion.",
139 local_path.filename(),
140 ec.message());
143 }
144 if (!exists)
145 {
147 "Ledger chunk {} no longer exists, skipping",
148 local_path.filename());
150 }
151
152 ec.clear();
153 const auto is_reg = fs::is_regular_file(local_path, ec);
154 if (ec)
155 {
157 "Failed to query type of ledger chunk {}: {}. "
158 "Skipping deletion.",
159 local_path.filename(),
160 ec.message());
162 }
163 if (!is_reg)
164 {
166 "Ledger chunk {} is no longer a regular file, skipping",
167 local_path.filename());
169 }
170
172 "Ledger chunk {} exists but could not be read, skipping deletion",
173 local_path.filename());
175 }
176
177 auto file_name = local_path.filename();
178
179 for (const auto& ro_dir : read_only_dirs)
180 {
181 auto candidate = ro_dir / file_name;
// Any query error is treated the same as "no candidate here".
182 std::error_code ec;
183 if (
184 !fs::exists(candidate, ec) || ec ||
185 !fs::is_regular_file(candidate, ec) || ec)
186 {
187 continue;
188 }
189
190 try
191 {
192 auto ro_hash = hash_file(candidate);
193 if (!ro_hash.has_value())
194 {
196 "Ledger chunk {} in read-only directory {} could not be read",
197 file_name,
198 ro_dir);
199 continue;
200 }
201 if (local_hash.value() == ro_hash.value())
202 {
204 }
205
// Digest mismatch: logged, then the loop moves on to the next read-only
// directory.
207 "Ledger chunk {} found in read-only directory {} but digest "
208 "does not match (local: {}, read-only: {}). Skipping deletion.",
209 file_name,
210 ro_dir,
211 local_hash.value().hex_str(),
212 ro_hash.value().hex_str());
213 }
214 catch (const std::exception& e)
215 {
217 "Failed to read ledger chunk {} from read-only directory {}: "
218 "{}. Skipping deletion.",
219 file_name,
220 ro_dir,
221 e.what());
222 }
223 }
224
226 }
227
228 // Lists committed snapshots in the given directory. Returns them sorted
229 // descending by snapshot index (newest first). Returns nullopt on error
230 // to allow callers to distinguish "no snapshots" from "listing failed".
// Both std::filesystem::filesystem_error and any other std::exception raised
// while listing are logged and turned into nullopt.
// NOTE(review): the logging-call openers (listing lines 241 and 246) are
// missing from this rendering.
231 inline std::optional<std::vector<std::pair<size_t, std::filesystem::path>>>
232 find_committed_snapshots(const std::filesystem::path& dir)
233 {
234 std::vector<std::filesystem::path> directories{dir};
235 try
236 {
// NOTE(review): listing line 237, the body of this try block, is missing
// from this rendering. Presumably it returns the result of
// find_committed_snapshots_in_directories(directories) (listed in this
// file's symbol list); confirm. The "sorted descending" guarantee above
// depends on that helper.
238 }
239 catch (const std::filesystem::filesystem_error& e)
240 {
242 "Failed to list committed snapshots in {}: {}", dir, e.what());
243 }
244 catch (const std::exception& e)
245 {
247 "Unexpected error while listing committed snapshots in {}: {}",
248 dir,
249 e.what());
250 }
251 return std::nullopt;
252 }
253
// Returns the sequence number of the newest committed snapshot from a
// pre-gathered list, or nullopt if the list is empty.
//
// Each entry is (snapshot index, path). The list is expected to be sorted
// descending by snapshot index (the order documented for
// find_committed_snapshots()), so the newest snapshot is the first entry.
inline std::optional<size_t> highest_committed_snapshot_seqno(
  const std::vector<std::pair<size_t, std::filesystem::path>>&
    committed_snapshots)
{
  if (committed_snapshots.empty())
  {
    return std::nullopt;
  }
  // Sorted descending by snapshot index; first is newest
  return committed_snapshots.front().first;
}
267
// NOTE(review): listing line 268, which opens this definition, is missing
// from this rendering. Per this file's symbol list the signature is
//   void cleanup_old_snapshots(
//     const std::vector<std::pair<size_t, std::filesystem::path>>&
//       committed_snapshots,
//     size_t max_retained)
//
// Deletes all but the newest `max_retained` entries of `committed_snapshots`.
// The list is expected sorted descending by snapshot index, as documented
// for find_committed_snapshots(). Removal uses the error_code overload, so
// a failed delete is logged and the remaining snapshots are still processed.
// NOTE(review): std::chrono is used here but <chrono> is not among the
// visible includes; presumably it arrives transitively. Consider including
// it directly.
269 const std::vector<std::pair<size_t, std::filesystem::path>>&
270 committed_snapshots,
271 size_t max_retained)
272 {
273 TimeBoundLogger log_if_slow(
274 "Cleaning snapshots", std::chrono::seconds(1));
275
276 if (committed_snapshots.size() > max_retained)
277 {
278 // committed_snapshots is sorted descending by snapshot index, so the
279 // oldest are at the end
// Walking backwards from rbegin() up to rend() - max_retained visits
// exactly size() - max_retained oldest entries. The size check above keeps
// that iterator in range.
280 for (auto it = committed_snapshots.rbegin();
281 it != committed_snapshots.rend() - max_retained;
282 ++it)
283 {
284 const auto& path = it->second;
// NOTE(review): listing line 285 (the opening of this log call) is missing
// from this rendering.
286 "Deleting old snapshot {} (retaining {})",
287 path.filename(),
288 max_retained);
289 std::error_code ec;
290 {
291 TimeBoundLogger log_remove_if_slow(fmt::format(
292 "Deleting old snapshot - remove({})", path.filename()));
293 std::filesystem::remove(path, ec);
294 }
295 if (ec)
296 {
// NOTE(review): listing line 297 (the opening of this log call) is missing
// from this rendering.
298 "Failed to delete old snapshot {}: {}",
299 path.filename(),
300 ec.message());
301 }
302 }
303 }
304 }
305
// NOTE(review): listing line 306, which opens this definition, is missing
// from this rendering. Per this file's symbol list the signature is
//   void cleanup_old_ledger_chunks(
//     const std::filesystem::path& main_dir,
//     const std::vector<std::filesystem::path>& read_only_dirs,
//     size_t max_retained,
//     std::optional<size_t> snapshot_watermark = std::nullopt)
// Several logging-call openers (listing lines 328, 336, 350, 372, 391, 398,
// 412, 417) are also missing.
//
// Considers the oldest (committed count - max_retained) committed ledger
// chunks in `main_dir` for deletion. A candidate is kept if its end seqno is
// >= `snapshot_watermark` (when one is given), or if no byte-identical copy
// exists in `read_only_dirs`. More than `max_retained` chunks may therefore
// remain.
// A failure to list `main_dir` is reported and ends the pass. Per-file
// delete failures are reported and the loop continues.
// NOTE(review): when `snapshot_watermark` is nullopt (e.g. no committed
// snapshot exists yet), no watermark check is applied, and deletion relies
// solely on the read-only copy check.
// NOTE(review): a chunk whose end seqno cannot be parsed from its name
// (get_last_idx_from_file_name() returns nullopt) is not protected by the
// watermark check.
307 const std::filesystem::path& main_dir,
308 const std::vector<std::filesystem::path>& read_only_dirs,
309 size_t max_retained,
310 std::optional<size_t> snapshot_watermark = std::nullopt)
311 {
312 TimeBoundLogger log_if_slow(
313 fmt::format(
314 "Cleaning ledger chunks from {}, watermark={}",
315 main_dir,
316 snapshot_watermark.has_value() ?
317 std::to_string(snapshot_watermark.value()) :
318 "none"),
319 std::chrono::seconds(1));
320
321 std::vector<std::pair<size_t, std::filesystem::path>> committed;
322 try
323 {
324 committed = find_committed_ledger_chunks(main_dir);
325 }
326 catch (const std::filesystem::filesystem_error& e)
327 {
329 "Failed to list committed ledger chunks in {}: {}",
330 main_dir,
331 e.what());
332 return;
333 }
334 catch (const std::exception& e)
335 {
337 "Unexpected error while listing committed ledger chunks in {}: {}",
338 main_dir,
339 e.what());
340 return;
341 }
342
// Nothing beyond the retention limit: no candidates for deletion.
343 if (committed.size() <= max_retained)
344 {
345 return;
346 }
347
348 if (snapshot_watermark.has_value())
349 {
351 "Ledger chunk cleanup: snapshot watermark is {}",
352 snapshot_watermark.value());
353 }
354
355 // committed is sorted ascending by start index; the oldest are at the
356 // front. Delete from front, keeping the last max_retained entries.
357 size_t to_delete = committed.size() - max_retained;
358 for (size_t i = 0; i < to_delete; ++i)
359 {
360 const auto& path = committed[i].second;
361
362 // Never delete a chunk that ends at or after the newest committed
363 // snapshot - we must preserve a complete ledger from that snapshot
364 // onwards for disaster recovery.
365 if (snapshot_watermark.has_value())
366 {
367 auto end_idx = get_last_idx_from_file_name(path.filename().string());
368 if (
369 end_idx.has_value() &&
370 end_idx.value() >= snapshot_watermark.value())
371 {
373 "Keeping ledger chunk {} (end seqno {} >= snapshot "
374 "watermark {})",
375 path.filename(),
376 end_idx.value(),
377 snapshot_watermark.value());
378 continue;
379 }
380 }
381
// Only a verified identical copy in a read-only directory allows deletion.
382 auto digest_result =
383 check_digest_against_read_only_dirs(path, read_only_dirs);
384 if (digest_result == DigestCheckResult::file_gone)
385 {
386 // File was concurrently deleted — nothing to do.
387 continue;
388 }
389 if (digest_result == DigestCheckResult::no_match)
390 {
392 "Keeping ledger chunk {} because no matching copy was found "
393 "in any read-only ledger directory",
394 path.filename());
395 continue;
396 }
397
399 "Deleting old committed ledger chunk {} (retaining {})",
400 path.filename(),
401 max_retained);
402 std::error_code ec;
403 {
404 TimeBoundLogger log_remove_if_slow(fmt::format(
405 "Deleting old ledger chunk - remove({})", path.filename()));
406 std::filesystem::remove(path, ec);
407 }
// A chunk removed between the digest check and remove() is reported
// separately from genuine delete failures.
408 if (ec)
409 {
410 if (ec == std::errc::no_such_file_or_directory)
411 {
413 "Ledger chunk {} was already removed", path.filename());
414 }
415 else
416 {
418 "Failed to delete committed ledger chunk {}: {}",
419 path.filename(),
420 ec.message());
421 }
422 }
423 }
424 }
425 } // namespace files_cleanup
426
// NOTE(review): listing line 427, the class head, is missing from this
// rendering. The constructor name in the symbol list indicates this is
// FilesCleanupImpl. Listing lines 471, 481, 487-488, 506 and 545 are also
// missing.
//
// Periodic cleanup of old committed snapshots and committed ledger chunks.
// on_timer() copies the configuration into a heap-allocated CleanupWork and
// hands it to uv_queue_work(). on_cleanup_work() performs the filesystem
// work, and on_cleanup_work_done() clears the in-progress flag and frees
// both allocations.
428 {
429 private:
430 // Snapshot cleanup config
431 std::filesystem::path snapshots_dir;
432 std::optional<size_t> max_snapshots;
433
434 // Ledger chunk cleanup config
435 std::filesystem::path ledger_dir;
436 std::vector<std::filesystem::path> read_only_ledger_dirs;
437 std::optional<size_t> max_committed_ledger_chunks;
438
439 // Guard against overlapping cleanup tasks. Shared between the impl and
440 // any in-flight CleanupWork so the flag remains valid even if the timer
441 // is destroyed while a cleanup task is still running on the thread pool.
442 std::shared_ptr<std::atomic<bool>> cleanup_in_progress =
443 std::make_shared<std::atomic<bool>>(false);
444
// Self-contained copy of the configuration for one queued cleanup run, so
// the task does not read this object's members while running.
445 struct CleanupWork
446 {
447 std::filesystem::path snapshots_dir;
448 std::optional<size_t> max_snapshots;
449
450 std::filesystem::path ledger_dir;
451 std::vector<std::filesystem::path> read_only_ledger_dirs;
452 std::optional<size_t> max_committed_ledger_chunks;
453
454 std::shared_ptr<std::atomic<bool>> cleanup_in_progress;
455 };
456
// Work callback: runs one cleanup pass using the CleanupWork in
// `req->data`. The CleanupWork is freed in on_cleanup_work_done(), not
// here. If snapshot listing fails, both snapshot and ledger cleanup are
// skipped, even when max_snapshots is unset.
// NOTE(review): nothing here catches exceptions. If a callee throws (e.g.
// an allocation failure while formatting), it would propagate out of this
// libuv callback; consider a top-level try/catch.
457 static void on_cleanup_work(uv_work_t* req)
458 {
459 auto* work = static_cast<CleanupWork*>(req->data);
460 LOG_DEBUG_FMT("Files cleanup started");
461
462 // Gather committed snapshots once - used by both snapshot cleanup
463 // and as a watermark for ledger chunk cleanup.
464 auto committed_snapshots_opt =
465 files_cleanup::find_committed_snapshots(work->snapshots_dir);
466
467 if (!committed_snapshots_opt.has_value())
468 {
469 // Snapshot listing failed. Skip both snapshot and ledger cleanup
470 // to avoid deleting ledger chunks without a valid watermark.
472 "Skipping all file cleanup because committed snapshot listing "
473 "failed");
474 return;
475 }
476
477 auto& committed_snapshots = committed_snapshots_opt.value();
478
479 if (work->max_snapshots.has_value())
480 {
// NOTE(review): listing line 481 is missing; presumably it opens a call to
// files_cleanup::cleanup_old_snapshots(. Confirm.
482 committed_snapshots, work->max_snapshots.value());
483 }
484 if (work->max_committed_ledger_chunks.has_value())
485 {
486 auto snapshot_watermark =
// NOTE(review): listing lines 487-488 are missing. Presumably they compute
// the watermark with
// files_cleanup::highest_committed_snapshot_seqno(committed_snapshots) and
// open a call to files_cleanup::cleanup_old_ledger_chunks(. Confirm.
489 work->ledger_dir,
490 work->read_only_ledger_dirs,
491 work->max_committed_ledger_chunks.value(),
492 snapshot_watermark);
493 }
494 }
495
// Completion callback: clears the shared in-progress flag, then frees the
// CleanupWork and request allocated in on_timer(). The status argument is
// unused, so the same release happens for every status value.
496 static void on_cleanup_work_done(uv_work_t* req, int /*status*/)
497 {
498 auto* work = static_cast<CleanupWork*>(req->data);
499 work->cleanup_in_progress->store(false);
500 LOG_DEBUG_FMT("Files cleanup completed");
501 delete work; // NOLINT(cppcoreguidelines-owning-memory)
502 delete req; // NOLINT(cppcoreguidelines-owning-memory)
503 }
504
505 public:
// NOTE(review): listing line 506, the constructor head, is missing from
// this rendering; per the symbol list it is FilesCleanupImpl(...).
// Stores the configuration and converts the read-only ledger directories to
// paths. Throws std::logic_error if max_snapshots is set below 1, or if
// max_committed_ledger_chunks is set without any read-only ledger directory.
// A max_committed_ledger_chunks of 0 is accepted.
507 const std::string& snapshots_dir_,
508 std::optional<size_t> max_snapshots_,
509 const std::string& ledger_dir_,
510 const std::vector<std::string>& read_only_ledger_dirs_,
511 std::optional<size_t> max_committed_ledger_chunks_) :
512 snapshots_dir(snapshots_dir_),
513 max_snapshots(max_snapshots_),
514 ledger_dir(ledger_dir_),
515 max_committed_ledger_chunks(max_committed_ledger_chunks_)
516 {
517 for (const auto& d : read_only_ledger_dirs_)
518 {
519 read_only_ledger_dirs.emplace_back(d);
520 }
521
522 if (max_snapshots.has_value() && max_snapshots.value() < 1)
523 {
524 throw std::logic_error(fmt::format(
525 "files_cleanup.max_snapshots must be at least 1, got {}",
526 max_snapshots.value()));
527 }
528 if (
529 max_committed_ledger_chunks.has_value() &&
530 read_only_ledger_dirs.empty())
531 {
532 throw std::logic_error(
533 "files_cleanup.max_committed_ledger_chunks requires at least one "
534 "ledger.read_only_directories entry. Committed ledger chunks are "
535 "only deleted after verifying an identical copy exists in a "
536 "read-only directory.");
537 }
538 }
539
// Timer callback. Atomically claims the in-progress flag, skipping this
// tick if a previous cleanup still holds it. Otherwise it copies the current
// configuration into a CleanupWork and queues it with uv_queue_work() on the
// default loop. If queueing fails, the flag is released and both
// allocations are freed immediately.
540 void on_timer()
541 {
542 bool expected = false;
543 if (!cleanup_in_progress->compare_exchange_strong(expected, true))
544 {
// NOTE(review): listing line 545 (the opening of this log call) is missing
// from this rendering.
546 "Skipping files cleanup: previous cleanup task is still running");
547 return;
548 }
549
550 // NOLINTNEXTLINE(cppcoreguidelines-owning-memory)
551 auto* work = new CleanupWork{
552 .snapshots_dir = snapshots_dir,
553 .max_snapshots = max_snapshots,
554 .ledger_dir = ledger_dir,
555 .read_only_ledger_dirs = read_only_ledger_dirs,
556 .max_committed_ledger_chunks = max_committed_ledger_chunks,
557 .cleanup_in_progress = cleanup_in_progress};
558 // NOLINTNEXTLINE(cppcoreguidelines-owning-memory)
559 auto* req = new uv_work_t;
560 req->data = work;
561 int rc = uv_queue_work(
562 uv_default_loop(), req, &on_cleanup_work, &on_cleanup_work_done);
563 if (rc < 0)
564 {
565 LOG_FAIL_FMT("Failed to queue files cleanup work: {}", uv_strerror(rc));
566 cleanup_in_progress->store(false);
567 delete work; // NOLINT(cppcoreguidelines-owning-memory)
568 delete req; // NOLINT(cppcoreguidelines-owning-memory)
569 }
570 }
571 };
572
574}
Definition files_cleanup_timer.h:428
FilesCleanupImpl(const std::string &snapshots_dir_, std::optional< size_t > max_snapshots_, const std::string &ledger_dir_, const std::vector< std::string > &read_only_ledger_dirs_, std::optional< size_t > max_committed_ledger_chunks_)
Definition files_cleanup_timer.h:506
void on_timer()
Definition files_cleanup_timer.h:540
Definition proxy.h:51
#define LOG_INFO_FMT
Definition internal_logger.h:15
#define LOG_DEBUG_FMT
Definition internal_logger.h:14
#define LOG_FAIL_FMT
Definition internal_logger.h:16
void cleanup_old_ledger_chunks(const std::filesystem::path &main_dir, const std::vector< std::filesystem::path > &read_only_dirs, size_t max_retained, std::optional< size_t > snapshot_watermark=std::nullopt)
Definition files_cleanup_timer.h:306
DigestCheckResult check_digest_against_read_only_dirs(const std::filesystem::path &local_path, const std::vector< std::filesystem::path > &read_only_dirs)
Definition files_cleanup_timer.h:121
void cleanup_old_snapshots(const std::vector< std::pair< size_t, std::filesystem::path > > &committed_snapshots, size_t max_retained)
Definition files_cleanup_timer.h:268
std::optional< std::vector< std::pair< size_t, std::filesystem::path > > > find_committed_snapshots(const std::filesystem::path &dir)
Definition files_cleanup_timer.h:232
std::optional< size_t > highest_committed_snapshot_seqno(const std::vector< std::pair< size_t, std::filesystem::path > > &committed_snapshots)
Definition files_cleanup_timer.h:256
std::vector< std::pair< size_t, std::filesystem::path > > find_committed_ledger_chunks(const std::filesystem::path &dir)
Definition files_cleanup_timer.h:42
std::optional< ccf::crypto::Sha256Hash > hash_file(const std::filesystem::path &path)
Definition files_cleanup_timer.h:83
DigestCheckResult
Definition files_cleanup_timer.h:32
Definition after_io.h:8
std::shared_ptr< ISha256Hash > make_incremental_sha256()
Definition hash.cpp:46
std::vector< std::pair< size_t, fs::path > > find_committed_snapshots_in_directories(const std::vector< fs::path > &directories, std::optional< size_t > minimum_idx=std::nullopt)
Definition filenames.h:167
Definition time_bound_logger.h:14