CCF
Loading...
Searching...
No Matches
committable_tx.h
Go to the documentation of this file.
1// Copyright (c) Microsoft Corporation. All rights reserved.
2// Licensed under the Apache 2.0 License.
3#pragma once
4
5#include "apply_changes.h"
6#include "ccf/ds/hex.h"
7#include "ccf/tx.h"
8#include "kv/tx_pimpl.h"
9#include "kv_serialiser.h"
10#include "kv_types.h"
11#include "node/rpc/claims.h"
12
13#include <list>
14
15namespace ccf::kv
16{
18 {
19 public:
20 using TxFlags = uint8_t;
21
28
29 protected:
30 bool committed = false;
31 bool success = false;
32
33 Version version = NoVersion;
34
36
39
40 std::vector<uint8_t> serialise(
41 ccf::crypto::Sha256Hash& commit_evidence_digest,
42 std::string& commit_evidence,
43 const ccf::ClaimsDigest& claims_digest_,
44 bool include_reads = false)
45 {
46 if (!committed)
47 throw std::logic_error("Transaction not yet committed");
48
49 if (!success)
50 throw std::logic_error("Transaction aborted");
51
52 if (claims_digest_.empty())
53 throw std::logic_error("Missing claims");
54
55 // If no transactions made changes, return a zero length vector.
56 const bool any_changes =
57 std::any_of(all_changes.begin(), all_changes.end(), [](const auto& it) {
58 return it.second.changeset->has_writes();
59 });
60
61 if (!any_changes)
62 {
63 return {};
64 }
65
66 auto e = pimpl->store->get_encryptor();
67 if (e == nullptr)
68 {
69 throw KvSerialiserException("No encryptor set");
70 }
71
72 auto commit_nonce = e->get_commit_nonce({pimpl->commit_view, version});
73 commit_evidence = fmt::format(
74 "ce:{}.{}:{}",
75 pimpl->commit_view,
76 version,
77 ccf::ds::to_hex(commit_nonce));
78 LOG_TRACE_FMT("Commit evidence: {}", commit_evidence);
79 ccf::crypto::Sha256Hash tx_commit_evidence_digest(commit_evidence);
80 commit_evidence_digest = tx_commit_evidence_digest;
82
84 {
86 }
87
88 KvStoreSerialiser replicated_serialiser(
89 e,
90 {pimpl->commit_view, version},
91 entry_type,
93 tx_commit_evidence_digest,
94 claims_digest_);
95
96 // Process in security domain order
98 {
99 for (const auto& it : all_changes)
100 {
101 const auto& map = it.second.map;
102 const auto& changeset = it.second.changeset;
103 if (map->get_security_domain() == domain && changeset->has_writes())
104 {
105 map->serialise_changes(
106 changeset.get(), replicated_serialiser, include_reads);
107 }
108 }
109 }
110
111 // Return serialised Tx.
112 return replicated_serialiser.get_raw_data();
113 }
114
115 public:
116 CommittableTx(AbstractStore* _store) : Tx(_store) {}
117
132 const ccf::ClaimsDigest& claims = ccf::empty_claims(),
133 bool track_read_versions = false,
134 std::function<std::tuple<Version, Version>(bool has_new_map)>
135 version_resolver = nullptr,
136 std::function<void(
137 const std::vector<uint8_t>& write_set,
138 const std::string& commit_evidence)> write_set_observer = nullptr)
139 {
140 if (committed)
141 throw std::logic_error("Transaction already committed");
142
143 if (all_changes.empty())
144 {
145 committed = true;
146 success = true;
148 }
149
150 // If this transaction creates any maps, ensure that commit gets a
151 // consistent snapshot of the existing map set
152 const bool maps_created = !pimpl->created_maps.empty();
153 if (maps_created)
154 {
155 this->pimpl->store->lock_map_set();
156 }
157
159
160 std::optional<Version> new_maps_conflict_version = std::nullopt;
161
162 bool track_deletes_on_missing_keys = false;
163 auto c = apply_changes(
165 version_resolver == nullptr ?
166 [&](bool has_new_map) {
167 return pimpl->store->next_version(has_new_map);
168 } :
169 version_resolver,
170 hooks,
171 pimpl->created_maps,
172 new_maps_conflict_version,
173 track_read_versions,
174 track_deletes_on_missing_keys);
175
176 if (maps_created)
177 {
178 this->pimpl->store->unlock_map_set();
179 }
180
181 success = c.has_value();
182
183 if (!success)
184 {
185 // This Tx is now in a dead state. Caller should create a new Tx and try
186 // again.
187 LOG_TRACE_FMT("Could not commit transaction due to conflict");
189 }
190 else
191 {
192 committed = true;
193 version = c.value();
194
196 {
197 pimpl->store->set_flag(
199 // This transaction indicates to the store that the next signature
200 // should trigger a new ledger chunk, but *this* transaction does not
201 // create a new ledger chunk
203 }
204
206 {
207 pimpl->store->set_flag(
210 }
211
212 if (version == NoVersion)
213 {
214 // Read-only transaction
216 }
217
218 // From here, we have received a unique commit version and made
219 // modifications to our local kv. If we fail in any way, we cannot
220 // recover.
221 try
222 {
223 ccf::crypto::Sha256Hash commit_evidence_digest;
224 std::string commit_evidence;
225 auto data =
226 serialise(commit_evidence_digest, commit_evidence, claims);
227
228 if (data.empty())
229 {
231 }
232
233 if (write_set_observer != nullptr)
234 {
235 write_set_observer(data, commit_evidence);
236 }
237
238 auto claims_ = claims;
239
240 return pimpl->store->commit(
241 {pimpl->commit_view, version},
242 std::make_unique<MovePendingTx>(
243 std::move(data),
244 std::move(claims_),
245 std::move(commit_evidence_digest),
246 std::move(hooks)),
247 false);
248 }
249 catch (const std::exception& e)
250 {
251 committed = false;
252
253 LOG_FAIL_FMT("Error during serialisation");
254 LOG_DEBUG_FMT("Error during serialisation: {}", e.what());
255
256 // Discard original exception type, throw as now fatal
257 // KvSerialiserException
258 throw KvSerialiserException(e.what());
259 }
260 }
261 }
262
271 {
272 if (!committed)
273 throw std::logic_error("Transaction not yet committed");
274
275 if (!success)
276 throw std::logic_error("Transaction aborted");
277
278 return version;
279 }
280
289 {
290 if (!committed)
291 throw std::logic_error("Transaction not yet committed");
292
293 if (!success)
294 throw std::logic_error("Transaction aborted");
295
296 return pimpl->commit_view;
297 }
298
304 {
305 return version;
306 }
307
308 std::optional<TxID> get_txid()
309 {
310 if (!committed)
311 {
312 throw std::logic_error("Transaction not yet committed");
313 }
314
315 if (!pimpl->read_txid.has_value())
316 {
317 // Transaction did not get a handle on any map.
318 return std::nullopt;
319 }
320
321 // A committed tx is read-only (i.e. no write to any map) if it was not
322 // assigned a version when it was committed
323 if (version == NoVersion)
324 {
325 // Read-only transaction
326 return pimpl->read_txid.value();
327 }
328 else
329 {
330 // Write transaction
331 return TxID(pimpl->commit_view, version);
332 }
333 }
334
335 void set_change_list(OrderedChanges&& change_list_, Term term_) override
336 {
337 // if all_changes is not empty then any coinciding keys will not be
338 // overwritten
339 all_changes.merge(change_list_);
340 pimpl->commit_view = term_;
341 }
342
343 void set_view(ccf::View view_)
344 {
345 pimpl->commit_view = view_;
346 }
347
349 {
350 req_id = req_id_;
351 }
352
354 {
355 return req_id;
356 }
357
358 void set_read_txid(const TxID& tx_id, Term commit_view_)
359 {
360 if (pimpl->read_txid.has_value())
361 {
362 throw std::logic_error("Read TxID already set");
363 }
364 pimpl->read_txid = tx_id;
365 pimpl->commit_view = commit_view_;
366 }
367
372
373 virtual void set_flag(Flag flag)
374 {
375 flags |= static_cast<uint8_t>(flag);
376 }
377
378 virtual void unset_flag(Flag flag)
379 {
380 flags &= ~static_cast<uint8_t>(flag);
381 }
382
383 virtual bool flag_enabled(Flag f) const
384 {
385 return (flags & static_cast<uint8_t>(f)) != 0;
386 }
387 };
388
389 // Used by frontend for reserved transactions. These are constructed with a
390 // pre-reserved Version, and _must succeed_ to fulfil this version. Otherwise
391 // they create a hole in the transaction order, and no future transactions can
392 // complete. These transactions are used internally by CCF for the sole
393 // purpose of recording node signatures and are safe in this particular
394 // situation because they never perform any reads and therefore can
395 // never conflict.
397 {
398 private:
399 Version rollback_count = 0;
400
401 public:
403 AbstractStore* _store,
404 Term read_term,
405 const TxID& reserved_tx_id,
406 Version rollback_count_) :
407 CommittableTx(_store)
408 {
409 version = reserved_tx_id.version;
410 pimpl->commit_view = reserved_tx_id.term;
411 pimpl->read_txid = TxID(read_term, reserved_tx_id.version - 1);
412 rollback_count = rollback_count_;
413 }
414
415 // Used by frontend to commit reserved transactions
417 {
418 if (committed)
419 throw std::logic_error("Transaction already committed");
420
421 if (all_changes.empty())
422 throw std::logic_error("Reserved transaction cannot be empty");
423
424 std::vector<ConsensusHookPtr> hooks;
425 bool track_read_versions = false;
426 bool track_deletes_on_missing_keys = false;
427 auto c = apply_changes(
429 [this](bool) { return std::make_tuple(version, version - 1); },
430 hooks,
431 pimpl->created_maps,
432 version,
433 track_read_versions,
434 track_deletes_on_missing_keys,
435 rollback_count);
436 success = c.has_value();
437
438 if (!success)
439 throw std::logic_error("Failed to commit reserved transaction");
440
441 ccf::crypto::Sha256Hash commit_evidence_digest;
442 std::string commit_evidence;
443
444 // This is a signature and, if the ledger chunking or snapshot flags are
445 // enabled, we want the host to create a chunk when it sees this entry.
446 // version_lock held by Store::commit
447 if (pimpl->store->must_force_ledger_chunk_unsafe(version))
448 {
451 "Forcing ledger chunk for signature at {}.{}",
452 pimpl->commit_view,
453 version);
454 }
455
456 committed = true;
457 auto claims = ccf::empty_claims();
458 auto data = serialise(commit_evidence_digest, commit_evidence, claims);
459
460 // Reset ledger chunk flag in the store
461 pimpl->store->unset_flag_unsafe(
463
464 return {
466 std::move(data),
467 ccf::empty_claims(),
468 std::move(commit_evidence_digest),
469 std::move(hooks)};
470 }
471 };
472}
Definition claims_digest.h:10
bool empty() const
Definition claims_digest.h:33
Definition sha256_hash.h:16
Definition kv_types.h:677
OrderedChanges all_changes
Definition tx.h:52
std::optional< ccf::crypto::Sha256Hash > root_at_read_version
Definition tx.h:54
std::unique_ptr< PrivateImpl > pimpl
Definition tx.h:50
Definition committable_tx.h:18
SerialisedEntryFlags entry_flags
Definition committable_tx.h:38
Version get_version()
Definition committable_tx.h:303
std::vector< uint8_t > serialise(ccf::crypto::Sha256Hash &commit_evidence_digest, std::string &commit_evidence, const ccf::ClaimsDigest &claims_digest_, bool include_reads=false)
Definition committable_tx.h:40
bool success
Definition committable_tx.h:31
std::optional< TxID > get_txid()
Definition committable_tx.h:308
Flag
Definition committable_tx.h:23
bool committed
Definition committable_tx.h:30
Version version
Definition committable_tx.h:33
CommitResult commit(const ccf::ClaimsDigest &claims=ccf::empty_claims(), bool track_read_versions=false, std::function< std::tuple< Version, Version >(bool has_new_map)> version_resolver=nullptr, std::function< void(const std::vector< uint8_t > &write_set, const std::string &commit_evidence)> write_set_observer=nullptr)
Definition committable_tx.h:131
void set_read_txid(const TxID &tx_id, Term commit_view_)
Definition committable_tx.h:358
virtual void unset_flag(Flag flag)
Definition committable_tx.h:378
const ccf::kv::TxHistory::RequestID & get_req_id()
Definition committable_tx.h:353
uint8_t TxFlags
Definition committable_tx.h:20
virtual void set_flag(Flag flag)
Definition committable_tx.h:373
void set_root_at_read_version(const ccf::crypto::Sha256Hash &r)
Definition committable_tx.h:368
void set_view(ccf::View view_)
Definition committable_tx.h:343
CommittableTx(AbstractStore *_store)
Definition committable_tx.h:116
TxFlags flags
Definition committable_tx.h:37
ccf::kv::TxHistory::RequestID req_id
Definition committable_tx.h:35
Version commit_version()
Definition committable_tx.h:270
virtual bool flag_enabled(Flag f) const
Definition committable_tx.h:383
void set_change_list(OrderedChanges &&change_list_, Term term_) override
Definition committable_tx.h:335
Version commit_term()
Definition committable_tx.h:288
void set_req_id(const ccf::kv::TxHistory::RequestID &req_id_)
Definition committable_tx.h:348
Definition generic_serialise_wrapper.h:20
Definition kv_types.h:352
Definition committable_tx.h:397
ReservedTx(AbstractStore *_store, Term read_term, const TxID &reserved_tx_id, Version rollback_count_)
Definition committable_tx.h:402
PendingTxInfo commit_reserved()
Definition committable_tx.h:416
std::tuple< size_t, size_t > RequestID
Definition kv_types.h:370
Definition tx.h:202
#define LOG_TRACE_FMT
Definition logger.h:378
#define LOG_DEBUG_FMT
Definition logger.h:380
#define LOG_FAIL_FMT
Definition logger.h:396
Definition app_interface.h:20
uint64_t Term
Definition kv_types.h:46
@ PRIVATE
Definition kv_types.h:255
@ PUBLIC
Definition kv_types.h:254
@ WriteSetWithCommitEvidenceAndClaims
uint64_t Version
Definition version.h:8
CommitResult
Definition kv_types.h:246
@ SUCCESS
Definition kv_types.h:247
@ FAIL_CONFLICT
Definition kv_types.h:248
uint8_t SerialisedEntryFlags
Definition serialised_entry_format.h:12
@ FORCE_LEDGER_CHUNK_BEFORE
Definition serialised_entry_format.h:17
@ FORCE_LEDGER_CHUNK_AFTER
Definition serialised_entry_format.h:16
std::map< std::string, MapChanges > OrderedChanges
Definition tx.h:42
std::vector< ConsensusHookPtr > ConsensusHookPtrs
Definition hooks.h:22
uint64_t View
Definition tx_id.h:23
Definition map_serializers.h:11
Definition apply_changes.h:19
Definition kv_types.h:481
Definition kv_types.h:50
Version version
Definition kv_types.h:52
Term term
Definition kv_types.h:51