// NOTE(review): this chunk is a fragmentary extract -- many interior lines
// are elided, so the comments below only describe what the visible lines
// establish.
23 class Snapshotter :
public std::enable_shared_from_this<Snapshotter>,
// Sentinel: "never snapshot on a transaction-count basis" by default.
27 static constexpr auto max_tx_interval = std::numeric_limits<size_t>::max();
// Back-pressure cap on snapshots awaiting host allocation/commit.
32 static constexpr auto max_pending_snapshots_count = 5;
38 std::shared_ptr<ccf::kv::Store> store;
// Snapshot every N transactions; defaults to "never" (max_tx_interval).
41 size_t snapshot_tx_interval = max_tx_interval;
44 size_t min_snapshot_tx_interval = 0;
// Time-based snapshot trigger; 0us disables it (see the due-check logic).
47 std::chrono::microseconds snapshot_time_interval =
48 std::chrono::microseconds(0);
50 using Clock = std::chrono::system_clock;
51 using TimePoint = Clock::time_point;
// The fields below presumably belong to a SnapshotInfo struct (its
// declaration is not visible in this extract): per-snapshot state
// accumulated until a receipt can be built.
57 std::string commit_evidence;
59 std::vector<uint8_t> serialised_snapshot;
// Set once the snapshot bytes have been copied into host memory.
63 bool is_stored =
false;
65 std::optional<::consensus::Index> evidence_idx = std::nullopt;
67 std::optional<std::vector<uint8_t>> cose_sig = std::nullopt;
68 std::optional<std::vector<uint8_t>> tree = std::nullopt;
70 SnapshotInfo() =
default;
// In-flight snapshots, keyed by generation count.
74 std::map<uint32_t, SnapshotInfo> pending_snapshots;
77 static constexpr ::consensus::Index initial_snapshot_idx = 0;
83 TimePoint last_snapshot_time = Clock::now();
// When each snapshottable index was scheduled; used by the time trigger.
85 std::map<::consensus::Index, TimePoint> scheduled_snapshot_times;
88 bool snapshot_generation_enabled =
true;
98 std::deque<SnapshotEntry> next_snapshot_indices;
// Converts a snapshot-status timestamp (nanoseconds since epoch, per the
// nanoseconds cast below) into a system_clock TimePoint.
100 static TimePoint time_point_from_snapshot_status(uint64_t timestamp)
102 return TimePoint(std::chrono::duration_cast<TimePoint::duration>(
103 std::chrono::nanoseconds(
static_cast<int64_t
>(timestamp))));
// Inverse of time_point_from_snapshot_status: TimePoint -> uint64_t ns
// since epoch. The visible format string suggests pre-epoch timestamps
// are rejected (error path elided in this extract).
106 static uint64_t snapshot_status_timestamp_from_time_point(
107 const TimePoint& timestamp)
109 const auto timestamp_ns =
110 std::chrono::duration_cast<std::chrono::nanoseconds>(
111 timestamp.time_since_epoch())
116 "Snapshot timestamp {} precedes the Unix epoch",
119 return static_cast<uint64_t
>(timestamp_ns);
// Fragment of a helper (signature not visible here) that returns the max
// of last_snapshot_idx and every entry index in next_snapshot_indices.
124 auto latest_idx = last_snapshot_idx;
125 for (
const auto& entry : next_snapshot_indices)
127 latest_idx = std::max(latest_idx, entry.idx);
// Latest time at which a snapshot at or below latest_snapshot_idx was
// scheduled or committed; starts from last_snapshot_time.
133 TimePoint latest_scheduled_or_committed_snapshot_time(
136 auto latest_time = last_snapshot_time;
137 for (
const auto& [idx, time] : scheduled_snapshot_times)
// Only consider times for indices not beyond latest_snapshot_idx.
139 if (idx <= latest_snapshot_idx)
141 latest_time = std::max(latest_time, time);
// Emits a ::consensus::snapshot_commit message carrying the serialised
// receipt; body largely elided in this extract.
148 void commit_snapshot(
150 const std::vector<uint8_t>& serialised_receipt)
156 ::consensus::snapshot_commit,
// Deferred snapshot-serialisation task: holds a shared_ptr back to the
// Snapshotter (keeping it alive for the task's lifetime) plus the raw
// store snapshot, and invokes snapshot_() when executed.
164 std::shared_ptr<Snapshotter> self;
165 std::unique_ptr<ccf::kv::AbstractStore::AbstractSnapshot> snapshot;
166 uint32_t generation_count;
// Human-readable task name, "snapshot@<version>[<generation>]".
169 const std::string name;
172 std::shared_ptr<Snapshotter> _self,
173 std::unique_ptr<ccf::kv::AbstractStore::AbstractSnapshot>&& _snapshot,
174 uint32_t _generation_count,
175 TimePoint _timestamp) :
176 self(std::move(_self)),
177 snapshot(std::move(_snapshot)),
178 generation_count(_generation_count),
179 timestamp(_timestamp),
181 "snapshot@{}[{}]", snapshot->get_version(), generation_count))
// Hands the snapshot over to the Snapshotter; `snapshot` is moved-from
// afterwards, so the task must only run once.
184 void do_task_implementation()
override
186 self->snapshot_(std::move(snapshot), generation_count, timestamp);
189 [[nodiscard]]
const std::string& get_name()
const override
// Serialises a KV snapshot, commits its evidence transaction, records the
// result under pending_snapshots[generation_count], and requests a host
// buffer allocation for the serialised bytes.
196 std::unique_ptr<ccf::kv::AbstractStore::AbstractSnapshot> snapshot,
197 uint32_t generation_count,
200 auto snapshot_version = snapshot->get_version();
// Back-pressure: drop this snapshot if too many are already in flight.
203 std::unique_lock<ccf::pal::Mutex> guard(lock);
204 if (pending_snapshots.size() >= max_pending_snapshots_count)
207 "Skipping new snapshot generation as {} snapshots are already "
209 pending_snapshots.size());
218 pending_snapshots[generation_count].version = snapshot_version;
221 auto serialised_snapshot = store->serialise_snapshot(std::move(snapshot));
222 auto serialised_snapshot_size = serialised_snapshot.size();
// Record snapshot evidence (hash + version) and status (version +
// timestamp in ns) in the KV via a dedicated transaction.
224 auto tx = store->create_tx();
227 evidence->put({snapshot_hash, snapshot_version});
230 const auto timestamp_ns =
231 snapshot_status_timestamp_from_time_point(timestamp);
232 status->put({snapshot_version, timestamp_ns});
236 cd.
set(std::move(snapshot_hash));
// Capture the write-set digest and commit evidence produced at commit
// time; both are stored below for later receipt construction.
239 std::string commit_evidence;
240 auto capture_ws_digest_and_commit_evidence =
241 [&ws_digest, &commit_evidence](
243 const std::string& commit_evidence_) {
244 ws_digest = write_set_digest;
245 commit_evidence = commit_evidence_;
248 auto rc = tx.commit(cd,
nullptr, capture_ws_digest_and_commit_evidence);
252 "Could not commit snapshot evidence for seqno {}: {}",
258 auto evidence_version = tx.commit_version();
// A fresh lock scope guards the pending_snapshots updates (the scope
// boundaries between the two guards are elided in this extract).
261 std::unique_lock<ccf::pal::Mutex> guard(lock);
262 pending_snapshots[generation_count].commit_evidence = commit_evidence;
263 pending_snapshots[generation_count].write_set_digest = ws_digest;
264 pending_snapshots[generation_count].snapshot_digest = cd.
value();
265 pending_snapshots[generation_count].serialised_snapshot =
266 std::move(serialised_snapshot);
// Ask the host to allocate space for the serialised snapshot.
271 ::consensus::snapshot_allocate,
275 serialised_snapshot_size,
279 "Request to allocate snapshot [{} bytes] for seqno {}, with evidence "
280 "seqno {}: {}, ws digest: {}",
281 serialised_snapshot_size,
// Fragment of an index-update routine: coalesces superseded entries at
// the front of next_snapshot_indices up to idx, then commits any pending
// snapshot whose receipt material is complete.
291 while ((next_snapshot_indices.size() > 1) &&
292 (std::next(next_snapshot_indices.begin())->idx <= idx))
294 const auto coalesced_idx = next_snapshot_indices.front().idx;
// Keep the scheduled time alive while a pending snapshot references it.
295 const auto has_pending_snapshot = std::any_of(
296 pending_snapshots.begin(),
297 pending_snapshots.end(),
298 [coalesced_idx](
const auto& entry) {
299 return entry.second.version == coalesced_idx;
307 if (!has_pending_snapshot)
309 scheduled_snapshot_times.erase(coalesced_idx);
311 next_snapshot_indices.pop_front();
// A pending snapshot is committable once it is stored on the host, its
// evidence idx is strictly below idx, and both the COSE signature and
// the serialised tree have been recorded.
315 for (
auto it = pending_snapshots.begin(); it != pending_snapshots.end();)
317 auto& snapshot_info = it->second;
320 snapshot_info.is_stored && snapshot_info.evidence_idx.has_value() &&
321 idx > snapshot_info.evidence_idx.value() &&
322 snapshot_info.cose_sig.has_value() && snapshot_info.tree.has_value())
324 auto serialised_receipt = build_and_serialise_receipt(
325 snapshot_info.cose_sig.value(),
326 snapshot_info.tree.value(),
327 snapshot_info.evidence_idx.value(),
328 snapshot_info.write_set_digest,
329 snapshot_info.commit_evidence,
330 std::move(snapshot_info.snapshot_digest));
332 commit_snapshot(snapshot_info.version, serialised_receipt);
// map::erase returns the next valid iterator, keeping the loop safe.
333 it = pending_snapshots.erase(it);
// Constructor: wires the writer factory, store and interval settings, and
// seeds next_snapshot_indices with the initial (idx 0) placeholder entry.
345 std::shared_ptr<ccf::kv::Store>& store_,
346 size_t snapshot_tx_interval_,
347 size_t min_snapshot_tx_interval_ = 0,
348 std::chrono::microseconds snapshot_time_interval_ =
349 std::chrono::microseconds(0)) :
350 writer_factory(writer_factory_),
352 snapshot_tx_interval(snapshot_tx_interval_),
353 min_snapshot_tx_interval(min_snapshot_tx_interval_),
354 snapshot_time_interval(snapshot_time_interval_)
356 next_snapshot_indices.push_back({initial_snapshot_idx,
false,
true});
// Resets last-snapshot bookkeeping to the newest scheduled index and the
// current time (enclosing signature elided in this extract).
364 std::lock_guard<ccf::pal::Mutex> guard(lock);
366 last_snapshot_idx = next_snapshot_indices.back().idx;
367 last_snapshot_time = Clock::now();
// Enables or disables snapshot generation under the lock.
372 std::lock_guard<ccf::pal::Mutex> guard(lock);
373 snapshot_generation_enabled = enabled;
// Re-initialises snapshotter state from a recorded snapshot status
// (version + timestamp), discarding all previously scheduled indices.
378 std::lock_guard<ccf::pal::Mutex> guard(lock);
380 const auto timestamp = time_point_from_snapshot_status(status.
timestamp);
381 last_snapshot_idx = status.
version;
382 last_snapshot_time = timestamp;
// Restart scheduling from the restored snapshot index.
384 next_snapshot_indices.clear();
385 next_snapshot_indices.push_back({last_snapshot_idx,
false,
true});
// Copies the pending serialised snapshot for generation_count into the
// host-allocated buffer. Discards the pending snapshot if the buffer
// size does not match exactly.
389 std::span<uint8_t> snapshot_buf, uint32_t generation_count)
391 std::lock_guard<ccf::pal::Mutex> guard(lock);
393 auto search = pending_snapshots.find(generation_count);
394 if (search == pending_snapshots.end())
397 "Could not find pending snapshot to write for generation count {}",
402 auto& pending_snapshot = search->second;
// Size mismatch means the host buffer is unusable: drop the snapshot.
403 if (snapshot_buf.size() != pending_snapshot.serialised_snapshot.size())
409 "Host allocated snapshot buffer [{} bytes] is not of expected "
410 "size [{} bytes]. Discarding snapshot for seqno {}",
412 pending_snapshot.serialised_snapshot.size(),
413 pending_snapshot.version);
414 pending_snapshots.erase(search);
419 pending_snapshot.serialised_snapshot.begin(),
420 pending_snapshot.serialised_snapshot.end(),
421 snapshot_buf.begin());
// Mark as stored so the commit path can later build a receipt for it.
422 pending_snapshot.is_stored =
true;
425 "Successfully copied snapshot at seqno {} to host memory [{} "
427 pending_snapshot.version,
428 pending_snapshot.serialised_snapshot.size());
// Decides whether a snapshot is due at threshold_idx: either the tx count
// since the latest scheduled/committed snapshot has reached
// snapshot_tx_interval, or -- when time-based snapshots are enabled and
// the minimum tx count is exceeded -- snapshot_time_interval has elapsed.
434 auto latest_snapshot_idx = latest_scheduled_or_committed_snapshot_idx();
438 auto count = threshold_idx - latest_snapshot_idx;
439 auto count_overdue = count >= snapshot_tx_interval;
441 auto latest_scheduled_or_committed_time =
442 latest_scheduled_or_committed_snapshot_time(latest_snapshot_idx);
// snapshot_time_interval of 0 disables the time-based trigger entirely.
443 auto time_enabled = snapshot_time_interval.count() > 0;
// Strictly greater-than: the count must exceed the minimum interval.
444 auto min_count_met = count > min_snapshot_tx_interval;
445 const auto now = Clock::now();
446 auto time_overdue = time_enabled && min_count_met &&
447 (now - latest_scheduled_or_committed_time >= snapshot_time_interval);
449 if (count_overdue || time_overdue)
452 "Snapshot at seqno {} is due (c: {}, t: {}): count since last "
453 "queued snapshot is {}, time since last queued snapshot is {}s",
455 count_overdue ?
"overdue" :
"not overdue",
456 time_overdue ?
"overdue" :
"not overdue",
458 std::chrono::duration_cast<std::chrono::seconds>(
459 now - latest_scheduled_or_committed_time)
463 return count_overdue || time_overdue;
// NOTE(review): the first lock_guard below (original line 468) appears to
// belong to a preceding function whose body is elided in this extract.
468 std::lock_guard<ccf::pal::Mutex> guard(lock);
// Records a committable seqno as a snapshottable index; a snapshot can
// also be forced via a store flag, but "actually_forced" is only set
// when the snapshot was not already due.
476 std::lock_guard<ccf::pal::Mutex> guard(lock);
// Invariant (per the format string): committable seqnos arrive in order,
// never behind the last recorded snapshot index.
479 idx >= next_snapshot_indices.back().idx,
480 "Committable seqno {} < next snapshot seqno {}",
482 next_snapshot_indices.back().idx);
484 bool forced = store->flag_enabled_unsafe(
491 auto actually_forced = !due && forced;
492 next_snapshot_indices.push_back({idx, actually_forced,
false});
// Remember when this index was scheduled, for the time-based trigger.
493 scheduled_snapshot_times[idx] = Clock::now();
495 "{} {} as snapshot index",
496 actually_forced ?
"Forced" :
"Recorded",
// The force flag is one-shot: it is cleared once consumed.
498 store->unset_flag_unsafe(
// Attaches the COSE signature committed at idx to every pending snapshot
// whose evidence is already committed (evidence_idx < idx) and which has
// no signature recorded yet.
509 std::lock_guard<ccf::pal::Mutex> guard(lock);
511 for (
auto& [_, pending_snapshot] : pending_snapshots)
514 pending_snapshot.evidence_idx.has_value() &&
515 idx > pending_snapshot.evidence_idx.value() &&
516 !pending_snapshot.cose_sig.has_value())
519 "Recording COSE signature at {} for snapshot {} with evidence at "
522 pending_snapshot.version,
523 pending_snapshot.evidence_idx.value());
525 pending_snapshot.cose_sig = cose_sig;
// Attaches the serialised merkle tree committed at idx to every pending
// snapshot whose evidence is already committed and which has no tree
// recorded yet (mirrors the COSE-signature recording path).
533 std::lock_guard<ccf::pal::Mutex> guard(lock);
535 for (
auto& [_, pending_snapshot] : pending_snapshots)
538 pending_snapshot.evidence_idx.has_value() &&
539 idx > pending_snapshot.evidence_idx.value() &&
540 !pending_snapshot.tree.has_value())
543 "Recording serialised tree at {} for snapshot {} with evidence at "
546 pending_snapshot.version,
547 pending_snapshot.evidence_idx.value())
549 pending_snapshot.tree = tree;
// Records the evidence commit index for the pending snapshot whose
// version matches the evidence's snapshot version.
557 std::lock_guard<ccf::pal::Mutex> guard(lock);
559 for (
auto& [_, pending_snapshot] : pending_snapshots)
561 if (pending_snapshot.version == snapshot.
version)
564 "Recording evidence idx at {} for snapshot {}",
566 pending_snapshot.version);
568 pending_snapshot.evidence_idx = idx;
// Updates last-snapshot bookkeeping from a committed snapshot status and
// drops scheduled times at or below the committed version.
577 std::lock_guard<ccf::pal::Mutex> guard(lock);
579 const auto timestamp = time_point_from_snapshot_status(status.
timestamp);
580 last_snapshot_idx = status.
version;
581 last_snapshot_time = timestamp;
// Scheduled times up to the committed version are no longer needed.
586 std::erase_if(scheduled_snapshot_times, [&status](
const auto& entry) {
587 return entry.first <= status.
version;
// Fragment of snapshot scheduling: each SnapshotTask gets a distinct
// generation count from a function-local static counter (increment not
// visible here) and captures a fresh store snapshot at idx.
593 static uint32_t generation_count = 0;
595 auto task = std::make_shared<SnapshotTask>(
597 store->snapshot_unsafe_maps(idx),
// Fragment of the commit path: validates idx against the last snapshot
// and the front of next_snapshot_indices, then (when generation is
// enabled and requested) prepares the next snapshot and advances
// last_snapshot_idx.
612 std::lock_guard<ccf::pal::Mutex> guard(lock);
// Committing before the last snapshot would rewind state: hard error.
619 if (idx < last_snapshot_idx)
621 throw std::logic_error(fmt::format(
622 "Cannot snapshot at seqno {} which is earlier than last snapshot "
629 idx >= next_snapshot_indices.front().idx,
630 "Cannot commit snapshotter at {}, which is before last snapshottable "
633 next_snapshot_indices.front().idx);
635 auto& next = next_snapshot_indices.front();
// idx 0 is the initial placeholder entry and is never snapshotted.
639 snapshot_generation_enabled && generate_snapshot && (next.idx != 0u))
// Backfill a scheduled time if one was never recorded for this idx.
641 auto snapshot_time = scheduled_snapshot_times.find(next.idx);
642 if (snapshot_time == scheduled_snapshot_times.end())
644 const auto timestamp = Clock::now();
646 "Could not find scheduled snapshot time for idx {}", next.idx);
647 scheduled_snapshot_times[next.idx] = timestamp;
658 if (last_snapshot_idx != next.idx)
663 last_snapshot_idx = next.idx;
664 LOG_TRACE_FMT(
"Recorded {} as last snapshot index", last_snapshot_idx);
// Rolls the snapshotter back to idx: drops scheduled indices and times
// above idx, restores a placeholder entry if none remain, and prunes
// pending snapshots from the newest end.
670 std::lock_guard<ccf::pal::Mutex> guard(lock);
672 while (!next_snapshot_indices.empty() &&
673 (next_snapshot_indices.back().idx > idx))
675 next_snapshot_indices.pop_back();
// Scheduled times beyond the rollback point are stale.
678 std::erase_if(scheduled_snapshot_times, [idx](
const auto& entry) {
679 return entry.first > idx;
682 if (next_snapshot_indices.empty())
684 next_snapshot_indices.push_back({last_snapshot_idx,
false,
true});
688 "Rolled back snapshotter: last snapshottable idx is now {}",
689 next_snapshot_indices.front().idx);
// Walk pending snapshots from the newest; the visible check compares
// each snapshot's evidence idx against the rollback idx, but the code
// between the check and the erase is elided in this extract, so the
// exact keep/discard behaviour cannot be stated from here.
691 while (!pending_snapshots.empty())
693 const auto& last_snapshot = std::prev(pending_snapshots.end());
694 if (
auto evidence_opt = last_snapshot->second.evidence_idx;
695 evidence_opt.has_value() && idx >= evidence_opt.value())
700 pending_snapshots.erase(last_snapshot);