// Snapshotter derives from std::enable_shared_from_this so that member code
// can hand out a shared_ptr to itself (see the shared_from_this() call where
// a snapshot task is scheduled). The trailing comma indicates at least one
// further base class, which is not visible in this excerpt.
22 class Snapshotter :
public std::enable_shared_from_this<Snapshotter>,
// Largest value a size_t can hold; used as the default snapshot interval.
26 static constexpr auto max_tx_interval = std::numeric_limits<size_t>::max();
// Cap compared against pending_snapshots.size() before a new snapshot is
// generated.
31 static constexpr auto max_pending_snapshots_count = 5;
// Store used to serialise snapshots, create transactions and read/unset flags.
37 std::shared_ptr<ccf::kv::Store> store;
// Threshold compared against seqno distances when deciding whether a snapshot
// is due; overridden by the constructor argument.
40 size_t snapshot_tx_interval = max_tx_interval;
// Per-snapshot bookkeeping fields. The enclosing struct header and some
// fields used elsewhere (version, write_set_digest, snapshot_digest) are not
// visible in this excerpt.
// Commit evidence string captured when the evidence transaction is committed.
46 std::string commit_evidence;
// Serialised snapshot bytes, later copied into the host-provided buffer.
48 std::vector<uint8_t> serialised_snapshot;
// Set to true once the serialised snapshot has been copied to the host buffer.
52 bool is_stored =
false;
// Index of the evidence entry; empty until it is recorded.
54 std::optional<::consensus::Index> evidence_idx = std::nullopt;
// The four optionals below are filled in when a signature / serialised tree
// past the evidence index is recorded, and are all read with .value() when
// the receipt is built.
56 std::optional<NodeId> node_id = std::nullopt;
57 std::optional<ccf::crypto::Pem> node_cert = std::nullopt;
58 std::optional<std::vector<uint8_t>> sig = std::nullopt;
59 std::optional<std::vector<uint8_t>> tree = std::nullopt;
61 SnapshotInfo() =
default;
// Snapshots that have been generated but not yet committed. The key is the
// generation count passed to snapshot_() and to the host-buffer write path.
65 std::map<uint32_t, SnapshotInfo> pending_snapshots;
// Index used to seed next_snapshot_indices in the constructor.
68 static constexpr ::consensus::Index initial_snapshot_idx = 0;
// When false, commit-time snapshot generation is suppressed (checked together
// with the generate_snapshot argument).
74 bool snapshot_generation_enabled =
true;
// Candidate snapshot indices, oldest at the front. Entries are brace-
// initialised with three values and accessed via .idx, .forced and .done —
// presumably in that order (SnapshotEntry's definition is not visible here).
84 std::deque<SnapshotEntry> next_snapshot_indices;
// Fragment of commit_snapshot (invoked elsewhere as
// commit_snapshot(snapshot_info.version, serialised_receipt)). Only the last
// parameter and the ::consensus::snapshot_commit identifier are visible;
// presumably the identifier tags a message sent to the host — TODO confirm
// against the full source.
88 const std::vector<uint8_t>& serialised_receipt)
94 ::consensus::snapshot_commit,
// Payload of a scheduled snapshot task (struct header not visible).
// Owning reference to the Snapshotter, filled from shared_from_this().
102 std::shared_ptr<Snapshotter> self;
// Store snapshot to serialise; moved out by the task callback.
103 std::unique_ptr<ccf::kv::AbstractStore::AbstractSnapshot> snapshot;
// Generation count forwarded to snapshot_().
104 uint32_t generation_count;
// Task callback body (signature not visible): forwards to snapshot_() on the
// Snapshotter carried in the message, transferring ownership of the snapshot.
109 msg->data.self->snapshot_(
110 std::move(msg->data.snapshot), msg->data.generation_count);
// snapshot_: serialises the given store snapshot, commits a transaction
// carrying evidence of it, and records the result in pending_snapshots under
// generation_count. The return type, braces and several statements are not
// visible in this excerpt.
//
// snapshot:         store snapshot to serialise (ownership is taken).
// generation_count: key under which the pending snapshot is tracked.
114 std::unique_ptr<ccf::kv::AbstractStore::AbstractSnapshot> snapshot,
115 uint32_t generation_count)
// Compare the number of pending snapshots against the cap; the log message
// below reports that generation is skipped in that case.
117 if (pending_snapshots.size() >= max_pending_snapshots_count)
120 "Skipping new snapshot generation as {} snapshots are already "
122 pending_snapshots.size());
// The version must be read before `snapshot` is moved into
// serialise_snapshot() on the following statement.
126 auto snapshot_version = snapshot->get_version();
128 auto serialised_snapshot = store->serialise_snapshot(std::move(snapshot));
// Captured now because serialised_snapshot is moved into the pending entry
// before the size is used for the allocation request.
129 auto serialised_snapshot_size = serialised_snapshot.size();
131 auto tx = store->create_tx();
// Write (hash, version) as the evidence value within the transaction.
134 evidence->put({snapshot_hash, snapshot_version});
// snapshot_hash is moved into cd after its last use above; it is read back
// via cd.value() once the commit has happened.
137 cd.
set(std::move(snapshot_hash));
140 std::string commit_evidence;
// Callback handed to tx.commit(): copies the commit evidence into the local
// above. ws_digest is captured by reference as well; presumably it is filled
// from write_set on a line not visible here — TODO confirm.
141 auto capture_ws_digest_and_commit_evidence =
142 [&ws_digest, &commit_evidence](
143 const std::vector<uint8_t>& write_set,
144 const std::string& commit_evidence_) {
147 commit_evidence = commit_evidence_;
// The pending entry is (re)created and given its version before the commit,
// so it can already be matched by version while the commit is in progress —
// presumably intentional; verify against the full source.
155 pending_snapshots[generation_count] = {};
156 pending_snapshots[generation_count].version = snapshot_version;
159 tx.commit(cd,
false,
nullptr, capture_ws_digest_and_commit_evidence);
163 "Could not commit snapshot evidence for seqno {}: {}",
169 auto evidence_version = tx.commit_version();
// Fill in the remaining fields of the pending entry from the commit results.
171 pending_snapshots[generation_count].commit_evidence = commit_evidence;
172 pending_snapshots[generation_count].write_set_digest = ws_digest;
173 pending_snapshots[generation_count].snapshot_digest = cd.
value();
174 pending_snapshots[generation_count].serialised_snapshot =
175 std::move(serialised_snapshot);
// Allocation request sized with the serialised snapshot length (the call that
// carries these arguments is not visible in this excerpt).
179 ::consensus::snapshot_allocate,
183 serialised_snapshot_size,
187 "Request to allocate snapshot [{} bytes] for seqno {}, with evidence "
188 "seqno {}: {}, ws digest: {}",
189 serialised_snapshot_size,
// Body fragment (function header not visible): prunes next_snapshot_indices
// up to idx, then commits every pending snapshot that is ready.
//
// Drop front entries while the *second* entry's idx is still <= idx; the
// size() > 1 guard means at least one entry is always kept.
198 while ((next_snapshot_indices.size() > 1) &&
199 (std::next(next_snapshot_indices.begin())->idx <= idx))
201 next_snapshot_indices.pop_front();
// The loop header has no increment: on erase, `it` is replaced by the
// iterator returned from erase(); the non-erase advance is not visible here.
204 for (
auto it = pending_snapshots.begin(); it != pending_snapshots.end();)
206 auto& snapshot_info = it->second;
// Ready means: copied to the host buffer, evidence index known, and idx is
// strictly past that evidence index.
209 snapshot_info.is_stored && snapshot_info.evidence_idx.has_value() &&
210 idx > snapshot_info.evidence_idx.value())
// NOTE(review): sig, tree, node_id and node_cert are read with .value(),
// which throws std::bad_optional_access if unset; the visible condition does
// not check them — confirm they are guaranteed to be set by this point.
212 auto serialised_receipt = build_and_serialise_receipt(
213 snapshot_info.sig.value(),
214 snapshot_info.tree.value(),
215 snapshot_info.node_id.value(),
216 snapshot_info.node_cert.value(),
217 snapshot_info.evidence_idx.value(),
218 snapshot_info.write_set_digest,
219 snapshot_info.commit_evidence,
// Moving the digest out is safe because the entry is erased right after.
220 std::move(snapshot_info.snapshot_digest));
222 commit_snapshot(snapshot_info.version, serialised_receipt);
223 it = pending_snapshots.erase(it);
// Constructor fragment (first parameter and some initialisers not visible).
// store_:                 shared store handle, taken by non-const reference.
// snapshot_tx_interval_:  value stored in snapshot_tx_interval.
235 std::shared_ptr<ccf::kv::Store>& store_,
236 size_t snapshot_tx_interval_) :
237 writer_factory(writer_factory_),
239 snapshot_tx_interval(snapshot_tx_interval_)
// Seed the index queue with the initial snapshot index; the two booleans are
// presumably {forced = false, done = true} — confirm against SnapshotEntry.
241 next_snapshot_indices.push_back({initial_snapshot_idx,
false,
true});
// Body fragment (function header not visible): under the lock, sets
// last_snapshot_idx to the idx of the most recent entry in
// next_snapshot_indices.
249 std::lock_guard<ccf::pal::Mutex> guard(lock);
251 last_snapshot_idx = next_snapshot_indices.back().idx;
// Body fragment (function header not visible): under the lock, stores the
// `enabled` argument into snapshot_generation_enabled.
256 std::lock_guard<ccf::pal::Mutex> guard(lock);
257 snapshot_generation_enabled = enabled;
// Body fragment (function header not visible): sets last_snapshot_idx to
// `idx` and resets the index queue to that single entry.
// Throws std::logic_error if last_snapshot_idx is already non-zero.
263 std::lock_guard<ccf::pal::Mutex> guard(lock);
265 if (last_snapshot_idx != 0)
267 throw std::logic_error(
268 "Last snapshot index can only be set if no snapshot has been "
272 last_snapshot_idx = idx;
// Discard all queued indices and restart from the new last snapshot index.
274 next_snapshot_indices.clear();
275 next_snapshot_indices.push_back({last_snapshot_idx,
false,
true});
// Copies a pending serialised snapshot into a host-provided buffer (function
// name and return type not visible in this excerpt).
//
// snapshot_buf:     destination buffer supplied by the host.
// generation_count: key of the pending snapshot to write.
279 std::span<uint8_t> snapshot_buf, uint32_t generation_count)
281 std::lock_guard<ccf::pal::Mutex> guard(lock);
283 auto search = pending_snapshots.find(generation_count);
// Unknown generation count: only a log message is visible for this branch.
284 if (search == pending_snapshots.end())
287 "Could not find pending snapshot to write for generation count {}",
292 auto& pending_snapshot = search->second;
// The buffer must be exactly the size of the serialised snapshot; otherwise
// the pending snapshot is discarded.
293 if (snapshot_buf.size() != pending_snapshot.serialised_snapshot.size())
299 "Host allocated snapshot buffer [{} bytes] is not of expected "
300 "size [{} bytes]. Discarding snapshot for seqno {}",
302 pending_snapshot.serialised_snapshot.size(),
303 pending_snapshot.version);
304 pending_snapshots.erase(search);
// The buffer must lie outside enclave memory; otherwise the pending snapshot
// is discarded as well.
307 else if (!ccf::pal::is_outside_enclave(
308 snapshot_buf.data(), snapshot_buf.size()))
313 "Host allocated snapshot buffer is not outside enclave memory. "
314 "Discarding snapshot for seqno {}",
315 pending_snapshot.version);
316 pending_snapshots.erase(search);
// Barrier issued after the checks above and before the buffer is written.
320 ccf::pal::speculation_barrier();
// Arguments of the copy into snapshot_buf (the call name itself is not
// visible in this excerpt).
323 pending_snapshot.serialised_snapshot.begin(),
324 pending_snapshot.serialised_snapshot.end(),
325 snapshot_buf.begin());
// Marks the snapshot as eligible for commit once its evidence is committed.
326 pending_snapshot.is_stored =
true;
329 "Successfully copied snapshot at seqno {} to host memory [{} "
331 pending_snapshot.version,
332 pending_snapshot.serialised_snapshot.size());
// Body fragment (function header not visible): decides whether `idx` should
// be queued as a snapshot index and records it in next_snapshot_indices.
340 std::lock_guard<ccf::pal::Mutex> guard(lock);
// Check with message that idx is not behind the newest queued index (the
// assertion macro wrapping these arguments is not visible).
343 idx >= next_snapshot_indices.back().idx,
344 "Committable seqno {} < next snapshot seqno {}",
346 next_snapshot_indices.back().idx);
// Reads a store flag (its name is not visible here) into `forced`.
348 bool forced = store->flag_enabled_unsafe(
// Scan from newest to oldest to pick an entry's idx as last_unforced_idx; the
// selection condition is on lines not visible in this excerpt.
352 for (
auto it = next_snapshot_indices.rbegin();
353 it != next_snapshot_indices.rend();
358 last_unforced_idx = it->idx;
// Due when at least snapshot_tx_interval seqnos separate idx from that entry.
363 auto due = (idx - last_unforced_idx) >= snapshot_tx_interval;
// The second field is !due, so an entry queued while not due is marked as
// forced; the log label below follows the same rule.
366 next_snapshot_indices.push_back({idx, !due,
false});
368 "{} {} as snapshot index", !due ?
"Forced" :
"Recorded", idx);
// Clears a store flag (name not visible) after the index is queued.
369 store->unset_flag_unsafe(
// Fragment (function name and other parameters not visible): attaches a
// signature and the signing node's identity to pending snapshots.
379 const std::vector<uint8_t>& sig,
383 std::lock_guard<ccf::pal::Mutex> guard(lock);
385 for (
auto& [_, pending_snapshot] : pending_snapshots)
// Only snapshots whose evidence index is known, lies strictly before idx, and
// which have no signature yet are updated — so the first qualifying signature
// is kept.
388 pending_snapshot.evidence_idx.has_value() &&
389 idx > pending_snapshot.evidence_idx.value() &&
390 !pending_snapshot.sig.has_value())
393 "Recording signature at {} for snapshot {} with evidence at {}",
395 pending_snapshot.version,
396 pending_snapshot.evidence_idx.value());
398 pending_snapshot.node_id = node_id;
399 pending_snapshot.node_cert = node_cert;
400 pending_snapshot.sig = sig;
// Body fragment (function header not visible): attaches a serialised tree to
// pending snapshots.
408 std::lock_guard<ccf::pal::Mutex> guard(lock);
410 for (
auto& [_, pending_snapshot] : pending_snapshots)
// Same rule as for signatures: evidence index known, strictly before idx, and
// no tree recorded yet.
413 pending_snapshot.evidence_idx.has_value() &&
414 idx > pending_snapshot.evidence_idx.value() &&
415 !pending_snapshot.tree.has_value())
418 "Recording serialised tree at {} for snapshot {} with evidence at "
421 pending_snapshot.version,
422 pending_snapshot.evidence_idx.value());
424 pending_snapshot.tree = tree;
// Body fragment (function header not visible): records `idx` as the evidence
// index of every pending snapshot whose version equals snapshot.version.
432 std::lock_guard<ccf::pal::Mutex> guard(lock);
434 for (
auto& [_, pending_snapshot] : pending_snapshots)
436 if (pending_snapshot.version == snapshot.
version)
439 "Recording evidence idx at {} for snapshot {}",
441 pending_snapshot.version);
443 pending_snapshot.evidence_idx = idx;
// Body fragment (function header not visible): takes a snapshot of the store
// at idx and queues a task that runs snapshot_cb with it.
//
// As a static local, the counter is shared by all calls to this function,
// across every Snapshotter instance.
450 static uint32_t generation_count = 0;
451 auto msg = std::make_unique<::threading::Tmsg<SnapshotMsg>>(&snapshot_cb);
// Owning self-reference stored in the message for use by the callback.
452 msg->data.self = shared_from_this();
453 msg->data.snapshot = store->snapshot_unsafe_maps(idx);
// Post-increment: the message carries the pre-increment value.
454 msg->data.generation_count = generation_count++;
// NOTE(review): get_execution_thread() receives the already-incremented
// counter, i.e. one more than the value stored in the message — confirm the
// difference is intended. (`tm` is declared on a line not visible here.)
457 tm.add_task(tm.get_execution_thread(generation_count), std::move(msg));
// Body fragment (function header not visible): on commit of `idx`, decides
// whether the front queued index should produce a snapshot and advances
// last_snapshot_idx. Throws std::logic_error if idx < last_snapshot_idx.
468 std::lock_guard<ccf::pal::Mutex> guard(lock);
472 if (idx < last_snapshot_idx)
474 throw std::logic_error(fmt::format(
475 "Cannot snapshot at seqno {} which is earlier than last snapshot "
// Check with message that idx is not behind the oldest queued index (the
// assertion macro wrapping these arguments is not visible).
482 idx >= next_snapshot_indices.front().idx,
483 "Cannot commit snapshotter at {}, which is before last snapshottable "
486 next_snapshot_indices.front().idx);
488 auto& next = next_snapshot_indices.front();
// Due when the front index is at least snapshot_tx_interval past the last
// snapshot.
489 auto due = next.idx - last_snapshot_idx >= snapshot_tx_interval;
// Act when due, or when the front entry is forced and not yet done.
490 if (due || (next.forced && !next.done))
// `next.idx` is used as a boolean here, so index 0 never passes this
// condition.
492 if (snapshot_generation_enabled && generate_snapshot && next.idx)
// Branch body for due, non-forced entries is not visible in this excerpt.
498 if (due && !next.forced)
503 last_snapshot_idx = next.idx;
505 "Recorded {} as last snapshot index", last_snapshot_idx);
// Body fragment (function header not visible): discards queued snapshot
// indices and pending snapshots that lie beyond `idx`.
512 std::lock_guard<ccf::pal::Mutex> guard(lock);
// Pop queued indices from the back while they are strictly greater than idx.
514 while (!next_snapshot_indices.empty() &&
515 (next_snapshot_indices.back().idx > idx))
517 next_snapshot_indices.pop_back();
// Keep the queue non-empty: fall back to the last snapshot index.
520 if (next_snapshot_indices.empty())
522 next_snapshot_indices.push_back({last_snapshot_idx,
false,
true});
526 "Rolled back snapshotter: last snapshottable idx is now {}",
527 next_snapshot_indices.front().idx);
// Examine pending snapshots from the highest key downwards.
529 while (!pending_snapshots.empty())
// Binding the temporary iterator to a const reference extends its lifetime to
// the end of the loop body.
531 const auto& last_snapshot = std::prev(pending_snapshots.end());
// Condition: evidence index is known and not beyond idx. The action taken when
// it holds is on a line not visible here (presumably the loop stops — TODO
// confirm); otherwise the entry is erased below.
533 last_snapshot->second.evidence_idx.has_value() &&
534 idx >= last_snapshot->second.evidence_idx.value())
539 pending_snapshots.erase(last_snapshot);