16#define FMT_HEADER_ONLY
18#include <fmt/format.h>
31 std::pair<ccf::kv::Version, std::shared_ptr<untyped::Map>>>;
59 std::unordered_map<Version, std::tuple<std::unique_ptr<PendingTx>,
bool>>
65 std::scoped_lock<ccf::pal::Mutex, ccf::pal::Mutex> mguard(
89 using Hooks = std::map<std::string, ccf::kv::untyped::Map::CommitHook>;
90 using MapHooks = std::map<std::string, ccf::kv::untyped::Map::MapHook>;
94 std::shared_ptr<Consensus>
consensus =
nullptr;
95 std::shared_ptr<TxHistory> history =
nullptr;
104 const bool strict_versions =
true;
107 const bool is_historical =
false;
112 bool commit_deserialised(
118 bool track_deletes_on_missing_keys)
override
120 auto c = apply_changes(
122 [v](
bool) {
return std::make_tuple(v, v - 1); },
127 track_deletes_on_missing_keys);
130 LOG_FAIL_FMT(
"Failed to commit deserialised Tx at version {}", v);
142 bool has_map_internal(
const std::string& name)
144 auto search =
maps.find(name);
145 if (search !=
maps.end())
158 if (
version > std::numeric_limits<int64_t>::max())
170 TxID current_txid_unsafe()
177 Store(
bool strict_versions_ =
true,
bool is_historical_ =
false) :
178 strict_versions(strict_versions_),
179 is_historical(is_historical_)
194 std::atomic_store(&
consensus, consensus_);
209 encryptor = encryptor_;
219 snapshotter = snapshotter_;
238 std::lock_guard<ccf::pal::Mutex> mguard(
maps_lock);
251 auto search =
maps.find(map_name);
252 if (search !=
maps.end())
254 const auto& [map_creation_version, map_ptr] = search->second;
255 if (v >= map_creation_version || map_creation_version == NoVersion)
277 auto map = std::dynamic_pointer_cast<ccf::kv::untyped::Map>(map_);
280 throw std::logic_error(fmt::format(
281 "Can't add dynamic map - {} is not of expected type",
285 const auto map_name =
map->get_name();
288 throw std::logic_error(fmt::format(
289 "Can't add dynamic map - already have a map named {}", map_name));
292 LOG_DEBUG_FMT(
"Adding newly created map '{}' at version {}", map_name, v);
293 maps[map_name] = std::make_pair(v,
map);
297 const auto global_it = global_hooks.find(map_name);
298 if (global_it != global_hooks.end())
300 map->set_global_hook(global_it->second);
303 const auto map_it = map_hooks.find(map_name);
304 if (map_it != map_hooks.end())
306 map->set_map_hook(map_it->second);
316 throw std::logic_error(fmt::format(
317 "Cannot snapshot at version {} which is earlier than last "
318 "compacted version {} ",
325 throw std::logic_error(fmt::format(
326 "Cannot snapshot at version {} which is later than current "
332 auto snapshot = std::make_unique<StoreSnapshot>(v);
335 for (
auto& it :
maps)
337 auto& [_,
map] = it.second;
338 snapshot->add_map_snapshot(
map->snapshot(v));
344 snapshot->add_hash_at_snapshot(h->get_raw_leaf(v));
350 snapshot->add_view_history(c->get_view_history(v));
360 for (
auto& it :
maps)
362 auto& [_,
map] = it.second;
369 for (
auto& it :
maps)
371 auto& [_,
map] = it.second;
378 std::unique_ptr<AbstractSnapshot> snapshot)
override
381 return snapshot->serialise(e);
388 std::vector<Version>* view_history =
nullptr,
389 bool public_only =
false)
override
395 std::optional<ccf::kv::SecurityDomain>());
398 auto v_ = d.init(data, size, term, is_historical);
401 LOG_FAIL_FMT(
"Initialisation of deserialise object failed");
405 std::shared_ptr<TxHistory> h =
nullptr;
406 std::vector<uint8_t> hash_at_snapshot;
407 std::vector<Version> view_history_;
409 std::lock_guard<ccf::pal::Mutex> mguard(
maps_lock);
411 for (
auto& it :
maps)
413 auto& [_,
map] = it.second;
420 hash_at_snapshot = d.deserialise_raw();
425 view_history_ = d.deserialise_view_history();
431 for (
auto r = d.start_map(); r.has_value(); r = d.start_map())
433 const auto map_name = r.value();
435 std::shared_ptr<ccf::kv::untyped::Map>
map =
nullptr;
437 auto search =
maps.find(map_name);
438 if (search ==
maps.end())
440 map = std::make_shared<ccf::kv::untyped::Map>(
441 this, map_name, get_security_domain(map_name));
442 new_maps[map_name] =
map;
444 "Creating map {} while deserialising snapshot at version {}",
450 map = search->second.second;
453 auto changes_search = changes.find(map_name);
454 if (changes_search != changes.end())
456 LOG_FAIL_FMT(
"Failed to deserialise snapshot at version {}", v);
461 auto deserialised_snapshot_changes =
462 map->deserialise_snapshot_changes(d);
466 changes.emplace_hint(
468 std::piecewise_construct,
469 std::forward_as_tuple(map_name),
470 std::forward_as_tuple(
471 map, std::move(deserialised_snapshot_changes)));
474 for (
auto& it :
maps)
476 auto& [_,
map] = it.second;
482 LOG_FAIL_FMT(
"Unexpected content in snapshot at version {}", v);
489 bool track_deletes_on_missing_keys =
false;
490 auto r = apply_changes(
492 [](
bool) {
return std::make_tuple(NoVersion, NoVersion); },
497 track_deletes_on_missing_keys);
501 "Failed to commit deserialised snapshot at version {}", v);
514 if (!h->init_from_snapshot(hash_at_snapshot))
522 *view_history = std::move(view_history_);
537 bool generate_snapshot = c && c->is_primary();
538 snapshotter->commit(v, generate_snapshot);
541 std::lock_guard<ccf::pal::Mutex> mguard(
maps_lock);
548 for (
auto& it :
maps)
550 auto& [_,
map] = it.second;
554 for (
auto& it :
maps)
556 auto& [_,
map] = it.second;
560 for (
auto& it :
maps)
562 auto& [_,
map] = it.second;
577 for (
auto& it :
maps)
579 auto& [_,
map] = it.second;
592 snapshotter->rollback(tx_id.
version);
595 std::lock_guard<ccf::pal::Mutex> mguard(
maps_lock);
601 throw std::logic_error(fmt::format(
602 "Attempting rollback to {}, earlier than commit version {}",
638 for (
auto& it :
maps)
640 auto& [_,
map] = it.second;
644 auto it =
maps.begin();
645 while (it !=
maps.end())
647 auto& [map_creation_version,
map] = it->second;
651 if (map_creation_version > tx_id.
version)
664 for (
auto& map_it :
maps)
666 auto& [_,
map] = map_it.second;
678 throw std::logic_error(
"term_of_next_version is already initialised");
690 const std::vector<uint8_t>& data,
697 std::optional<ccf::crypto::Sha256Hash>& commit_evidence_digest,
698 bool ignore_strict_versions =
false)
override
710 std::optional<ccf::kv::SecurityDomain>());
712 auto v_ = d.init(data.data(), data.size(),
view, is_historical);
715 LOG_FAIL_FMT(
"Initialisation of deserialise object failed");
720 claims_digest = std::move(d.consume_claims_digest());
722 "Deserialised claim digest {} {}",
723 claims_digest.
value(),
724 claims_digest.
empty());
726 commit_evidence_digest = std::move(d.consume_commit_evidence_digest());
727 if (commit_evidence_digest.has_value())
729 "Deserialised commit evidence digest {}",
730 commit_evidence_digest.value());
736 if (strict_versions && !ignore_strict_versions)
743 "Tried to deserialise {} but current_version is {}", v, cv);
752 std::lock_guard<ccf::pal::Mutex> mguard(
maps_lock);
754 for (
auto r = d.start_map(); r.has_value(); r = d.start_map())
756 const auto map_name = r.value();
761 auto new_map = std::make_shared<ccf::kv::untyped::Map>(
762 this, map_name, get_security_domain(map_name));
764 new_maps[map_name] = new_map;
766 "Creating map '{}' while deserialising transaction at version {}",
771 auto change_search = changes.find(map_name);
772 if (change_search != changes.end())
774 LOG_FAIL_FMT(
"Failed to deserialise transaction at version {}", v);
779 auto deserialised_changes =
map->deserialise_changes(d, v);
783 changes.emplace_hint(
785 std::piecewise_construct,
786 std::forward_as_tuple(map_name),
787 std::forward_as_tuple(
map, std::move(deserialised_changes)));
792 LOG_FAIL_FMT(
"Unexpected content in transaction at version {}", v);
800 const std::vector<uint8_t>& data,
801 bool public_only =
false,
802 const std::optional<TxID>& expected_txid = std::nullopt)
override
804 auto exec = std::make_unique<CFTExecutionWrapper>(
805 this,
get_history(), std::move(data), public_only, expected_txid);
815 if (
maps.size() != that.
maps.size())
818 for (
auto it =
maps.begin(); it !=
maps.end(); ++it)
820 auto search = that.
maps.find(it->first);
822 if (search == that.
maps.end())
825 auto& [this_v, this_map] = it->second;
826 auto& [that_v, that_map] = search->second;
828 if (this_v != that_v)
831 if (*this_map != *that_map)
847 return current_txid_unsafe();
853 return {kv_id.term, kv_id.version};
876 std::unique_ptr<PendingTx> pending_tx,
877 bool globally_committable)
override
885 std::lock_guard<ccf::pal::Mutex> cguard(
commit_lock);
888 "Store::commit {}{}",
890 (globally_committable ?
" globally_committable" :
""));
893 Version previous_last_replicated = 0;
894 Version next_last_replicated = 0;
895 Version previous_rollback_count = 0;
898 std::vector<std::tuple<std::unique_ptr<PendingTx>,
bool>>
899 contiguous_pending_txs;
909 "Want to commit for term {} but term is {}",
923 std::make_tuple(std::move(pending_tx), globally_committable)});
927 for (
Version offset = 1;
true; ++offset)
933 "Couldn't find {} = {} + {}, giving up on batch while committing "
943 contiguous_pending_txs.emplace_back(std::move(search->second));
949 next_last_replicated =
last_replicated + contiguous_pending_txs.size();
955 if (contiguous_pending_txs.size() == 0)
961 for (
auto& [pending_tx_, committable_] : contiguous_pending_txs)
964 [success_, data_, claims_digest_, commit_evidence_digest_, hooks_] =
967 std::make_shared<std::vector<uint8_t>>(std::move(data_));
969 std::make_shared<ccf::kv::ConsensusHookPtrs>(std::move(hooks_));
984 *data_shared, commit_evidence_digest_, claims_digest_),
989 "Batching {} ({}) during commit of {}.{}",
1000 if (c->replicate(batch, replication_view))
1035 r |= snapshotter->record_committable(
version);
1060 Version v = next_version_unsafe();
1068 return std::make_tuple(v, previous_last_new_map);
1074 return next_version_unsafe();
1080 next_version_unsafe();
1108 if (source_version > target_version)
1110 throw std::runtime_error(fmt::format(
1111 "Invalid call to swap_private_maps. Source is at version {} while "
1118 std::scoped_lock<ccf::pal::Mutex, ccf::pal::Mutex> guard_both_store_maps(
1122 using MapEntry = std::tuple<std::string, AbstractMap*, AbstractMap*>;
1123 std::vector<MapEntry> entries;
1126 for (
auto& [name, pair] : store.
maps)
1128 auto& [_,
map] = pair;
1132 entries.emplace_back(name,
nullptr,
map.get());
1138 auto entry = entries.begin();
1139 while (entry != entries.end())
1141 const auto& [name, _, their_map] = *entry;
1142 std::shared_ptr<AbstractMap>
map =
nullptr;
1143 const auto it =
maps.find(name);
1144 if (it ==
maps.end())
1149 auto new_map = std::make_pair(
1151 std::make_shared<ccf::kv::untyped::Map>(
1153 maps[name] = new_map;
1154 map = new_map.second;
1158 map = it->second.second;
1161 throw std::logic_error(fmt::format(
1162 "Swap mismatch - map {} is private in source but not in target",
1167 std::get<1>(*entry) =
map.get();
1172 for (
auto& [name, lhs, rhs] : entries)
1177 for (
auto& [name, lhs, rhs] : entries)
1187 map_hooks[map_name] = hook;
1189 const auto it =
maps.find(map_name);
1190 if (it !=
maps.end())
1192 it->second.second->set_map_hook(hook);
1198 map_hooks.erase(map_name);
1200 const auto it =
maps.find(map_name);
1201 if (it !=
maps.end())
1203 it->second.second->unset_map_hook();
1208 const std::string& map_name,
1211 global_hooks[map_name] = hook;
1213 const auto it =
maps.find(map_name);
1214 if (it !=
maps.end())
1216 it->second.second->set_global_hook(hook);
1222 global_hooks.erase(map_name);
1224 const auto it =
maps.find(map_name);
1225 if (it !=
maps.end())
1227 it->second.second->unset_global_hook();
1238 return std::make_unique<ReadOnlyTx>(
this);
1253 return std::make_unique<CommittableTx>(
this);
1282 this->flags |=
static_cast<uint8_t
>(f);
1287 this->flags &= ~static_cast<uint8_t>(f);
1292 return (flags &
static_cast<uint8_t
>(f)) != 0;
Definition claims_digest.h:10
const Digest & value() const
Definition claims_digest.h:38
bool empty() const
Definition claims_digest.h:33
Definition kv_types.h:677
Flag
Definition kv_types.h:745
@ LEDGER_CHUNK_AT_NEXT_SIGNATURE
@ SNAPSHOT_AT_NEXT_SIGNATURE
Definition committable_tx.h:18
Definition deserialise.h:17
Definition read_only_store.h:13
Definition committable_tx.h:397
Term term_of_last_version
Definition store.h:50
std::atomic< Version > compacted
Definition store.h:38
std::atomic< Version > version
Definition store.h:36
ccf::pal::Mutex maps_lock
Definition store.h:32
Version last_committable
Definition store.h:55
Version rollback_count
Definition store.h:57
Version last_new_map
Definition store.h:37
Maps maps
Definition store.h:33
std::atomic< Term > term_of_next_version
Definition store.h:44
void clear()
Definition store.h:63
ccf::pal::Mutex version_lock
Definition store.h:35
std::map< std::string, std::pair< ccf::kv::Version, std::shared_ptr< untyped::Map > > > Maps
Definition store.h:31
std::unordered_map< Version, std::tuple< std::unique_ptr< PendingTx >, bool > > pending_txs
Definition store.h:60
Version last_replicated
Definition store.h:52
ccf::pal::Mutex commit_lock
Definition store.h:41
virtual bool flag_enabled(Flag f) override
Definition store.h:1274
ApplyResult deserialise_snapshot(const uint8_t *data, size_t size, ccf::kv::ConsensusHookPtrs &hooks, std::vector< Version > *view_history=nullptr, bool public_only=false) override
Definition store.h:384
ReservedTx create_reserved_tx(const TxID &tx_id)
Definition store.h:1256
ReadOnlyTx create_read_only_tx() override
Definition store.h:1231
std::shared_ptr< AbstractMap > get_map_unsafe(ccf::kv::Version v, const std::string &map_name) override
Definition store.h:242
size_t committable_gap() override
Definition store.h:1085
TxID next_txid() override
Definition store.h:1077
void unset_global_hook(const std::string &map_name)
Definition store.h:1220
bool must_force_ledger_chunk_unsafe(Version version) override
Definition store.h:1024
void initialise_term(Term t) override
Definition store.h:671
Store(const Store &that)=delete
std::shared_ptr< TxHistory > get_history() override
Definition store.h:197
void set_map_hook(const std::string &map_name, const ccf::kv::untyped::Map::MapHook &hook)
Definition store.h:1184
void unlock_map_set() override
Definition store.h:1046
void set_snapshotter(const SnapshotterPtr &snapshotter_)
Definition store.h:217
void unset_map_hook(const std::string &map_name)
Definition store.h:1196
void add_dynamic_map(ccf::kv::Version v, const std::shared_ptr< AbstractMap > &map_) override
Definition store.h:274
Version current_version() override
Definition store.h:838
void swap_private_maps(Store &store)
Definition store.h:1103
std::shared_ptr< AbstractMap > get_map(ccf::kv::Version v, const std::string &map_name) override
Definition store.h:235
std::tuple< Version, Version > next_version(bool commit_new_map) override
Definition store.h:1057
bool operator==(const Store &that) const
Definition store.h:809
EncryptorPtr get_encryptor() override
Definition store.h:212
std::vector< uint8_t > serialise_snapshot(std::unique_ptr< AbstractSnapshot > snapshot) override
Definition store.h:377
bool check_rollback_count(Version count) override
Definition store.h:1051
std::unique_ptr< CommittableTx > create_tx_ptr()
Definition store.h:1251
void set_consensus(const std::shared_ptr< Consensus > &consensus_)
Definition store.h:192
ccf::kv::TxID current_txid() override
Definition store.h:843
void rollback(const TxID &tx_id, Term term_of_next_version_) override
Definition store.h:584
std::unique_ptr< AbstractSnapshot > snapshot_unsafe_maps(Version v) override
Definition store.h:311
virtual bool flag_enabled_unsafe(Flag f) const override
Definition store.h:1290
void lock_map_set() override
Definition store.h:1041
Term commit_view() override
Definition store.h:868
void unlock_maps() override
Definition store.h:367
std::unique_ptr< ReadOnlyTx > create_read_only_tx_ptr() override
Definition store.h:1236
void lock_maps() override
Definition store.h:357
virtual void set_flag(Flag f) override
Definition store.h:1262
void set_encryptor(const EncryptorPtr &encryptor_)
Definition store.h:207
bool must_force_ledger_chunk(Version version) override
Definition store.h:1018
CommittableTx create_tx()
Definition store.h:1246
bool fill_maps(const std::vector< uint8_t > &data, bool public_only, ccf::kv::Version &v, ccf::kv::Term &view, OrderedChanges &changes, MapCollection &new_maps, ccf::ClaimsDigest &claims_digest, std::optional< ccf::crypto::Sha256Hash > &commit_evidence_digest, bool ignore_strict_versions=false) override
Definition store.h:689
Version next_version() override
Definition store.h:1071
Version compacted_version() override
Definition store.h:863
Store(bool strict_versions_=true, bool is_historical_=false)
Definition store.h:177
virtual void unset_flag(Flag f) override
Definition store.h:1268
void set_global_hook(const std::string &map_name, const ccf::kv::untyped::Map::CommitHook &hook)
Definition store.h:1207
std::pair< TxID, Term > current_txid_and_commit_term() override
Definition store.h:856
std::shared_ptr< Consensus > get_consensus() override
Definition store.h:184
std::unique_ptr< ccf::kv::AbstractExecutionWrapper > deserialize(const std::vector< uint8_t > &data, bool public_only=false, const std::optional< TxID > &expected_txid=std::nullopt) override
Definition store.h:799
void set_history(const std::shared_ptr< TxHistory > &history_)
Definition store.h:202
CommitResult commit(const TxID &txid, std::unique_ptr< PendingTx > pending_tx, bool globally_committable) override
Definition store.h:874
virtual void unset_flag_unsafe(Flag f) override
Definition store.h:1285
virtual void set_flag_unsafe(Flag f) override
Definition store.h:1280
ccf::TxID get_txid() override
Definition store.h:850
void compact(Version v) override
Definition store.h:528
std::shared_ptr< ccf::kv::untyped::Map > get_map_internal(ccf::kv::Version v, const std::string &map_name)
Definition store.h:248
TxDiff create_tx_diff() override
Definition store.h:1241
MapHook< Write > MapHook
Definition map.h:42
CommitHook< Write > CommitHook
Definition map.h:41
#define LOG_TRACE_FMT
Definition logger.h:378
#define LOG_DEBUG_FMT
Definition logger.h:380
#define LOG_FAIL_FMT
Definition logger.h:396
Definition app_interface.h:20
std::shared_ptr< AbstractTxEncryptor > EncryptorPtr
Definition kv_types.h:570
uint64_t Term
Definition kv_types.h:46
@ PRIVATE
Definition kv_types.h:255
@ PUBLIC
Definition kv_types.h:254
std::vector< std::tuple< Version, std::shared_ptr< std::vector< uint8_t > >, bool, std::shared_ptr< ConsensusHookPtrs > > > BatchVector
Definition kv_types.h:243
uint64_t Version
Definition version.h:8
CommitResult
Definition kv_types.h:246
@ FAIL_NO_REPLICATE
Definition kv_types.h:249
@ SUCCESS
Definition kv_types.h:247
std::shared_ptr< AbstractSnapshotter > SnapshotterPtr
Definition kv_types.h:581
GenericDeserialiseWrapper< RawReader > KvStoreDeserialiser
Definition serialiser_declare.h:21
std::map< std::string, std::shared_ptr< AbstractMap > > MapCollection
Definition apply_changes.h:16
ApplyResult
Definition kv_types.h:339
@ FAIL
Definition kv_types.h:348
@ PASS
Definition kv_types.h:340
std::map< std::string, MapChanges > OrderedChanges
Definition tx.h:42
std::shared_ptr< ccf::kv::Store > StorePtr
Definition store.h:1296
std::vector< ConsensusHookPtr > ConsensusHookPtrs
Definition hooks.h:22
std::mutex Mutex
Definition locking.h:17
view
Definition signatures.h:54
uint64_t View
Definition tx_id.h:23
Definition consensus_types.h:23
Definition map_serializers.h:11
Version version
Definition kv_types.h:52
Term term
Definition kv_types.h:51