CCF
Loading...
Searching...
No Matches
committable_tx.h
Go to the documentation of this file.
1// Copyright (c) Microsoft Corporation. All rights reserved.
2// Licensed under the Apache 2.0 License.
3#pragma once
4
5#include "apply_changes.h"
6#include "ccf/ds/hex.h"
7#include "ccf/tx.h"
9#include "kv/tx_pimpl.h"
10#include "kv_serialiser.h"
11#include "kv_types.h"
12#include "node/rpc/claims.h"
13
14#include <list>
15
16namespace ccf::kv
17{
18 class CommittableTx : public Tx
19 {
20 public:
21 using TxFlags = uint8_t;
22
29
30 protected:
31 bool committed = false;
32 bool success = false;
33
34 Version version = NoVersion;
35
38
// Serialise the write set of this committed transaction into a byte buffer.
//
// Out-parameters:
//  - commit_evidence: assigned the string returned by the store encryptor's
//    get_commit_evidence() for {commit_view, version}.
//  - commit_evidence_digest: assigned the Sha256Hash constructed from
//    commit_evidence.
//  Both are left untouched when the function returns early with an empty
//  vector.
// claims_digest_ must be non-empty; it is handed to the serialiser.
// include_reads is forwarded to each map's serialise_changes().
//
// Returns an empty vector when no changeset has writes, otherwise the raw data
// produced by the KvStoreSerialiser.
// Throws std::logic_error if the transaction is not committed, was aborted, or
// claims_digest_ is empty; throws KvSerialiserException if the store has no
// encryptor.
39 std::vector<uint8_t> serialise(
40 ccf::crypto::Sha256Hash& commit_evidence_digest,
41 std::string& commit_evidence,
42 const ccf::ClaimsDigest& claims_digest_,
43 bool include_reads = false)
44 {
// Preconditions: only a committed, successful transaction carrying claims can
// be serialised.
45 if (!committed)
46 {
47 throw std::logic_error("Transaction not yet committed");
48 }
49
50 if (!success)
51 {
52 throw std::logic_error("Transaction aborted");
53 }
54
55 if (claims_digest_.empty())
56 {
57 throw std::logic_error("Missing claims");
58 }
59
60 // If no transactions made changes, return a zero length vector.
61 const bool any_changes =
62 std::any_of(all_changes.begin(), all_changes.end(), [](const auto& it) {
63 return it.second.changeset->has_writes();
64 });
65
66 if (!any_changes)
67 {
68 return {};
69 }
70
// An encryptor is mandatory from here on: it supplies the commit evidence and
// is passed to the serialiser.
71 auto e = pimpl->store->get_encryptor();
72 if (e == nullptr)
73 {
74 throw KvSerialiserException("No encryptor set");
75 }
76
77 commit_evidence = e->get_commit_evidence({pimpl->commit_view, version});
78 LOG_TRACE_FMT("Commit evidence: {}", commit_evidence);
79 ccf::crypto::Sha256Hash tx_commit_evidence_digest(commit_evidence);
80 commit_evidence_digest = tx_commit_evidence_digest;
82
// NOTE(review): the statements around this brace pair are not visible in this
// listing; `entry_type` used below is presumably set up there — confirm
// against the full header.
84 {
86 }
87
88 KvStoreSerialiser replicated_serialiser(
89 e,
90 {pimpl->commit_view, version},
91 entry_type,
93 tx_commit_evidence_digest,
94 claims_digest_);
95
96 // Process in security domain order
// NOTE(review): the outer loop header that declares `domain` is not visible in
// this listing — presumably it iterates over the security domains; confirm.
98 {
99 for (const auto& it : all_changes)
100 {
101 const auto& map = it.second.map;
102 const auto& changeset = it.second.changeset;
// Only maps in the current domain that actually have writes are serialised.
103 if (map->get_security_domain() == domain && changeset->has_writes())
104 {
105 map->serialise_changes(
106 changeset.get(), replicated_serialiser, include_reads);
107 }
108 }
109 }
110
111 // Return serialised Tx.
112 return replicated_serialiser.get_raw_data();
113 }
114
115 public:
// Construct a committable transaction over the given store; the pointer is
// forwarded unchanged to the Tx base-class constructor.
116 CommittableTx(AbstractStore* _store) : Tx(_store) {}
117
// Callback type taking two arguments: a digest of the write set and the
// commit evidence string. It returns nothing.
118 using WriteSetObserver = std::function<void(
119 const ccf::crypto::Sha256Hash& write_set_digest,
120 const std::string& commit_evidence)>;
121
136 const ccf::ClaimsDigest& claims = ccf::empty_claims(),
137 std::function<std::tuple<Version, Version>(bool has_new_map)>
138 version_resolver = nullptr,
139 WriteSetObserver write_set_observer = nullptr)
140 {
141 if (committed)
142 {
143 throw std::logic_error("Transaction already committed");
144 }
145
146 if (all_changes.empty())
147 {
148 committed = true;
149 success = true;
151 }
152
153 // If this transaction creates any maps, ensure that commit gets a
154 // consistent snapshot of the existing map set
155 const bool maps_created = !pimpl->created_maps.empty();
156 if (maps_created)
157 {
158 this->pimpl->store->lock_map_set();
159 }
160
162
163 std::optional<Version> new_maps_conflict_version = std::nullopt;
164
165 bool track_deletes_on_missing_keys = false;
166 auto c = apply_changes(
168 version_resolver == nullptr ?
169 [&](bool has_new_map) {
170 return pimpl->store->next_version(has_new_map);
171 } :
172 version_resolver,
173 hooks,
174 pimpl->created_maps,
175 new_maps_conflict_version,
176 track_deletes_on_missing_keys);
177
178 if (maps_created)
179 {
180 this->pimpl->store->unlock_map_set();
181 }
182
183 success = c.has_value();
184
185 if (!success)
186 {
187 // This Tx is now in a dead state. Caller should create a new Tx and try
188 // again.
189 LOG_TRACE_FMT("Could not commit transaction due to conflict");
191 }
192
193 committed = true;
194 version = c.value();
195
197 {
198 auto chunker = pimpl->store->get_chunker();
199 if (chunker)
200 {
201 chunker->force_end_of_chunk(version);
202 }
203 }
204
206 {
207 pimpl->store->set_flag(
210 }
211
212 if (version == NoVersion)
213 {
214 // Read-only transaction
216 }
217
218 // From here, we have received a unique commit version and made
219 // modifications to our local kv. If we fail in any way, we cannot
220 // recover.
221 try
222 {
223 ccf::crypto::Sha256Hash commit_evidence_digest;
224 std::string commit_evidence;
225 auto data = serialise(commit_evidence_digest, commit_evidence, claims);
226
227 if (data.empty())
228 {
230 }
231
232 if (write_set_observer != nullptr)
233 {
234 ccf::crypto::Sha256Hash ws_digest({data.data(), data.size()});
235 write_set_observer(ws_digest, commit_evidence);
236 }
237
238 auto claims_ = claims;
239
240 return pimpl->store->commit(
241 {pimpl->commit_view, version},
242 std::make_unique<MovePendingTx>(
243 std::move(data),
244 std::move(claims_),
245 std::move(commit_evidence_digest),
246 std::move(hooks)),
247 false);
248 }
249 catch (const std::exception& e)
250 {
251 committed = false;
252
253 LOG_FAIL_FMT("Error during serialisation");
254 LOG_DEBUG_FMT("Error during serialisation: {}", e.what());
255
256 // Discard original exception type, throw as now fatal
257 // KvSerialiserException
258 throw KvSerialiserException(e.what());
259 }
260 }
261
269 [[nodiscard]] Version commit_version() const
270 {
271 if (!committed)
272 {
273 throw std::logic_error("Transaction not yet committed");
274 }
275
276 if (!success)
277 {
278 throw std::logic_error("Transaction aborted");
279 }
280
281 return version;
282 }
283
291 [[nodiscard]] Version commit_term() const
292 {
293 if (!committed)
294 {
295 throw std::logic_error("Transaction not yet committed");
296 }
297
298 if (!success)
299 {
300 throw std::logic_error("Transaction aborted");
301 }
302
303 return pimpl->commit_view;
304 }
305
306 [[nodiscard]] std::optional<TxID> get_txid() const
307 {
308 if (!committed)
309 {
310 throw std::logic_error("Transaction not yet committed");
311 }
312
313 if (!pimpl->read_txid.has_value())
314 {
315 // Transaction did not get a handle on any map.
316 return std::nullopt;
317 }
318
319 // A committed tx is read-only (i.e. no write to any map) if it was not
320 // assigned a version when it was committed
321 if (version == NoVersion)
322 {
323 // Read-only transaction
324 return pimpl->read_txid;
325 }
326
327 // Write transaction
328 return TxID(pimpl->commit_view, version);
329 }
330
331 void set_read_txid(const TxID& tx_id, Term commit_view_)
332 {
333 if (pimpl->read_txid.has_value())
334 {
335 throw std::logic_error("Read TxID already set");
336 }
337 pimpl->read_txid = tx_id;
338 pimpl->commit_view = commit_view_;
339 }
340
345
346 virtual void set_tx_flag(TxFlag flag)
347 {
348 flags |= static_cast<TxFlags>(flag);
349 }
350
351 virtual void unset_tx_flag(TxFlag flag)
352 {
353 flags &= ~static_cast<TxFlags>(flag);
354 }
355
356 [[nodiscard]] virtual bool tx_flag_enabled(TxFlag f) const
357 {
358 return (flags & static_cast<TxFlags>(f)) != 0;
359 }
360 };
361
362 // Used by frontend for reserved transactions. These are constructed with a
363 // pre-reserved Version, and _must succeed_ to fulfil this version. Otherwise
364 // they create a hole in the transaction order, and no future transactions can
365 // complete. These transactions are used internally by CCF for the sole
366 // purpose of recording node signatures and are safe in this particular
367 // situation because they never perform any reads and therefore can
368 // never conflict.
370 {
371 private:
372 Version rollback_count = 0;
373
374 public:
376 AbstractStore* _store,
377 Term read_term,
378 const TxID& reserved_tx_id,
379 Version rollback_count_) :
380 CommittableTx(_store),
381 rollback_count(rollback_count_)
382 {
383 version = reserved_tx_id.seqno;
384 pimpl->commit_view = reserved_tx_id.view;
385 pimpl->read_txid = TxID(read_term, reserved_tx_id.seqno - 1);
386 }
387
388 // Used by frontend to commit reserved transactions
390 {
391 if (committed)
392 {
393 throw std::logic_error("Transaction already committed");
394 }
395
396 if (all_changes.empty())
397 {
398 throw std::logic_error("Reserved transaction cannot be empty");
399 }
400
401 std::vector<ConsensusHookPtr> hooks;
402 bool track_deletes_on_missing_keys = false;
403 auto c = apply_changes(
405 [this](bool) { return std::make_tuple(version, version - 1); },
406 hooks,
407 pimpl->created_maps,
408 version,
409 track_deletes_on_missing_keys,
410 rollback_count);
411 success = c.has_value();
412
413 if (!success)
414 {
415 if (pimpl->store->check_rollback_count(rollback_count))
416 {
417 throw std::logic_error("Failed to commit reserved transaction");
418 }
419
420 committed = true;
421 return {
422 CommitResult::FAIL_NO_REPLICATE, {}, ccf::empty_claims(), {}, {}};
423 }
424
425 ccf::crypto::Sha256Hash commit_evidence_digest;
426 std::string commit_evidence;
427
428 // This is a signature and, if the ledger chunking or snapshot flags are
429 // enabled, we want the host to create a chunk when it sees this entry.
430 // version_lock held by Store::commit
431 if (pimpl->store->should_create_ledger_chunk_unsafe(version))
432 {
435 "Ending ledger chunk with signature at {}.{}",
436 pimpl->commit_view,
437 version);
438
439 auto chunker = pimpl->store->get_chunker();
440 if (chunker)
441 {
442 chunker->produced_chunk_at(version);
443 }
444 }
445
446 committed = true;
447 auto claims = ccf::empty_claims();
448 auto data = serialise(commit_evidence_digest, commit_evidence, claims);
449
450 return {
452 std::move(data),
453 ccf::empty_claims(),
454 std::move(commit_evidence_digest),
455 std::move(hooks)};
456 }
457 };
458}
Definition claims_digest.h:10
bool empty() const
Definition claims_digest.h:33
Definition sha256_hash.h:16
Definition kv_types.h:632
OrderedChanges all_changes
Definition tx.h:51
std::optional< ccf::crypto::Sha256Hash > root_at_read_version
Definition tx.h:53
std::unique_ptr< PrivateImpl > pimpl
Definition tx.h:49
Definition committable_tx.h:19
SerialisedEntryFlags entry_flags
Definition committable_tx.h:37
std::vector< uint8_t > serialise(ccf::crypto::Sha256Hash &commit_evidence_digest, std::string &commit_evidence, const ccf::ClaimsDigest &claims_digest_, bool include_reads=false)
Definition committable_tx.h:39
bool success
Definition committable_tx.h:32
bool committed
Definition committable_tx.h:31
Version version
Definition committable_tx.h:34
virtual void unset_tx_flag(TxFlag flag)
Definition committable_tx.h:351
Version commit_term() const
Definition committable_tx.h:291
void set_read_txid(const TxID &tx_id, Term commit_view_)
Definition committable_tx.h:331
TxFlag
Definition committable_tx.h:24
virtual bool tx_flag_enabled(TxFlag f) const
Definition committable_tx.h:356
CommitResult commit(const ccf::ClaimsDigest &claims=ccf::empty_claims(), std::function< std::tuple< Version, Version >(bool has_new_map)> version_resolver=nullptr, WriteSetObserver write_set_observer=nullptr)
Definition committable_tx.h:135
Version commit_version() const
Definition committable_tx.h:269
uint8_t TxFlags
Definition committable_tx.h:21
std::function< void(const ccf::crypto::Sha256Hash &write_set_digest, const std::string &commit_evidence)> WriteSetObserver
Definition committable_tx.h:120
void set_root_at_read_version(const ccf::crypto::Sha256Hash &r)
Definition committable_tx.h:341
CommittableTx(AbstractStore *_store)
Definition committable_tx.h:116
TxFlags flags
Definition committable_tx.h:36
std::optional< TxID > get_txid() const
Definition committable_tx.h:306
virtual void set_tx_flag(TxFlag flag)
Definition committable_tx.h:346
Definition generic_serialise_wrapper.h:21
Definition kv_types.h:318
Definition committable_tx.h:370
ReservedTx(AbstractStore *_store, Term read_term, const TxID &reserved_tx_id, Version rollback_count_)
Definition committable_tx.h:375
PendingTxInfo commit_reserved()
Definition committable_tx.h:389
Definition tx.h:200
#define LOG_TRACE_FMT
Definition internal_logger.h:13
#define LOG_DEBUG_FMT
Definition internal_logger.h:14
#define LOG_FAIL_FMT
Definition internal_logger.h:16
Definition app_interface.h:18
uint64_t Term
Definition kv_types.h:46
@ WriteSetWithCommitEvidenceAndClaims
CommitResult
Definition kv_types.h:212
@ FAIL_NO_REPLICATE
Definition kv_types.h:215
@ SUCCESS
Definition kv_types.h:213
@ FAIL_CONFLICT
Definition kv_types.h:214
@ PRIVATE
Definition kv_types.h:221
@ PUBLIC
Definition kv_types.h:220
uint64_t Version
Definition version.h:10
uint8_t SerialisedEntryFlags
Definition serialised_entry_format.h:12
@ FORCE_LEDGER_CHUNK_BEFORE
Definition serialised_entry_format.h:17
@ FORCE_LEDGER_CHUNK_AFTER
Definition serialised_entry_format.h:16
std::vector< ConsensusHookPtr > ConsensusHookPtrs
Definition hooks.h:22
Definition map_serializers.h:11
Definition tx_id.h:44
SeqNo seqno
Definition tx_id.h:46
View view
Definition tx_id.h:45
Definition kv_types.h:430