17#define FMT_HEADER_ONLY
19#include <fmt/format.h>
32 std::pair<ccf::kv::Version, std::shared_ptr<untyped::Map>>>;
60 std::unordered_map<Version, std::tuple<std::unique_ptr<PendingTx>,
bool>>
66 std::scoped_lock<ccf::pal::Mutex, ccf::pal::Mutex> mguard(
90 using Hooks = std::map<std::string, ccf::kv::untyped::Map::CommitHook>;
91 using MapHooks = std::map<std::string, ccf::kv::untyped::Map::MapHook>;
95 std::shared_ptr<Consensus>
consensus =
nullptr;
96 std::shared_ptr<TxHistory> history =
nullptr;
97 std::shared_ptr<ILedgerChunker> chunker =
nullptr;
106 const bool strict_versions =
true;
109 const bool is_historical =
false;
114 std::atomic<uint8_t> flags = 0;
116 bool commit_deserialised(
122 bool track_deletes_on_missing_keys)
override
124 auto c = apply_changes(
126 [v](
bool) {
return std::make_tuple(v, v - 1); },
130 track_deletes_on_missing_keys);
133 LOG_FAIL_FMT(
"Failed to commit deserialised Tx at version {}", v);
145 bool has_map_internal(
const std::string& name)
147 return maps.contains(name);
157 if (
version > std::numeric_limits<int64_t>::max())
169 TxID current_txid_unsafe()
176 Store(
bool strict_versions_ =
true,
bool is_historical_ =
false) :
177 strict_versions(strict_versions_),
178 is_historical(is_historical_)
193 std::atomic_store(&
consensus, consensus_);
218 encryptor = encryptor_;
228 snapshotter = snapshotter_;
247 std::lock_guard<ccf::pal::Mutex> mguard(
maps_lock);
260 auto search =
maps.find(map_name);
261 if (search !=
maps.end())
263 const auto& [map_creation_version, map_ptr] = search->second;
264 if (v >= map_creation_version || map_creation_version == NoVersion)
286 auto map = std::dynamic_pointer_cast<ccf::kv::untyped::Map>(map_);
289 throw std::logic_error(fmt::format(
290 "Can't add dynamic map - {} is not of expected type",
294 const auto map_name =
map->get_name();
297 throw std::logic_error(fmt::format(
298 "Can't add dynamic map - already have a map named {}", map_name));
301 LOG_DEBUG_FMT(
"Adding newly created map '{}' at version {}", map_name, v);
302 maps[map_name] = std::make_pair(v,
map);
306 const auto global_it = global_hooks.find(map_name);
307 if (global_it != global_hooks.end())
309 map->set_global_hook(global_it->second);
312 const auto map_it = map_hooks.find(map_name);
313 if (map_it != map_hooks.end())
315 map->set_map_hook(map_it->second);
325 throw std::logic_error(fmt::format(
326 "Cannot snapshot at version {} which is earlier than last "
327 "compacted version {} ",
334 throw std::logic_error(fmt::format(
335 "Cannot snapshot at version {} which is later than current "
341 auto snapshot = std::make_unique<StoreSnapshot>(v);
344 for (
auto& it :
maps)
346 auto& [_,
map] = it.second;
347 snapshot->add_map_snapshot(
map->snapshot(v));
353 snapshot->add_hash_at_snapshot(h->get_raw_leaf(v));
359 snapshot->add_view_history(c->get_view_history(v));
369 for (
auto& it :
maps)
371 auto& [_,
map] = it.second;
378 for (
auto& it :
maps)
380 auto& [_,
map] = it.second;
387 std::unique_ptr<AbstractSnapshot> snapshot)
override
390 return snapshot->serialise(e);
397 std::vector<Version>* view_history =
nullptr,
398 bool public_only =
false)
override
404 std::optional<ccf::kv::SecurityDomain>());
408 auto v_ = d.init(data, size, term, entry_flags, is_historical);
411 LOG_FAIL_FMT(
"Initialisation of deserialise object failed");
415 std::shared_ptr<TxHistory> h =
nullptr;
416 std::vector<uint8_t> hash_at_snapshot;
417 std::vector<Version> view_history_;
419 std::lock_guard<ccf::pal::Mutex> mguard(
maps_lock);
421 for (
auto& it :
maps)
423 auto& [_,
map] = it.second;
430 hash_at_snapshot = d.deserialise_raw();
433 if (view_history !=
nullptr)
435 view_history_ = d.deserialise_view_history();
441 for (
auto r = d.start_map(); r.has_value(); r = d.start_map())
443 const auto map_name = r.value();
445 std::shared_ptr<ccf::kv::untyped::Map>
map =
nullptr;
447 auto search =
maps.find(map_name);
448 if (search ==
maps.end())
450 map = std::make_shared<ccf::kv::untyped::Map>(
451 this, map_name, get_security_domain(map_name));
452 new_maps[map_name] =
map;
454 "Creating map {} while deserialising snapshot at version {}",
460 map = search->second.second;
463 auto changes_search = changes.find(map_name);
464 if (changes_search != changes.end())
466 LOG_FAIL_FMT(
"Failed to deserialise snapshot at version {}", v);
471 auto deserialised_snapshot_changes =
472 map->deserialise_snapshot_changes(d);
476 changes.emplace_hint(
478 std::piecewise_construct,
479 std::forward_as_tuple(map_name),
480 std::forward_as_tuple(
481 map, std::move(deserialised_snapshot_changes)));
484 for (
auto& it :
maps)
486 auto& [_,
map] = it.second;
492 LOG_FAIL_FMT(
"Unexpected content in snapshot at version {}", v);
499 bool track_deletes_on_missing_keys =
false;
500 auto r = apply_changes(
502 [](
bool) {
return std::make_tuple(NoVersion, NoVersion); },
507 track_deletes_on_missing_keys);
511 "Failed to commit deserialised snapshot at version {}", v);
524 if (!h->init_from_snapshot(hash_at_snapshot))
530 if (view_history !=
nullptr)
532 *view_history = std::move(view_history_);
547 bool generate_snapshot = c && c->is_primary();
548 snapshotter->commit(v, generate_snapshot);
553 chunker->compacted_to(v);
556 std::lock_guard<ccf::pal::Mutex> mguard(
maps_lock);
563 for (
auto& it :
maps)
565 auto& [_,
map] = it.second;
569 for (
auto& it :
maps)
571 auto& [_,
map] = it.second;
575 for (
auto& it :
maps)
577 auto& [_,
map] = it.second;
592 for (
auto& it :
maps)
594 auto& [_,
map] = it.second;
607 snapshotter->rollback(tx_id.
seqno);
612 chunker->rolled_back_to(tx_id.
seqno);
615 std::lock_guard<ccf::pal::Mutex> mguard(
maps_lock);
621 throw std::logic_error(fmt::format(
622 "Attempting rollback to {}, earlier than commit version {}",
656 e->rollback(tx_id.
seqno);
660 for (
auto& it :
maps)
662 auto& [_,
map] = it.second;
666 auto it =
maps.begin();
667 while (it !=
maps.end())
669 auto& [map_creation_version,
map] = it->second;
673 if (map_creation_version > tx_id.
seqno)
686 for (
auto& map_it :
maps)
688 auto& [_,
map] = map_it.second;
700 throw std::logic_error(
"term_of_next_version is already initialised");
712 const std::vector<uint8_t>& data,
720 std::optional<ccf::crypto::Sha256Hash>& commit_evidence_digest,
721 bool ignore_strict_versions =
false)
override
733 std::optional<ccf::kv::SecurityDomain>());
736 d.init(data.data(), data.size(), view, entry_flags, is_historical);
739 LOG_FAIL_FMT(
"Initialisation of deserialise object failed");
744 claims_digest = std::move(d.consume_claims_digest());
746 "Deserialised claim digest {} {}",
747 claims_digest.
value(),
748 claims_digest.
empty());
750 commit_evidence_digest = std::move(d.consume_commit_evidence_digest());
751 if (commit_evidence_digest.has_value())
754 "Deserialised commit evidence digest {}",
755 commit_evidence_digest.value());
762 if (strict_versions && !ignore_strict_versions)
769 "Tried to deserialise {} but current_version is {}", v, cv);
778 std::lock_guard<ccf::pal::Mutex> mguard(
maps_lock);
780 for (
auto r = d.start_map(); r.has_value(); r = d.start_map())
782 const auto map_name = r.value();
787 auto new_map = std::make_shared<ccf::kv::untyped::Map>(
788 this, map_name, get_security_domain(map_name));
790 new_maps[map_name] = new_map;
792 "Creating map '{}' while deserialising transaction at version {}",
797 auto change_search = changes.find(map_name);
798 if (change_search != changes.end())
800 LOG_FAIL_FMT(
"Failed to deserialise transaction at version {}", v);
805 auto deserialised_changes =
map->deserialise_changes(d, v);
809 changes.emplace_hint(
811 std::piecewise_construct,
812 std::forward_as_tuple(map_name),
813 std::forward_as_tuple(
map, std::move(deserialised_changes)));
818 LOG_FAIL_FMT(
"Unexpected content in transaction at version {}", v);
826 const std::vector<uint8_t>& data,
827 bool public_only =
false,
828 const std::optional<TxID>& expected_txid = std::nullopt)
override
830 auto exec = std::make_unique<CFTExecutionWrapper>(
843 if (
maps.size() != that.
maps.size())
848 return std::ranges::all_of(
maps, [&that](
const auto& entry) {
849 const auto& [map_name, map_pair] = entry;
850 auto search = that.
maps.find(map_name);
852 if (search == that.
maps.end())
857 const auto& [this_v, this_map] = map_pair;
858 const auto& [that_v, that_map] = search->second;
860 if (this_v != that_v)
865 if (*this_map != *that_map)
882 return current_txid_unsafe();
905 std::unique_ptr<PendingTx> pending_tx,
906 bool globally_committable)
override
914 std::lock_guard<ccf::pal::Mutex> cguard(
commit_lock);
917 "Store::commit {}{}",
919 (globally_committable ?
" globally_committable" :
""));
922 Version previous_last_replicated = 0;
923 Version next_last_replicated = 0;
924 Version previous_rollback_count = 0;
927 std::vector<std::tuple<std::unique_ptr<PendingTx>,
bool>>
928 contiguous_pending_txs;
938 "Want to commit for term {} but term is {}",
952 std::make_tuple(std::move(pending_tx), globally_committable)});
956 for (
Version offset = 1;
true; ++offset)
962 "Couldn't find {} = {} + {}, giving up on batch while committing "
972 contiguous_pending_txs.emplace_back(std::move(search->second));
978 next_last_replicated =
last_replicated + contiguous_pending_txs.size();
984 if (contiguous_pending_txs.empty())
990 for (
auto& [pending_tx_, committable_] : contiguous_pending_txs)
993 [success_, data_, claims_digest_, commit_evidence_digest_, hooks_] =
996 std::make_shared<std::vector<uint8_t>>(std::move(data_));
998 std::make_shared<ccf::kv::ConsensusHookPtrs>(std::move(hooks_));
1005 "Failed Tx commit {}", previous_last_replicated + offset);
1013 "Unexpected failure reason {} during commit of {}.{}",
1014 static_cast<int>(success_),
1023 *data_shared, commit_evidence_digest_, claims_digest_),
1029 chunker->append_entry_size(data_shared->size());
1033 "Batching {} ({}) during commit of {}.{}",
1034 previous_last_replicated + offset,
1035 data_shared->size(),
1040 previous_last_replicated + offset,
1048 if (c->replicate(batch, replication_view))
1089 r |= chunker->is_chunk_end_requested(
version);
1094 r |= snapshotter->record_committable(
version);
1125 Version v = next_version_unsafe();
1133 return std::make_tuple(v, previous_last_new_map);
1139 return next_version_unsafe();
1145 next_version_unsafe();
1173 if (source_version > target_version)
1175 throw std::runtime_error(fmt::format(
1176 "Invalid call to swap_private_maps. Source is at version {} while "
1183 std::scoped_lock<ccf::pal::Mutex, ccf::pal::Mutex> guard_both_store_maps(
1187 using MapEntry = std::tuple<std::string, AbstractMap*, AbstractMap*>;
1188 std::vector<MapEntry> entries;
1191 for (
auto& [name, pair] : store.
maps)
1193 auto& [_,
map] = pair;
1197 entries.emplace_back(name,
nullptr,
map.get());
1203 auto entry = entries.begin();
1204 while (entry != entries.end())
1206 const auto& [name, _, their_map] = *entry;
1207 std::shared_ptr<AbstractMap>
map =
nullptr;
1208 const auto it =
maps.find(name);
1209 if (it ==
maps.end())
1214 auto new_map = std::make_pair(
1216 std::make_shared<ccf::kv::untyped::Map>(
1218 maps[name] = new_map;
1219 map = new_map.second;
1223 map = it->second.second;
1226 throw std::logic_error(fmt::format(
1227 "Swap mismatch - map {} is private in source but not in target",
1232 std::get<1>(*entry) =
map.get();
1237 for (
auto& [name, lhs, rhs] : entries)
1242 for (
auto& [name, lhs, rhs] : entries)
1252 map_hooks[map_name] = hook;
1254 const auto it =
maps.find(map_name);
1255 if (it !=
maps.end())
1257 it->second.second->set_map_hook(hook);
1263 map_hooks.erase(map_name);
1265 const auto it =
maps.find(map_name);
1266 if (it !=
maps.end())
1268 it->second.second->unset_map_hook();
1273 const std::string& map_name,
1276 global_hooks[map_name] = hook;
1278 const auto it =
maps.find(map_name);
1279 if (it !=
maps.end())
1281 it->second.second->set_global_hook(hook);
1287 global_hooks.erase(map_name);
1289 const auto it =
maps.find(map_name);
1290 if (it !=
maps.end())
1292 it->second.second->unset_global_hook();
1303 return std::make_unique<ReadOnlyTx>(
this);
1318 return std::make_unique<CommittableTx>(
this);
1347 this->flags.fetch_or(
static_cast<uint8_t
>(f), std::memory_order_relaxed);
1352 this->flags.fetch_and(
1353 ~
static_cast<uint8_t
>(f), std::memory_order_relaxed);
1358 return (flags.load(std::memory_order_relaxed) &
1359 static_cast<uint8_t
>(f)) != 0;
Definition claims_digest.h:10
const Digest & value() const
Definition claims_digest.h:38
bool empty() const
Definition claims_digest.h:33
Definition kv_types.h:632
StoreFlag
Definition kv_types.h:701
@ SNAPSHOT_AT_NEXT_SIGNATURE
Definition committable_tx.h:19
Definition deserialise.h:19
Definition read_only_store.h:13
Definition committable_tx.h:370
Term term_of_last_version
Definition store.h:51
std::atomic< Version > compacted
Definition store.h:39
std::atomic< Version > version
Definition store.h:37
ccf::pal::Mutex maps_lock
Definition store.h:33
Version last_committable
Definition store.h:56
Version rollback_count
Definition store.h:58
Version last_new_map
Definition store.h:38
Maps maps
Definition store.h:34
std::atomic< Term > term_of_next_version
Definition store.h:45
void clear()
Definition store.h:64
ccf::pal::Mutex version_lock
Definition store.h:36
std::map< std::string, std::pair< ccf::kv::Version, std::shared_ptr< untyped::Map > > > Maps
Definition store.h:32
std::unordered_map< Version, std::tuple< std::unique_ptr< PendingTx >, bool > > pending_txs
Definition store.h:61
Version last_replicated
Definition store.h:53
ccf::pal::Mutex commit_lock
Definition store.h:42
bool should_schedule_snapshot()
Definition store.h:1064
bool should_create_ledger_chunk_unsafe(Version version) override
Definition store.h:1080
ApplyResult deserialise_snapshot(const uint8_t *data, size_t size, ccf::kv::ConsensusHookPtrs &hooks, std::vector< Version > *view_history=nullptr, bool public_only=false) override
Definition store.h:393
ReservedTx create_reserved_tx(const TxID &tx_id)
Definition store.h:1321
ReadOnlyTx create_read_only_tx() override
Definition store.h:1296
void set_flag_unsafe(StoreFlag f) override
Definition store.h:1345
std::shared_ptr< AbstractMap > get_map_unsafe(ccf::kv::Version v, const std::string &map_name) override
Definition store.h:251
size_t committable_gap() override
Definition store.h:1150
TxID next_txid() override
Definition store.h:1142
void unset_global_hook(const std::string &map_name)
Definition store.h:1285
void initialise_term(Term t) override
Definition store.h:693
Store(const Store &that)=delete
void unset_flag_unsafe(StoreFlag f) override
Definition store.h:1350
std::shared_ptr< TxHistory > get_history() override
Definition store.h:196
void set_map_hook(const std::string &map_name, const ccf::kv::untyped::Map::MapHook &hook)
Definition store.h:1249
void unlock_map_set() override
Definition store.h:1111
void set_snapshotter(const SnapshotterPtr &snapshotter_)
Definition store.h:226
void unset_map_hook(const std::string &map_name)
Definition store.h:1261
void add_dynamic_map(ccf::kv::Version v, const std::shared_ptr< AbstractMap > &map_) override
Definition store.h:283
Version current_version() override
Definition store.h:873
void swap_private_maps(Store &store)
Definition store.h:1168
std::shared_ptr< AbstractMap > get_map(ccf::kv::Version v, const std::string &map_name) override
Definition store.h:244
std::tuple< Version, Version > next_version(bool commit_new_map) override
Definition store.h:1122
bool flag_enabled(StoreFlag f) override
Definition store.h:1339
bool operator==(const Store &that) const
Definition store.h:835
std::shared_ptr< ILedgerChunker > get_chunker() override
Definition store.h:206
EncryptorPtr get_encryptor() override
Definition store.h:221
std::vector< uint8_t > serialise_snapshot(std::unique_ptr< AbstractSnapshot > snapshot) override
Definition store.h:386
bool check_rollback_count(Version count) override
Definition store.h:1116
std::unique_ptr< CommittableTx > create_tx_ptr()
Definition store.h:1316
void set_consensus(const std::shared_ptr< Consensus > &consensus_)
Definition store.h:191
void rollback(const TxID &tx_id, Term term_of_next_version_) override
Definition store.h:599
void unset_flag(StoreFlag f) override
Definition store.h:1333
std::unique_ptr< AbstractSnapshot > snapshot_unsafe_maps(Version v) override
Definition store.h:320
void lock_map_set() override
Definition store.h:1106
Term commit_view() override
Definition store.h:897
void unlock_maps() override
Definition store.h:376
std::unique_ptr< ReadOnlyTx > create_read_only_tx_ptr() override
Definition store.h:1301
void lock_maps() override
Definition store.h:366
void set_encryptor(const EncryptorPtr &encryptor_)
Definition store.h:216
CommittableTx create_tx()
Definition store.h:1311
bool should_create_ledger_chunk(Version version) override
Definition store.h:1074
Version next_version() override
Definition store.h:1136
Version compacted_version() override
Definition store.h:892
Store(bool strict_versions_=true, bool is_historical_=false)
Definition store.h:176
void set_global_hook(const std::string &map_name, const ccf::kv::untyped::Map::CommitHook &hook)
Definition store.h:1272
std::pair< TxID, Term > current_txid_and_commit_term() override
Definition store.h:885
std::shared_ptr< Consensus > get_consensus() override
Definition store.h:183
void set_flag(StoreFlag f) override
Definition store.h:1327
std::unique_ptr< ccf::kv::AbstractExecutionWrapper > deserialize(const std::vector< uint8_t > &data, bool public_only=false, const std::optional< TxID > &expected_txid=std::nullopt) override
Definition store.h:825
void set_chunker(const std::shared_ptr< ILedgerChunker > &chunker_)
Definition store.h:211
void set_history(const std::shared_ptr< TxHistory > &history_)
Definition store.h:201
CommitResult commit(const TxID &txid, std::unique_ptr< PendingTx > pending_tx, bool globally_committable) override
Definition store.h:903
bool fill_maps(const std::vector< uint8_t > &data, bool public_only, ccf::kv::Version &v, ccf::kv::Term &view, ccf::kv::EntryFlags &entry_flags, OrderedChanges &changes, MapCollection &new_maps, ccf::ClaimsDigest &claims_digest, std::optional< ccf::crypto::Sha256Hash > &commit_evidence_digest, bool ignore_strict_versions=false) override
Definition store.h:711
ccf::TxID current_txid() override
Definition store.h:878
void compact(Version v) override
Definition store.h:538
std::shared_ptr< ccf::kv::untyped::Map > get_map_internal(ccf::kv::Version v, const std::string &map_name)
Definition store.h:257
TxDiff create_tx_diff() override
Definition store.h:1306
bool flag_enabled_unsafe(StoreFlag f) const override
Definition store.h:1356
MapHook< Write > MapHook
Definition map.h:42
CommitHook< Write > CommitHook
Definition map.h:41
#define LOG_TRACE_FMT
Definition internal_logger.h:13
#define LOG_DEBUG_FMT
Definition internal_logger.h:14
#define LOG_FAIL_FMT
Definition internal_logger.h:16
Definition app_interface.h:18
std::shared_ptr< AbstractTxEncryptor > EncryptorPtr
Definition kv_types.h:527
uint64_t Term
Definition kv_types.h:46
std::vector< std::tuple< Version, std::shared_ptr< std::vector< uint8_t > >, bool, std::shared_ptr< ConsensusHookPtrs > > > BatchVector
Definition kv_types.h:209
CommitResult
Definition kv_types.h:212
@ FAIL_NO_REPLICATE
Definition kv_types.h:215
@ SUCCESS
Definition kv_types.h:213
@ PRIVATE
Definition kv_types.h:221
@ PUBLIC
Definition kv_types.h:220
uint64_t Version
Definition version.h:10
std::shared_ptr< AbstractSnapshotter > SnapshotterPtr
Definition kv_types.h:539
GenericDeserialiseWrapper< RawReader > KvStoreDeserialiser
Definition serialiser_declare.h:21
std::map< std::string, std::shared_ptr< AbstractMap > > MapCollection
Definition apply_changes.h:16
EntryFlags
Definition serialised_entry_format.h:15
std::map< std::string, MapChanges > OrderedChanges
Definition tx.h:41
std::shared_ptr< ccf::kv::Store > StorePtr
Definition store.h:1363
ApplyResult
Definition kv_types.h:305
@ FAIL
Definition kv_types.h:314
@ PASS
Definition kv_types.h:306
std::vector< ConsensusHookPtr > ConsensusHookPtrs
Definition hooks.h:22
std::mutex Mutex
Definition locking.h:12
uint64_t View
Definition tx_id.h:23
Definition consensus_types.h:23
Definition map_serializers.h:11
SeqNo seqno
Definition tx_id.h:46
View view
Definition tx_id.h:45