CCF
Loading...
Searching...
No Matches
snapshotter.h
Go to the documentation of this file.
1// Copyright (c) Microsoft Corporation. All rights reserved.
2// Licensed under the Apache 2.0 License.
3#pragma once
4
5#include "ccf/ccf_assert.h"
6#include "ccf/ds/logger.h"
7#include "ccf/pal/enclave.h"
8#include "ccf/pal/locking.h"
10#include "ds/thread_messaging.h"
11#include "kv/kv_types.h"
12#include "kv/store.h"
13#include "node/network_state.h"
16
17#include <deque>
18#include <optional>
19
20namespace ccf
21{
22 class Snapshotter : public std::enable_shared_from_this<Snapshotter>,
24 {
25 private:
// Largest representable size_t; used below as the default value of
// snapshot_tx_interval.
26 static constexpr auto max_tx_interval = std::numeric_limits<size_t>::max();
27
28 // Maximum number of pending snapshots allowed at a given time. No more
29 // snapshots are emitted when this threshold is reached and until pending
30 // snapshots are flushed on commit.
31 static constexpr auto max_pending_snapshots_count = 5;
32
// NOTE(review): listing line 33 is absent from this extraction. The
// constructor initialises a `writer_factory` member from a
// ringbuffer::AbstractWriterFactory&, and no visible line declares it, so its
// declaration is presumably the missing line - confirm against the real header.
34
// Taken with std::lock_guard by most of the public methods of this class
// before they read or modify the members below.
35 ccf::pal::Mutex lock;
36
// Key-value store used to create snapshots, serialise them and commit the
// snapshot evidence transaction.
37 std::shared_ptr<ccf::kv::Store> store;
38
39 // Snapshots are never generated by default (e.g. during public recovery)
40 size_t snapshot_tx_interval = max_tx_interval;
41
// Bookkeeping for one generated snapshot, stored in pending_snapshots until
// update_indices() builds its receipt and hands it to the host.
42 struct SnapshotInfo
43 {
// Version reported by the snapshot object (set in snapshot_()).
44 ccf::kv::Version version;
// Digest of the evidence transaction's write set and its commit evidence,
// both captured by the callback passed to tx.commit() in snapshot_().
45 ccf::crypto::Sha256Hash write_set_digest;
46 std::string commit_evidence;
// Hash of the serialised snapshot, as carried by the claims digest.
47 ccf::crypto::Sha256Hash snapshot_digest;
// Serialised bytes, kept until write_snapshot() copies them to the host.
48 std::vector<uint8_t> serialised_snapshot;
49
50 // Prevents the receipt from being passed to the host (on commit) in case
51 // host has not yet allocated memory for the snapshot.
// Set to true by write_snapshot() after a successful copy.
52 bool is_stored = false;
53
// Set by record_snapshot_evidence_idx().
54 std::optional<::consensus::Index> evidence_idx = std::nullopt;
55
// node_id, node_cert and sig are set together by record_signature();
// tree is set by record_serialised_tree(). All four are read with .value()
// in update_indices() when the receipt is built.
56 std::optional<NodeId> node_id = std::nullopt;
57 std::optional<ccf::crypto::Pem> node_cert = std::nullopt;
58 std::optional<std::vector<uint8_t>> sig = std::nullopt;
59 std::optional<std::vector<uint8_t>> tree = std::nullopt;
60
61 SnapshotInfo() = default;
62 };
63 // Queue of pending snapshots that have been generated, but are not yet
64 // committed
// Although described as a queue, this is an ordered map keyed by the
// generation count that schedule_snapshot() assigns to each snapshot;
// snapshot_() and write_snapshot() look entries up by that key.
65 std::map<uint32_t, SnapshotInfo> pending_snapshots;
66
67 // Initial snapshot index
// Used by the constructor to seed next_snapshot_indices.
68 static constexpr ::consensus::Index initial_snapshot_idx = 0;
69
70 // Index at which the latest snapshot was generated
// commit() only advances this for entries that are due and not forced, so it
// tracks the last regularly scheduled snapshot index.
71 ::consensus::Index last_snapshot_idx = 0;
72
73 // Used to suspend snapshot generation during public recovery
// Toggled through set_snapshot_generation() and checked in commit().
74 bool snapshot_generation_enabled = true;
75
76 // Indices at which a snapshot will be next generated and Boolean to
77 // indicate whether a snapshot was forced at the given index
78 struct SnapshotEntry
79 {
// NOTE(review): listing line 80 is absent from this extraction. The rest of
// the class reads an `idx` member and brace-initialises entries as
// {index, forced, done}, so the missing line is presumably the
// ::consensus::Index idx declaration - confirm against the real header.
// forced: the entry was recorded because a snapshot was explicitly requested
// rather than because the interval elapsed (see record_committable()).
81 bool forced;
// done: set by commit() once a snapshot has been scheduled for this entry.
82 bool done;
83 };
// record_committable() appends at the back, update_indices() pops from the
// front, and rollback() pops from the back.
84 std::deque<SnapshotEntry> next_snapshot_indices;
85
// Notifies the host that the snapshot at snapshot_idx is committed, passing
// the serialised receipt along in a ::consensus::snapshot_commit message.
// NOTE(review): listing line 93 is absent from this extraction; it presumably
// opens the ring-buffer write macro whose arguments follow (the page index
// lists RINGBUFFER_WRITE_MESSAGE) - confirm against the real header.
86 void commit_snapshot(
87 ::consensus::Index snapshot_idx,
88 const std::vector<uint8_t>& serialised_receipt)
89 {
90 // The snapshot_idx is used to retrieve the correct snapshot file
91 // previously generated.
92 auto to_host = writer_factory.create_writer_to_outside();
94 ::consensus::snapshot_commit,
95 to_host,
96 snapshot_idx,
97 serialised_receipt);
98 }
99
// Payload of the task message created in schedule_snapshot() and consumed by
// snapshot_cb().
100 struct SnapshotMsg
101 {
// Shared owner of the snapshotter, so the callback has a valid object to
// call back into.
102 std::shared_ptr<Snapshotter> self;
// Snapshot taken from the store; ownership is moved into snapshot_().
103 std::unique_ptr<ccf::kv::AbstractStore::AbstractSnapshot> snapshot;
// Key under which the snapshot is tracked in pending_snapshots.
104 uint32_t generation_count;
105 };
106
// Task callback: unpacks the message and forwards the snapshot and its
// generation count to snapshot_() on the snapshotter stored in the message.
107 static void snapshot_cb(std::unique_ptr<::threading::Tmsg<SnapshotMsg>> msg)
108 {
109 msg->data.self->snapshot_(
110 std::move(msg->data.snapshot), msg->data.generation_count);
111 }
112
// Serialises `snapshot`, commits a snapshot evidence transaction for it,
// records the result in pending_snapshots[generation_count] and asks the host
// to allocate memory for the serialised bytes.
//
// Returns early, without serialising, when max_pending_snapshots_count
// snapshots are already pending, and after logging when the evidence
// transaction does not commit.
//
// NOTE(review): listing lines 119, 136, 160, 162, 178 and 186 are absent from
// this extraction. Judging by the surrounding lines they are presumably the
// logging macro openers, the declaration of `cd`, the check of `rc` and the
// ring-buffer write macro opener - confirm against the real header.
// NOTE(review): no visible line in this function takes `lock`, although it
// reads and writes pending_snapshots, which the public methods guard with
// that mutex - confirm this is safe for the thread the task runs on.
113 void snapshot_(
114 std::unique_ptr<ccf::kv::AbstractStore::AbstractSnapshot> snapshot,
115 uint32_t generation_count)
116 {
117 if (pending_snapshots.size() >= max_pending_snapshots_count)
118 {
120 "Skipping new snapshot generation as {} snapshots are already "
121 "pending",
122 pending_snapshots.size());
123 return;
124 }
125
// Read the version before the snapshot object is moved into
// serialise_snapshot() below.
126 auto snapshot_version = snapshot->get_version();
127
128 auto serialised_snapshot = store->serialise_snapshot(std::move(snapshot));
129 auto serialised_snapshot_size = serialised_snapshot.size();
130
// Write the snapshot hash and version to the snapshot evidence table.
131 auto tx = store->create_tx();
132 auto evidence = tx.rw<SnapshotEvidence>(Tables::SNAPSHOT_EVIDENCE);
133 auto snapshot_hash = ccf::crypto::Sha256Hash(serialised_snapshot);
134 evidence->put({snapshot_hash, snapshot_version});
135
// The snapshot hash is also used as the claims digest passed to commit.
137 cd.set(std::move(snapshot_hash));
138
// Callback handed to tx.commit(): stores the digest of the write set and the
// commit evidence string into the two locals captured by reference.
139 ccf::crypto::Sha256Hash ws_digest;
140 std::string commit_evidence;
141 auto capture_ws_digest_and_commit_evidence =
142 [&ws_digest, &commit_evidence](
143 const std::vector<uint8_t>& write_set,
144 const std::string& commit_evidence_) {
// Placement-new re-constructs ws_digest in place from the write set bytes.
145 new (&ws_digest)
146 ccf::crypto::Sha256Hash({write_set.data(), write_set.size()});
147 commit_evidence = commit_evidence_;
148 };
149
150 // It is possible that the signature following the snapshot evidence is
151 // scheduled by another thread while the below snapshot evidence
152 // transaction is committed. To allow for such scenario, the evidence
153 // seqno is recorded via `record_snapshot_evidence_idx()` on a hook rather
154 // than here.
// The entry is created with its version before the commit, which lets
// record_snapshot_evidence_idx() find it by version.
155 pending_snapshots[generation_count] = {};
156 pending_snapshots[generation_count].version = snapshot_version;
157
158 auto rc =
159 tx.commit(cd, false, nullptr, capture_ws_digest_and_commit_evidence);
// NOTE(review): on this failure path the entry inserted above is not erased
// in any visible line - confirm it is cleaned up elsewhere (rollback() erases
// trailing entries that have no evidence_idx).
161 {
163 "Could not commit snapshot evidence for seqno {}: {}",
164 snapshot_version,
165 rc);
166 return;
167 }
168
169 auto evidence_version = tx.commit_version();
170
// Fill in the rest of the pending entry now that the commit succeeded.
171 pending_snapshots[generation_count].commit_evidence = commit_evidence;
172 pending_snapshots[generation_count].write_set_digest = ws_digest;
173 pending_snapshots[generation_count].snapshot_digest = cd.value();
174 pending_snapshots[generation_count].serialised_snapshot =
175 std::move(serialised_snapshot);
176
// Send a ::consensus::snapshot_allocate message to the host. The size was
// saved earlier because serialised_snapshot has been moved from.
177 auto to_host = writer_factory.create_writer_to_outside();
179 ::consensus::snapshot_allocate,
180 to_host,
181 snapshot_version,
182 evidence_version,
183 serialised_snapshot_size,
184 generation_count);
185
187 "Request to allocate snapshot [{} bytes] for seqno {}, with evidence "
188 "seqno {}: {}, ws digest: {}",
189 serialised_snapshot_size,
190 snapshot_version,
191 evidence_version,
192 cd.value(),
193 ws_digest);
194 }
195
// Advances the snapshotter's state to `idx`: discards snapshot indices that
// have been superseded, and commits every pending snapshot whose evidence
// precedes idx. Takes no lock itself; commit() calls it while holding `lock`.
196 void update_indices(::consensus::Index idx)
197 {
// Pop from the front while the second entry is at or before idx. At least
// one entry is always kept.
198 while ((next_snapshot_indices.size() > 1) &&
199 (std::next(next_snapshot_indices.begin())->idx <= idx))
200 {
201 next_snapshot_indices.pop_front();
202 }
203
// Explicit iterator loop because entries are erased while iterating.
204 for (auto it = pending_snapshots.begin(); it != pending_snapshots.end();)
205 {
206 auto& snapshot_info = it->second;
207
// A snapshot is committed once it has been copied to the host and idx is
// strictly past its evidence index.
208 if (
209 snapshot_info.is_stored && snapshot_info.evidence_idx.has_value() &&
210 idx > snapshot_info.evidence_idx.value())
211 {
// NOTE(review): sig, tree, node_id and node_cert are read with .value()
// without a has_value() check; std::optional::value() throws
// std::bad_optional_access when empty - confirm that record_signature() and
// record_serialised_tree() always run before this point.
212 auto serialised_receipt = build_and_serialise_receipt(
213 snapshot_info.sig.value(),
214 snapshot_info.tree.value(),
215 snapshot_info.node_id.value(),
216 snapshot_info.node_cert.value(),
217 snapshot_info.evidence_idx.value(),
218 snapshot_info.write_set_digest,
219 snapshot_info.commit_evidence,
220 std::move(snapshot_info.snapshot_digest));
221
222 commit_snapshot(snapshot_info.version, serialised_receipt);
// erase() returns the iterator to the next element.
223 it = pending_snapshots.erase(it);
224 }
225 else
226 {
227 ++it;
228 }
229 }
230 }
231
232 public:
// Constructor. NOTE(review): listing line 233 (the line opening the
// signature) is absent from this extraction; the page index gives it as
// Snapshotter(ringbuffer::AbstractWriterFactory &writer_factory_,
// std::shared_ptr< ccf::kv::Store > &store_, size_t snapshot_tx_interval_).
//
// Stores the writer factory, the store and the snapshot interval, then seeds
// next_snapshot_indices with one entry at initial_snapshot_idx that is not
// forced and already marked done.
234 ringbuffer::AbstractWriterFactory& writer_factory_,
235 std::shared_ptr<ccf::kv::Store>& store_,
236 size_t snapshot_tx_interval_) :
237 writer_factory(writer_factory_),
238 store(store_),
239 snapshot_tx_interval(snapshot_tx_interval_)
240 {
241 next_snapshot_indices.push_back({initial_snapshot_idx, false, true});
242 }
243
// NOTE(review): listing line 244 (the signature) is absent from this
// extraction; the page index gives it as void init_after_public_recovery().
//
// Sets last_snapshot_idx to the index of the most recent entry in
// next_snapshot_indices, under `lock`.
245 {
246 // After public recovery, the first node should have restored all
247 // snapshot indices in next_snapshot_indices so that snapshot
248 // generation can continue at the correct interval
249 std::lock_guard<ccf::pal::Mutex> guard(lock);
250
251 last_snapshot_idx = next_snapshot_indices.back().idx;
252 }
253
// Enables or disables snapshot generation. commit() only schedules a snapshot
// while this flag is true. The flag is written under `lock`.
254 void set_snapshot_generation(bool enabled)
255 {
256 std::lock_guard<ccf::pal::Mutex> guard(lock);
257 snapshot_generation_enabled = enabled;
258 }
259
// NOTE(review): listing line 260 (the signature) is absent from this
// extraction; the page index gives it as
// void set_last_snapshot_idx(::consensus::Index idx).
//
// Records idx as the last snapshot index and resets next_snapshot_indices to
// a single entry at idx that is not forced and already marked done.
// Throws std::logic_error if last_snapshot_idx is already non-zero.
261 {
262 // Should only be called once, after a snapshot has been applied
263 std::lock_guard<ccf::pal::Mutex> guard(lock);
264
265 if (last_snapshot_idx != 0)
266 {
267 throw std::logic_error(
268 "Last snapshot index can only be set if no snapshot has been "
269 "generated");
270 }
271
272 last_snapshot_idx = idx;
273
274 next_snapshot_indices.clear();
275 next_snapshot_indices.push_back({last_snapshot_idx, false, true});
276 }
277
// NOTE(review): listing line 278 (the line opening the signature) is absent
// from this extraction; the page index gives the full signature as
// bool write_snapshot(std::span< uint8_t > snapshot_buf,
// uint32_t generation_count). Listing lines 286, 298, 312 and 328 are also
// absent and presumably open the logging macro calls whose arguments remain.
//
// Copies the pending serialised snapshot identified by generation_count into
// the host-provided buffer snapshot_buf and marks the snapshot as stored.
//
// Returns true after a successful copy. Returns false when no pending
// snapshot matches generation_count, when the buffer size differs from the
// serialised size, or when the buffer is not outside enclave memory; in the
// last two cases the pending snapshot is erased.
279 std::span<uint8_t> snapshot_buf, uint32_t generation_count)
280 {
281 std::lock_guard<ccf::pal::Mutex> guard(lock);
282
283 auto search = pending_snapshots.find(generation_count);
284 if (search == pending_snapshots.end())
285 {
287 "Could not find pending snapshot to write for generation count {}",
288 generation_count);
289 return false;
290 }
291
292 auto& pending_snapshot = search->second;
293 if (snapshot_buf.size() != pending_snapshot.serialised_snapshot.size())
294 {
295 // Unreliable host: allocated snapshot buffer is not of expected
296 // size. The pending snapshot is discarded to reduce enclave memory
297 // usage.
299 "Host allocated snapshot buffer [{} bytes] is not of expected "
300 "size [{} bytes]. Discarding snapshot for seqno {}",
301 snapshot_buf.size(),
302 pending_snapshot.serialised_snapshot.size(),
303 pending_snapshot.version);
304 pending_snapshots.erase(search);
305 return false;
306 }
307 else if (!ccf::pal::is_outside_enclave(
308 snapshot_buf.data(), snapshot_buf.size()))
309 {
310 // Sanitise host-allocated buffer. Note that buffer alignment is not
311 // checked as the buffer is only written to and never read.
313 "Host allocated snapshot buffer is not outside enclave memory. "
314 "Discarding snapshot for seqno {}",
315 pending_snapshot.version);
316 pending_snapshots.erase(search);
317 return false;
318 }
319
// Barrier placed between the checks on the host-supplied buffer above and
// the write to it below.
320 ccf::pal::speculation_barrier();
321
322 std::copy(
323 pending_snapshot.serialised_snapshot.begin(),
324 pending_snapshot.serialised_snapshot.end(),
325 snapshot_buf.begin());
// Allows update_indices() to commit this snapshot.
326 pending_snapshot.is_stored = true;
327
329 "Successfully copied snapshot at seqno {} to host memory [{} "
330 "bytes]",
331 pending_snapshot.version,
332 pending_snapshot.serialised_snapshot.size());
333 return true;
334 }
335
// NOTE(review): listing line 336 (the signature) is absent from this
// extraction; the page index gives it as
// bool record_committable(::consensus::Index idx) override.
// Listing lines 342, 349, 367 and 370 are also absent; they presumably hold
// the assert macro opener, the flag argument of flag_enabled_unsafe(), the
// logging macro opener and the flag argument of unset_flag_unsafe().
//
// Records idx as a snapshot index when the snapshot interval has elapsed
// since the most recent unforced index, or when the store flag read below is
// set. Returns `due`, so an entry recorded only because of the flag returns
// false.
337 {
338 // Returns true if the committable idx will require the generation of a
339 // snapshot, and thus a new ledger chunk
340 std::lock_guard<ccf::pal::Mutex> guard(lock);
341
343 idx >= next_snapshot_indices.back().idx,
344 "Committable seqno {} < next snapshot seqno {}",
345 idx,
346 next_snapshot_indices.back().idx);
347
348 bool forced = store->flag_enabled_unsafe(
350
// Walk the recorded indices from newest to oldest to find the most recent
// unforced one, falling back to last_snapshot_idx if every entry is forced.
351 ::consensus::Index last_unforced_idx = last_snapshot_idx;
352 for (auto it = next_snapshot_indices.rbegin();
353 it != next_snapshot_indices.rend();
354 it++)
355 {
356 if (!it->forced)
357 {
358 last_unforced_idx = it->idx;
359 break;
360 }
361 }
362
363 auto due = (idx - last_unforced_idx) >= snapshot_tx_interval;
364 if (due || forced)
365 {
// The entry is marked forced only when the interval alone would not have
// triggered it; done starts false until commit() schedules the snapshot.
366 next_snapshot_indices.push_back({idx, !due, false});
368 "{} {} as snapshot index", !due ? "Forced" : "Recorded", idx);
369 store->unset_flag_unsafe(
371 return due;
372 }
373
374 return false;
375 }
376
// NOTE(review): listing lines 377-378 (the start of the signature) are absent
// from this extraction; the page index gives the full signature as
// void record_signature(::consensus::Index idx,
// const std::vector< uint8_t > &sig, const NodeId &node_id,
// const ccf::crypto::Pem &node_cert). Listing line 392 is also absent and
// presumably opens the logging macro call.
//
// Attaches the signature, node id and node certificate to every pending
// snapshot whose evidence index is known and lower than idx, and which has no
// signature yet. A snapshot keeps the first signature recorded for it.
379 const std::vector<uint8_t>& sig,
380 const NodeId& node_id,
381 const ccf::crypto::Pem& node_cert)
382 {
383 std::lock_guard<ccf::pal::Mutex> guard(lock);
384
385 for (auto& [_, pending_snapshot] : pending_snapshots)
386 {
387 if (
388 pending_snapshot.evidence_idx.has_value() &&
389 idx > pending_snapshot.evidence_idx.value() &&
390 !pending_snapshot.sig.has_value())
391 {
393 "Recording signature at {} for snapshot {} with evidence at {}",
394 idx,
395 pending_snapshot.version,
396 pending_snapshot.evidence_idx.value());
397
398 pending_snapshot.node_id = node_id;
399 pending_snapshot.node_cert = node_cert;
400 pending_snapshot.sig = sig;
401 }
402 }
403 }
404
// NOTE(review): listing line 405 (the line opening the signature) is absent
// from this extraction; the page index gives the full signature as
// void record_serialised_tree(::consensus::Index idx,
// const std::vector< uint8_t > &tree). Listing line 417 is also absent and
// presumably opens the logging macro call.
//
// Attaches the serialised tree to every pending snapshot whose evidence index
// is known and lower than idx, and which has no tree yet. A snapshot keeps
// the first tree recorded for it.
406 ::consensus::Index idx, const std::vector<uint8_t>& tree)
407 {
408 std::lock_guard<ccf::pal::Mutex> guard(lock);
409
410 for (auto& [_, pending_snapshot] : pending_snapshots)
411 {
412 if (
413 pending_snapshot.evidence_idx.has_value() &&
414 idx > pending_snapshot.evidence_idx.value() &&
415 !pending_snapshot.tree.has_value())
416 {
418 "Recording serialised tree at {} for snapshot {} with evidence at "
419 "{}",
420 idx,
421 pending_snapshot.version,
422 pending_snapshot.evidence_idx.value());
423
424 pending_snapshot.tree = tree;
425 }
426 }
427 }
428
// NOTE(review): listing line 429 (the line opening the signature) is absent
// from this extraction; the page index gives the full signature as
// void record_snapshot_evidence_idx(::consensus::Index idx,
// const SnapshotHash &snapshot). Listing line 438 is also absent and
// presumably opens the logging macro call.
//
// Sets evidence_idx to idx on every pending snapshot whose version equals
// snapshot.version. There is no early exit, so all matching entries are
// updated, and an existing evidence_idx is overwritten.
430 ::consensus::Index idx, const SnapshotHash& snapshot)
431 {
432 std::lock_guard<ccf::pal::Mutex> guard(lock);
433
434 for (auto& [_, pending_snapshot] : pending_snapshots)
435 {
436 if (pending_snapshot.version == snapshot.version)
437 {
439 "Recording evidence idx at {} for snapshot {}",
440 idx,
441 pending_snapshot.version);
442
443 pending_snapshot.evidence_idx = idx;
444 }
445 }
446 }
447
// NOTE(review): listing line 448 (the signature) is absent from this
// extraction; the page index gives it as
// void schedule_snapshot(::consensus::Index idx). Listing line 456 is also
// absent and presumably declares `tm` (the index lists
// ThreadMessaging::instance()) - confirm against the real header.
//
// Takes a snapshot of the store at idx and queues a task that runs
// snapshot_cb() on it. Takes no lock itself; commit() calls it while holding
// `lock` and the store maps lock.
449 {
// Function-local static, so the counter is shared by every Snapshotter
// instance.
450 static uint32_t generation_count = 0;
451 auto msg = std::make_unique<::threading::Tmsg<SnapshotMsg>>(&snapshot_cb);
452 msg->data.self = shared_from_this();
453 msg->data.snapshot = store->snapshot_unsafe_maps(idx);
// The message carries the pre-increment value; the thread selection below
// sees the incremented one.
454 msg->data.generation_count = generation_count++;
455
457 tm.add_task(tm.get_execution_thread(generation_count), std::move(msg));
458 }
459
// Called when idx is committed. Flushes committed pending snapshots through
// update_indices() and, if the front snapshot index is due or forced and not
// yet done, schedules a snapshot for it.
// Throws std::logic_error if idx is earlier than last_snapshot_idx.
// NOTE(review): listing lines 481 and 504 are absent from this extraction;
// they presumably open the assert macro and a logging macro call.
460 void commit(::consensus::Index idx, bool generate_snapshot) override
461 {
462 // If generate_snapshot is true, takes a snapshot of the key value store
463 // at the last snapshottable index before idx, and schedule snapshot
464 // serialisation on another thread (round-robin). Otherwise, only record
465 // that a snapshot was generated.
466
// The store maps lock is acquired before the snapshotter's own lock.
467 ccf::kv::ScopedStoreMapsLock maps_lock(store);
468 std::lock_guard<ccf::pal::Mutex> guard(lock);
469
470 update_indices(idx);
471
472 if (idx < last_snapshot_idx)
473 {
474 throw std::logic_error(fmt::format(
475 "Cannot snapshot at seqno {} which is earlier than last snapshot "
476 "seqno {}",
477 idx,
478 last_snapshot_idx));
479 }
480
482 idx >= next_snapshot_indices.front().idx,
483 "Cannot commit snapshotter at {}, which is before last snapshottable "
484 "idx {}",
485 idx,
486 next_snapshot_indices.front().idx);
487
488 auto& next = next_snapshot_indices.front();
// NOTE(review): unsigned subtraction (the page index shows Index as
// uint64_t); it would wrap if next.idx were below last_snapshot_idx -
// presumably ruled out by the checks above, confirm.
489 auto due = next.idx - last_snapshot_idx >= snapshot_tx_interval;
490 if (due || (next.forced && !next.done))
491 {
// The trailing `next.idx` test skips scheduling a snapshot at index 0.
492 if (snapshot_generation_enabled && generate_snapshot && next.idx)
493 {
494 schedule_snapshot(next.idx);
495 next.done = true;
496 }
497
498 if (due && !next.forced)
499 {
500 // last_snapshot_idx records the last normally scheduled, i.e.
501 // unforced, snapshot index, so that backups (which don't know forced
502 // indices) continue the snapshot interval normally.
503 last_snapshot_idx = next.idx;
505 "Recorded {} as last snapshot index", last_snapshot_idx);
506 }
507 }
508 }
509
// Rolls the snapshotter back to idx: drops recorded snapshot indices after
// idx, and drops pending snapshots from the newest backwards until one is
// found whose evidence index is known and at or before idx.
// NOTE(review): listing line 525 is absent from this extraction; it
// presumably opens the logging macro call whose arguments remain.
510 void rollback(::consensus::Index idx) override
511 {
512 std::lock_guard<ccf::pal::Mutex> guard(lock);
513
514 while (!next_snapshot_indices.empty() &&
515 (next_snapshot_indices.back().idx > idx))
516 {
517 next_snapshot_indices.pop_back();
518 }
519
// Keep the deque non-empty, as other methods call front() and back() on it
// unconditionally.
520 if (next_snapshot_indices.empty())
521 {
522 next_snapshot_indices.push_back({last_snapshot_idx, false, true});
523 }
524
526 "Rolled back snapshotter: last snapshottable idx is now {}",
527 next_snapshot_indices.front().idx);
528
// pending_snapshots is ordered by generation count, so the last element is
// the most recently scheduled snapshot. Entries with no evidence index yet
// are erased as well.
529 while (!pending_snapshots.empty())
530 {
531 const auto& last_snapshot = std::prev(pending_snapshots.end());
532 if (
533 last_snapshot->second.evidence_idx.has_value() &&
534 idx >= last_snapshot->second.evidence_idx.value())
535 {
536 break;
537 }
538
539 pending_snapshots.erase(last_snapshot);
540 }
541 }
542 };
543}
#define CCF_ASSERT_FMT(expr,...)
Definition ccf_assert.h:10
Definition claims_digest.h:10
void set(Digest &&digest_)
Definition claims_digest.h:21
const Digest & value() const
Definition claims_digest.h:38
Definition snapshotter.h:24
void init_after_public_recovery()
Definition snapshotter.h:244
void commit(::consensus::Index idx, bool generate_snapshot) override
Definition snapshotter.h:460
void record_snapshot_evidence_idx(::consensus::Index idx, const SnapshotHash &snapshot)
Definition snapshotter.h:429
void schedule_snapshot(::consensus::Index idx)
Definition snapshotter.h:448
Snapshotter(ringbuffer::AbstractWriterFactory &writer_factory_, std::shared_ptr< ccf::kv::Store > &store_, size_t snapshot_tx_interval_)
Definition snapshotter.h:233
bool write_snapshot(std::span< uint8_t > snapshot_buf, uint32_t generation_count)
Definition snapshotter.h:278
void record_signature(::consensus::Index idx, const std::vector< uint8_t > &sig, const NodeId &node_id, const ccf::crypto::Pem &node_cert)
Definition snapshotter.h:377
void rollback(::consensus::Index idx) override
Definition snapshotter.h:510
void record_serialised_tree(::consensus::Index idx, const std::vector< uint8_t > &tree)
Definition snapshotter.h:405
void set_snapshot_generation(bool enabled)
Definition snapshotter.h:254
bool record_committable(::consensus::Index idx) override
Definition snapshotter.h:336
void set_last_snapshot_idx(::consensus::Index idx)
Definition snapshotter.h:260
Definition pem.h:18
Definition sha256_hash.h:16
Definition kv_types.h:573
Definition kv_types.h:760
Definition ring_buffer_types.h:153
virtual WriterPtr create_writer_to_outside()=0
static ThreadMessaging & instance()
Definition thread_messaging.h:278
#define LOG_TRACE_FMT
Definition logger.h:378
#define LOG_DEBUG_FMT
Definition logger.h:380
#define LOG_FAIL_FMT
Definition logger.h:396
uint64_t Version
Definition version.h:8
@ SUCCESS
Definition kv_types.h:247
std::mutex Mutex
Definition locking.h:17
Definition app_interface.h:15
ServiceValue< SnapshotHash > SnapshotEvidence
Definition snapshot_evidence.h:22
uint64_t Index
Definition ledger_enclave_types.h:11
#define RINGBUFFER_WRITE_MESSAGE(MSG,...)
Definition ring_buffer_types.h:255
Definition snapshot_evidence.h:12
ccf::kv::Version version
Sequence number to which the snapshot corresponds.
Definition snapshot_evidence.h:16
Definition thread_messaging.h:27