CCF
Loading...
Searching...
No Matches
store.h
Go to the documentation of this file.
1// Copyright (c) Microsoft Corporation. All rights reserved.
2// Licensed under the Apache 2.0 License.
3#pragma once
4
5#include "apply_changes.h"
8#include "ccf/pal/locking.h"
9#include "deserialise.h"
10#include "kv/committable_tx.h"
11#include "kv/snapshot.h"
12#include "kv/untyped_map.h"
13#include "kv_serialiser.h"
14#include "kv_types.h"
15
16#define FMT_HEADER_ONLY
17#include <atomic>
18#include <fmt/format.h>
19#include <memory>
20
21namespace ccf::kv
22{
24 {
25 protected:
26 // All collections of Map must be ordered so that we lock their contained
27 // maps in a stable order. The order here is by map name. The version
28 // indicates the version at which the Map was created.
29 using Maps = std::map<
30 std::string,
31 std::pair<ccf::kv::Version, std::shared_ptr<untyped::Map>>>;
34
36 std::atomic<Version> version = 0;
37 Version last_new_map = ccf::kv::NoVersion;
38 std::atomic<Version> compacted = 0;
39
40 // Calls to Store::commit are made atomic by taking this lock.
42
43 // Term at which future write transactions should be committed.
44 std::atomic<Term> term_of_next_version = 0;
45
46 // Term at which the last entry was committed. Further transactions
47 // should read in that term. Note that it is assumed that the history of
48 // terms of past transactions is kept track of by and specified by the
49 // caller on rollback
51
53 // Version of the latest committable entry committed in this term and by
54 // _this_ store.
56
58
59 std::unordered_map<Version, std::tuple<std::unique_ptr<PendingTx>, bool>>
61
62 public:
63 void clear()
64 {
65 std::scoped_lock<ccf::pal::Mutex, ccf::pal::Mutex> mguard(
67
68 maps.clear();
69 pending_txs.clear();
70
71 version = 0;
72 last_new_map = ccf::kv::NoVersion;
73 compacted = 0;
76
80 }
81 };
82
83 class Store : public AbstractStore,
84 public StoreState,
86 public ReadOnlyStore
87 {
88 private:
89 using Hooks = std::map<std::string, ccf::kv::untyped::Map::CommitHook>;
90 using MapHooks = std::map<std::string, ccf::kv::untyped::Map::MapHook>;
91 Hooks global_hooks;
92 MapHooks map_hooks;
93
94 std::shared_ptr<Consensus> consensus = nullptr;
95 std::shared_ptr<TxHistory> history = nullptr;
96 EncryptorPtr encryptor = nullptr;
97 SnapshotterPtr snapshotter = nullptr;
98
99 // Generally we will only accept deserialised views if they are contiguous -
100 // at Version N we reject everything but N+1. The exception is when a Store
101 // is used for historical queries, where it may deserialise arbitrary
102 // transactions. In this case the Store is a useful container for a set of
103 // Tables, but its versioning invariants are ignored.
104 const bool strict_versions = true;
105
106 // If true, use historical ledger secrets to deserialise entries
107 const bool is_historical = false;
108
109 // Ledger entry header flags
110 uint8_t flags = 0;
111
112 bool commit_deserialised(
113 OrderedChanges& changes,
114 Version v,
115 Term term,
116 const MapCollection& new_maps,
118 bool track_deletes_on_missing_keys) override
119 {
120 auto c = apply_changes(
121 changes,
122 [v](bool) { return std::make_tuple(v, v - 1); },
123 hooks,
124 new_maps,
125 std::nullopt,
126 false,
127 track_deletes_on_missing_keys);
128 if (!c.has_value())
129 {
130 LOG_FAIL_FMT("Failed to commit deserialised Tx at version {}", v);
131 return false;
132 }
133 {
134 std::lock_guard<ccf::pal::Mutex> vguard(version_lock);
135 version = v;
138 }
139 return true;
140 }
141
142 bool has_map_internal(const std::string& name)
143 {
144 auto search = maps.find(name);
145 if (search != maps.end())
146 return true;
147
148 return false;
149 }
150
151 Version next_version_unsafe()
152 {
153 // Get the next global version
154 ++version;
155
156 // Version was previously signed, with negative values representing
157 // deletions. Maintain this restriction for compatibility with old code.
158 if (version > std::numeric_limits<int64_t>::max())
159 {
160 LOG_FAIL_FMT("KV version too large - wrapping to 0");
161 version = 0;
162 }
163
164 // Further transactions should read in the commit term
166
167 return version;
168 }
169
170 TxID current_txid_unsafe()
171 {
172 // version_lock should be first acquired
174 }
175
176 public:
177 Store(bool strict_versions_ = true, bool is_historical_ = false) :
178 strict_versions(strict_versions_),
179 is_historical(is_historical_)
180 {}
181
182 Store(const Store& that) = delete;
183
184 std::shared_ptr<Consensus> get_consensus() override
185 {
186 // We need to use std::atomic_load<std::shared_ptr<T>>
187 // after clang supports it.
188 // https://en.cppreference.com/w/Template:cpp/compiler_support/20
189 return std::atomic_load(&consensus);
190 }
191
192 void set_consensus(const std::shared_ptr<Consensus>& consensus_)
193 {
194 std::atomic_store(&consensus, consensus_);
195 }
196
197 std::shared_ptr<TxHistory> get_history() override
198 {
199 return history;
200 }
201
202 void set_history(const std::shared_ptr<TxHistory>& history_)
203 {
204 history = history_;
205 }
206
207 void set_encryptor(const EncryptorPtr& encryptor_)
208 {
209 encryptor = encryptor_;
210 }
211
213 {
214 return encryptor;
215 }
216
217 void set_snapshotter(const SnapshotterPtr& snapshotter_)
218 {
219 snapshotter = snapshotter_;
220 }
221
235 std::shared_ptr<AbstractMap> get_map(
236 ccf::kv::Version v, const std::string& map_name) override
237 {
238 std::lock_guard<ccf::pal::Mutex> mguard(maps_lock);
239 return get_map_internal(v, map_name);
240 }
241
242 std::shared_ptr<AbstractMap> get_map_unsafe(
243 ccf::kv::Version v, const std::string& map_name) override
244 {
245 return get_map_internal(v, map_name);
246 }
247
248 std::shared_ptr<ccf::kv::untyped::Map> get_map_internal(
249 ccf::kv::Version v, const std::string& map_name)
250 {
251 auto search = maps.find(map_name);
252 if (search != maps.end())
253 {
254 const auto& [map_creation_version, map_ptr] = search->second;
255 if (v >= map_creation_version || map_creation_version == NoVersion)
256 {
257 return map_ptr;
258 }
259 }
260
261 return nullptr;
262 }
263
275 ccf::kv::Version v, const std::shared_ptr<AbstractMap>& map_) override
276 {
277 auto map = std::dynamic_pointer_cast<ccf::kv::untyped::Map>(map_);
278 if (map == nullptr)
279 {
280 throw std::logic_error(fmt::format(
281 "Can't add dynamic map - {} is not of expected type",
282 map_->get_name()));
283 }
284
285 const auto map_name = map->get_name();
286 if (get_map_unsafe(v, map_name) != nullptr)
287 {
288 throw std::logic_error(fmt::format(
289 "Can't add dynamic map - already have a map named {}", map_name));
290 }
291
292 LOG_DEBUG_FMT("Adding newly created map '{}' at version {}", map_name, v);
293 maps[map_name] = std::make_pair(v, map);
294
295 {
296 // If we have any hooks for the given map name, set them on this new map
297 const auto global_it = global_hooks.find(map_name);
298 if (global_it != global_hooks.end())
299 {
300 map->set_global_hook(global_it->second);
301 }
302
303 const auto map_it = map_hooks.find(map_name);
304 if (map_it != map_hooks.end())
305 {
306 map->set_map_hook(map_it->second);
307 }
308 }
309 }
310
311 std::unique_ptr<AbstractSnapshot> snapshot_unsafe_maps(Version v) override
312 {
313 auto cv = compacted_version();
314 if (v < cv)
315 {
316 throw std::logic_error(fmt::format(
317 "Cannot snapshot at version {} which is earlier than last "
318 "compacted version {} ",
319 v,
320 cv));
321 }
322
323 if (v > current_version())
324 {
325 throw std::logic_error(fmt::format(
326 "Cannot snapshot at version {} which is later than current "
327 "version {} ",
328 v,
329 current_version()));
330 }
331
332 auto snapshot = std::make_unique<StoreSnapshot>(v);
333
334 {
335 for (auto& it : maps)
336 {
337 auto& [_, map] = it.second;
338 snapshot->add_map_snapshot(map->snapshot(v));
339 }
340
341 auto h = get_history();
342 if (h)
343 {
344 snapshot->add_hash_at_snapshot(h->get_raw_leaf(v));
345 }
346
347 auto c = get_consensus();
348 if (c)
349 {
350 snapshot->add_view_history(c->get_view_history(v));
351 }
352 }
353
354 return snapshot;
355 }
356
357 void lock_maps() override
358 {
359 maps_lock.lock();
360 for (auto& it : maps)
361 {
362 auto& [_, map] = it.second;
363 map->lock();
364 }
365 }
366
367 void unlock_maps() override
368 {
369 for (auto& it : maps)
370 {
371 auto& [_, map] = it.second;
372 map->unlock();
373 }
374 maps_lock.unlock();
375 }
376
377 std::vector<uint8_t> serialise_snapshot(
378 std::unique_ptr<AbstractSnapshot> snapshot) override
379 {
380 auto e = get_encryptor();
381 return snapshot->serialise(e);
382 }
383
385 const uint8_t* data,
386 size_t size,
388 std::vector<Version>* view_history = nullptr,
389 bool public_only = false) override
390 {
391 auto e = get_encryptor();
392 auto d = KvStoreDeserialiser(
393 e,
394 public_only ? ccf::kv::SecurityDomain::PUBLIC :
395 std::optional<ccf::kv::SecurityDomain>());
396
397 ccf::kv::Term term;
398 auto v_ = d.init(data, size, term, is_historical);
399 if (!v_.has_value())
400 {
401 LOG_FAIL_FMT("Initialisation of deserialise object failed");
402 return ApplyResult::FAIL;
403 }
404 auto v = v_.value();
405 std::shared_ptr<TxHistory> h = nullptr;
406 std::vector<uint8_t> hash_at_snapshot;
407 std::vector<Version> view_history_;
408 {
409 std::lock_guard<ccf::pal::Mutex> mguard(maps_lock);
410
411 for (auto& it : maps)
412 {
413 auto& [_, map] = it.second;
414 map->lock();
415 }
416
417 h = get_history();
418 if (h)
419 {
420 hash_at_snapshot = d.deserialise_raw();
421 }
422
423 if (view_history)
424 {
425 view_history_ = d.deserialise_view_history();
426 }
427
428 OrderedChanges changes;
429 MapCollection new_maps;
430
431 for (auto r = d.start_map(); r.has_value(); r = d.start_map())
432 {
433 const auto map_name = r.value();
434
435 std::shared_ptr<ccf::kv::untyped::Map> map = nullptr;
436
437 auto search = maps.find(map_name);
438 if (search == maps.end())
439 {
440 map = std::make_shared<ccf::kv::untyped::Map>(
441 this, map_name, get_security_domain(map_name));
442 new_maps[map_name] = map;
444 "Creating map {} while deserialising snapshot at version {}",
445 map_name,
446 v);
447 }
448 else
449 {
450 map = search->second.second;
451 }
452
453 auto changes_search = changes.find(map_name);
454 if (changes_search != changes.end())
455 {
456 LOG_FAIL_FMT("Failed to deserialise snapshot at version {}", v);
457 LOG_DEBUG_FMT("Multiple writes on map {}", map_name);
458 return ApplyResult::FAIL;
459 }
460
461 auto deserialised_snapshot_changes =
462 map->deserialise_snapshot_changes(d);
463
464 // Take ownership of the produced change set, store it to be committed
465 // later
466 changes.emplace_hint(
467 changes_search,
468 std::piecewise_construct,
469 std::forward_as_tuple(map_name),
470 std::forward_as_tuple(
471 map, std::move(deserialised_snapshot_changes)));
472 }
473
474 for (auto& it : maps)
475 {
476 auto& [_, map] = it.second;
477 map->unlock();
478 }
479
480 if (!d.end())
481 {
482 LOG_FAIL_FMT("Unexpected content in snapshot at version {}", v);
483 return ApplyResult::FAIL;
484 }
485
486 // Each map is committed at a different version, independently of the
487 // overall snapshot version. The commit versions for each map are
488 // contained in the snapshot and applied when the snapshot is committed.
489 bool track_deletes_on_missing_keys = false;
490 auto r = apply_changes(
491 changes,
492 [](bool) { return std::make_tuple(NoVersion, NoVersion); },
493 hooks,
494 new_maps,
495 std::nullopt,
496 false,
497 track_deletes_on_missing_keys);
498 if (!r.has_value())
499 {
501 "Failed to commit deserialised snapshot at version {}", v);
502 return ApplyResult::FAIL;
503 }
504
505 {
506 std::lock_guard<ccf::pal::Mutex> vguard(version_lock);
507 version = v;
508 last_replicated = v;
509 }
510 }
511
512 if (h)
513 {
514 if (!h->init_from_snapshot(hash_at_snapshot))
515 {
516 return ApplyResult::FAIL;
517 }
518 }
519
520 if (view_history)
521 {
522 *view_history = std::move(view_history_);
523 }
524
525 return ApplyResult::PASS;
526 }
527
528 void compact(Version v) override
529 {
530 // This is called when the store will never be rolled back to any
531 // state before the specified version.
532 // No transactions can be prepared or committed during compaction.
533
534 if (snapshotter)
535 {
536 auto c = get_consensus();
537 bool generate_snapshot = c && c->is_primary();
538 snapshotter->commit(v, generate_snapshot);
539 }
540
541 std::lock_guard<ccf::pal::Mutex> mguard(maps_lock);
542
543 if (v > current_version())
544 {
545 return;
546 }
547
548 for (auto& it : maps)
549 {
550 auto& [_, map] = it.second;
551 map->lock();
552 }
553
554 for (auto& it : maps)
555 {
556 auto& [_, map] = it.second;
557 map->compact(v);
558 }
559
560 for (auto& it : maps)
561 {
562 auto& [_, map] = it.second;
563 map->unlock();
564 }
565
566 {
567 std::lock_guard<ccf::pal::Mutex> vguard(version_lock);
568 compacted = v;
569
570 auto h = get_history();
571 if (h)
572 {
573 h->compact(v);
574 }
575 }
576
577 for (auto& it : maps)
578 {
579 auto& [_, map] = it.second;
580 map->post_compact();
581 }
582 }
583
584 void rollback(const TxID& tx_id, Term term_of_next_version_) override
585 {
586 // This is called to roll the store back to the state it was in
587 // at the specified version.
588 // No transactions can be prepared or committed during rollback.
589
590 if (snapshotter)
591 {
592 snapshotter->rollback(tx_id.version);
593 }
594
595 std::lock_guard<ccf::pal::Mutex> mguard(maps_lock);
596
597 {
598 std::lock_guard<ccf::pal::Mutex> vguard(version_lock);
599 if (tx_id.version < compacted)
600 {
601 throw std::logic_error(fmt::format(
602 "Attempting rollback to {}, earlier than commit version {}",
603 tx_id.version,
604 compacted));
605 }
606
607 // The term should always be updated on rollback() when passed
608 // regardless of whether version needs to be updated or not
609 term_of_next_version = term_of_next_version_;
611
612 // History must be informed of the term_of_last_version change, even if
613 // no actual rollback is required
614 auto h = get_history();
615 if (h)
616 {
617 h->rollback(tx_id, term_of_next_version);
618 }
619
620 if (tx_id.version >= version)
621 {
622 return;
623 }
624
625 version = tx_id.version;
626 last_replicated = tx_id.version;
630 pending_txs.clear();
631 auto e = get_encryptor();
632 if (e)
633 {
634 e->rollback(tx_id.version);
635 }
636 }
637
638 for (auto& it : maps)
639 {
640 auto& [_, map] = it.second;
641 map->lock();
642 }
643
644 auto it = maps.begin();
645 while (it != maps.end())
646 {
647 auto& [map_creation_version, map] = it->second;
648 // Rollback this map whether we're forgetting about it or not. Anyone
649 // else still holding it should see it has rolled back
650 map->rollback(tx_id.version);
651 if (map_creation_version > tx_id.version)
652 {
653 // Map was created more recently; its creation is being forgotten.
654 // Erase our knowledge of it
655 map->unlock();
656 it = maps.erase(it);
657 }
658 else
659 {
660 ++it;
661 }
662 }
663
664 for (auto& map_it : maps)
665 {
666 auto& [_, map] = map_it.second;
667 map->unlock();
668 }
669 }
670
671 void initialise_term(Term t) override
672 {
673 // Note: This should only be called once, when the store is first
674 // initialised. term_of_next_version is later updated via rollback.
675 std::lock_guard<ccf::pal::Mutex> vguard(version_lock);
676 if (term_of_next_version != 0)
677 {
678 throw std::logic_error("term_of_next_version is already initialised");
679 }
680
682 auto h = get_history();
683 if (h)
684 {
685 h->set_term(term_of_next_version);
686 }
687 }
688
690 const std::vector<uint8_t>& data,
691 bool public_only,
694 OrderedChanges& changes,
695 MapCollection& new_maps,
696 ccf::ClaimsDigest& claims_digest,
697 std::optional<ccf::crypto::Sha256Hash>& commit_evidence_digest,
698 bool ignore_strict_versions = false) override
699 {
700 // This will return FAILED if the serialised transaction is being
701 // applied out of order.
702 // Processing transactions locally and also deserialising to the
703 // same store will result in a store version mismatch and
704 // deserialisation will then fail.
705 auto e = get_encryptor();
706
707 auto d = KvStoreDeserialiser(
708 e,
709 public_only ? ccf::kv::SecurityDomain::PUBLIC :
710 std::optional<ccf::kv::SecurityDomain>());
711
712 auto v_ = d.init(data.data(), data.size(), view, is_historical);
713 if (!v_.has_value())
714 {
715 LOG_FAIL_FMT("Initialisation of deserialise object failed");
716 return false;
717 }
718 v = v_.value();
719
720 claims_digest = std::move(d.consume_claims_digest());
722 "Deserialised claim digest {} {}",
723 claims_digest.value(),
724 claims_digest.empty());
725
726 commit_evidence_digest = std::move(d.consume_commit_evidence_digest());
727 if (commit_evidence_digest.has_value())
729 "Deserialised commit evidence digest {}",
730 commit_evidence_digest.value());
731
732 // Throw away any local commits that have not propagated via the
733 // consensus.
735
736 if (strict_versions && !ignore_strict_versions)
737 {
738 // Make sure this is the next transaction.
739 auto cv = current_version();
740 if (cv != (v - 1))
741 {
743 "Tried to deserialise {} but current_version is {}", v, cv);
744 return false;
745 }
746 }
747
748 // Deserialised transactions express read dependencies as versions,
749 // rather than with the actual value read. As a result, they don't
750 // need snapshot isolation on the map state, and so do not need to
751 // lock each of the maps before creating the transaction.
752 std::lock_guard<ccf::pal::Mutex> mguard(maps_lock);
753
754 for (auto r = d.start_map(); r.has_value(); r = d.start_map())
755 {
756 const auto map_name = r.value();
757
758 auto map = get_map_internal(v, map_name);
759 if (map == nullptr)
760 {
761 auto new_map = std::make_shared<ccf::kv::untyped::Map>(
762 this, map_name, get_security_domain(map_name));
763 map = new_map;
764 new_maps[map_name] = new_map;
766 "Creating map '{}' while deserialising transaction at version {}",
767 map_name,
768 v);
769 }
770
771 auto change_search = changes.find(map_name);
772 if (change_search != changes.end())
773 {
774 LOG_FAIL_FMT("Failed to deserialise transaction at version {}", v);
775 LOG_DEBUG_FMT("Multiple writes on map {}", map_name);
776 return false;
777 }
778
779 auto deserialised_changes = map->deserialise_changes(d, v);
780
781 // Take ownership of the produced change set, store it to be applied
782 // later
783 changes.emplace_hint(
784 change_search,
785 std::piecewise_construct,
786 std::forward_as_tuple(map_name),
787 std::forward_as_tuple(map, std::move(deserialised_changes)));
788 }
789
790 if (!d.end())
791 {
792 LOG_FAIL_FMT("Unexpected content in transaction at version {}", v);
793 return false;
794 }
795
796 return true;
797 }
798
799 std::unique_ptr<ccf::kv::AbstractExecutionWrapper> deserialize(
800 const std::vector<uint8_t>& data,
801 bool public_only = false,
802 const std::optional<TxID>& expected_txid = std::nullopt) override
803 {
804 auto exec = std::make_unique<CFTExecutionWrapper>(
805 this, get_history(), std::move(data), public_only, expected_txid);
806 return exec;
807 }
808
809 bool operator==(const Store& that) const
810 {
811 // Only used for debugging, not thread safe.
812 if (version != that.version)
813 return false;
814
815 if (maps.size() != that.maps.size())
816 return false;
817
818 for (auto it = maps.begin(); it != maps.end(); ++it)
819 {
820 auto search = that.maps.find(it->first);
821
822 if (search == that.maps.end())
823 return false;
824
825 auto& [this_v, this_map] = it->second;
826 auto& [that_v, that_map] = search->second;
827
828 if (this_v != that_v)
829 return false;
830
831 if (*this_map != *that_map)
832 return false;
833 }
834
835 return true;
836 }
837
839 {
840 return version;
841 }
842
844 {
845 // Must lock in case the version or read term is being incremented.
846 std::lock_guard<ccf::pal::Mutex> vguard(version_lock);
847 return current_txid_unsafe();
848 }
849
851 {
852 const auto kv_id = current_txid();
853 return {kv_id.term, kv_id.version};
854 }
855
856 std::pair<TxID, Term> current_txid_and_commit_term() override
857 {
858 // Must lock in case the version or commit term is being incremented.
859 std::lock_guard<ccf::pal::Mutex> vguard(version_lock);
860 return {current_txid_unsafe(), term_of_next_version};
861 }
862
864 {
865 return compacted;
866 }
867
868 Term commit_view() override
869 {
870 // Must lock in case the commit_view is being incremented.
872 }
873
875 const TxID& txid,
876 std::unique_ptr<PendingTx> pending_tx,
877 bool globally_committable) override
878 {
879 auto c = get_consensus();
880 if (!c)
881 {
883 }
884
885 std::lock_guard<ccf::pal::Mutex> cguard(commit_lock);
886
888 "Store::commit {}{}",
889 txid.version,
890 (globally_committable ? " globally_committable" : ""));
891
892 BatchVector batch;
893 Version previous_last_replicated = 0;
894 Version next_last_replicated = 0;
895 Version previous_rollback_count = 0;
896 ccf::View replication_view = 0;
897
898 std::vector<std::tuple<std::unique_ptr<PendingTx>, bool>>
899 contiguous_pending_txs;
900 auto h = get_history();
901
902 {
903 std::lock_guard<ccf::pal::Mutex> vguard(version_lock);
904 if (txid.term != term_of_next_version && get_consensus()->is_primary())
905 {
906 // This can happen when a transaction started before a view change,
907 // but tries to commit after the view change is complete.
909 "Want to commit for term {} but term is {}",
910 txid.term,
912
914 }
915
916 if (globally_committable && txid.version > last_committable)
917 {
919 }
920
921 pending_txs.insert(
922 {txid.version,
923 std::make_tuple(std::move(pending_tx), globally_committable)});
924
925 LOG_TRACE_FMT("Inserting pending tx at {}", txid.version);
926
927 for (Version offset = 1; true; ++offset)
928 {
929 auto search = pending_txs.find(last_replicated + offset);
930 if (search == pending_txs.end())
931 {
933 "Couldn't find {} = {} + {}, giving up on batch while committing "
934 "{}.{}",
935 last_replicated + offset,
937 offset,
938 txid.term,
939 txid.version);
940 break;
941 }
942
943 contiguous_pending_txs.emplace_back(std::move(search->second));
944 pending_txs.erase(search);
945 }
946
947 previous_rollback_count = rollback_count;
948 previous_last_replicated = last_replicated;
949 next_last_replicated = last_replicated + contiguous_pending_txs.size();
950
951 replication_view = term_of_next_version;
952 }
953 // Release version lock
954
955 if (contiguous_pending_txs.size() == 0)
956 {
958 }
959
960 size_t offset = 1;
961 for (auto& [pending_tx_, committable_] : contiguous_pending_txs)
962 {
963 auto
964 [success_, data_, claims_digest_, commit_evidence_digest_, hooks_] =
965 pending_tx_->call();
966 auto data_shared =
967 std::make_shared<std::vector<uint8_t>>(std::move(data_));
968 auto hooks_shared =
969 std::make_shared<ccf::kv::ConsensusHookPtrs>(std::move(hooks_));
970
971 // NB: this cannot happen currently. Regular Tx only make it here if
972 // they did succeed, and signatures cannot conflict because they
973 // execute in order with a read_version that's version - 1, so even
974 // two contiguous signatures are fine
975 if (success_ != CommitResult::SUCCESS)
976 {
977 LOG_DEBUG_FMT("Failed Tx commit {}", last_replicated + offset);
978 }
979
980 if (h)
981 {
982 h->append_entry(
983 ccf::entry_leaf(
984 *data_shared, commit_evidence_digest_, claims_digest_),
985 replication_view);
986 }
987
989 "Batching {} ({}) during commit of {}.{}",
990 last_replicated + offset,
991 data_shared->size(),
992 txid.term,
993 txid.version);
994
995 batch.emplace_back(
996 last_replicated + offset, data_shared, committable_, hooks_shared);
997 offset++;
998 }
999
1000 if (c->replicate(batch, replication_view))
1001 {
1002 std::lock_guard<ccf::pal::Mutex> vguard(version_lock);
1003 if (
1004 last_replicated == previous_last_replicated &&
1005 previous_rollback_count == rollback_count)
1006 {
1007 last_replicated = next_last_replicated;
1008 }
1009 return CommitResult::SUCCESS;
1010 }
1011 else
1012 {
1013 LOG_DEBUG_FMT("Failed to replicate");
1015 }
1016 }
1017
1019 {
1020 std::lock_guard<ccf::pal::Mutex> vguard(version_lock);
1022 }
1023
1025 {
1026 // Note that snapshotter->record_committable, and therefore this function,
1027 // assumes that `version` is a committable entry/signature.
1028
1029 bool r = flag_enabled_unsafe(
1032
1033 if (snapshotter)
1034 {
1035 r |= snapshotter->record_committable(version);
1036 }
1037
1038 return r;
1039 }
1040
1041 void lock_map_set() override
1042 {
1043 maps_lock.lock();
1044 }
1045
1046 void unlock_map_set() override
1047 {
1048 maps_lock.unlock();
1049 }
1050
1051 bool check_rollback_count(Version count) override
1052 {
1053 std::lock_guard<ccf::pal::Mutex> vguard(version_lock);
1054 return rollback_count == count;
1055 }
1056
1057 std::tuple<Version, Version> next_version(bool commit_new_map) override
1058 {
1059 std::lock_guard<ccf::pal::Mutex> vguard(version_lock);
1060 Version v = next_version_unsafe();
1061
1062 auto previous_last_new_map = last_new_map;
1063 if (commit_new_map)
1064 {
1065 last_new_map = v;
1066 }
1067
1068 return std::make_tuple(v, previous_last_new_map);
1069 }
1070
1072 {
1073 std::lock_guard<ccf::pal::Mutex> vguard(version_lock);
1074 return next_version_unsafe();
1075 }
1076
1077 TxID next_txid() override
1078 {
1079 std::lock_guard<ccf::pal::Mutex> vguard(version_lock);
1080 next_version_unsafe();
1081
1082 return {term_of_next_version, version};
1083 }
1084
1085 size_t committable_gap() override
1086 {
1087 std::lock_guard<ccf::pal::Mutex> vguard(version_lock);
1088 return version - last_committable;
1089 }
1090
1104 {
1105 {
1106 const auto source_version = store.current_version();
1107 const auto target_version = current_version();
1108 if (source_version > target_version)
1109 {
1110 throw std::runtime_error(fmt::format(
1111 "Invalid call to swap_private_maps. Source is at version {} while "
1112 "target is at {}",
1113 source_version,
1114 target_version));
1115 }
1116 }
1117
1118 std::scoped_lock<ccf::pal::Mutex, ccf::pal::Mutex> guard_both_store_maps(
1119 maps_lock, store.maps_lock);
1120
1121 // Each entry is (Name, MyMap, TheirMap)
1122 using MapEntry = std::tuple<std::string, AbstractMap*, AbstractMap*>;
1123 std::vector<MapEntry> entries;
1124
1125 // Get the list of private maps from the source store
1126 for (auto& [name, pair] : store.maps)
1127 {
1128 auto& [_, map] = pair;
1129 if (map->get_security_domain() == SecurityDomain::PRIVATE)
1130 {
1131 map->lock();
1132 entries.emplace_back(name, nullptr, map.get());
1133 }
1134 }
1135
1136 // For each source map, either create it or, where it already exists,
1137 // confirm it is PRIVATE. Lock it and store it in entries
1138 auto entry = entries.begin();
1139 while (entry != entries.end())
1140 {
1141 const auto& [name, _, their_map] = *entry;
1142 std::shared_ptr<AbstractMap> map = nullptr;
1143 const auto it = maps.find(name);
1144 if (it == maps.end())
1145 {
1146 // NB: We lose the creation version from the original map, but assume
1147 // it is irrelevant - its creation should no longer be at risk of
1148 // rollback
1149 auto new_map = std::make_pair(
1150 NoVersion,
1151 std::make_shared<ccf::kv::untyped::Map>(
1152 this, name, SecurityDomain::PRIVATE));
1153 maps[name] = new_map;
1154 map = new_map.second;
1155 }
1156 else
1157 {
1158 map = it->second.second;
1159 if (map->get_security_domain() != SecurityDomain::PRIVATE)
1160 {
1161 throw std::logic_error(fmt::format(
1162 "Swap mismatch - map {} is private in source but not in target",
1163 name));
1164 }
1165 }
1166
1167 std::get<1>(*entry) = map.get();
1168 map->lock();
1169 ++entry;
1170 }
1171
1172 for (auto& [name, lhs, rhs] : entries)
1173 {
1174 lhs->swap(rhs);
1175 }
1176
1177 for (auto& [name, lhs, rhs] : entries)
1178 {
1179 lhs->unlock();
1180 rhs->unlock();
1181 }
1182 }
1183
1185 const std::string& map_name, const ccf::kv::untyped::Map::MapHook& hook)
1186 {
1187 map_hooks[map_name] = hook;
1188
1189 const auto it = maps.find(map_name);
1190 if (it != maps.end())
1191 {
1192 it->second.second->set_map_hook(hook);
1193 }
1194 }
1195
1196 void unset_map_hook(const std::string& map_name)
1197 {
1198 map_hooks.erase(map_name);
1199
1200 const auto it = maps.find(map_name);
1201 if (it != maps.end())
1202 {
1203 it->second.second->unset_map_hook();
1204 }
1205 }
1206
1208 const std::string& map_name,
1210 {
1211 global_hooks[map_name] = hook;
1212
1213 const auto it = maps.find(map_name);
1214 if (it != maps.end())
1215 {
1216 it->second.second->set_global_hook(hook);
1217 }
1218 }
1219
1220 void unset_global_hook(const std::string& map_name)
1221 {
1222 global_hooks.erase(map_name);
1223
1224 const auto it = maps.find(map_name);
1225 if (it != maps.end())
1226 {
1227 it->second.second->unset_global_hook();
1228 }
1229 }
1230
1232 {
1233 return ReadOnlyTx(this);
1234 }
1235
1236 std::unique_ptr<ReadOnlyTx> create_read_only_tx_ptr() override
1237 {
1238 return std::make_unique<ReadOnlyTx>(this);
1239 }
1240
1242 {
1243 return TxDiff(this);
1244 }
1245
1247 {
1248 return CommittableTx(this);
1249 }
1250
1251 std::unique_ptr<CommittableTx> create_tx_ptr()
1252 {
1253 return std::make_unique<CommittableTx>(this);
1254 }
1255
1257 {
1258 std::lock_guard<ccf::pal::Mutex> vguard(version_lock);
1259 return ReservedTx(this, term_of_last_version, tx_id, rollback_count);
1260 }
1261
1262 virtual void set_flag(Flag f) override
1263 {
1264 std::lock_guard<ccf::pal::Mutex> vguard(version_lock);
1265 set_flag_unsafe(f);
1266 }
1267
1268 virtual void unset_flag(Flag f) override
1269 {
1270 std::lock_guard<ccf::pal::Mutex> vguard(version_lock);
1272 }
1273
1274 virtual bool flag_enabled(Flag f) override
1275 {
1276 std::lock_guard<ccf::pal::Mutex> vguard(version_lock);
1277 return flag_enabled_unsafe(f);
1278 }
1279
1280 virtual void set_flag_unsafe(Flag f) override
1281 {
1282 this->flags |= static_cast<uint8_t>(f);
1283 }
1284
1285 virtual void unset_flag_unsafe(Flag f) override
1286 {
1287 this->flags &= ~static_cast<uint8_t>(f);
1288 }
1289
1290 virtual bool flag_enabled_unsafe(Flag f) const override
1291 {
1292 return (flags & static_cast<uint8_t>(f)) != 0;
1293 }
1294 };
1295
1296 using StorePtr = std::shared_ptr<ccf::kv::Store>;
1297}
Definition claims_digest.h:10
const Digest & value() const
Definition claims_digest.h:38
bool empty() const
Definition claims_digest.h:33
Definition kv_types.h:677
Flag
Definition kv_types.h:745
Definition committable_tx.h:18
Definition deserialise.h:17
Definition read_only_store.h:13
Definition tx.h:161
Definition committable_tx.h:397
Definition store.h:24
Term term_of_last_version
Definition store.h:50
std::atomic< Version > compacted
Definition store.h:38
std::atomic< Version > version
Definition store.h:36
ccf::pal::Mutex maps_lock
Definition store.h:32
Version last_committable
Definition store.h:55
Version rollback_count
Definition store.h:57
Version last_new_map
Definition store.h:37
Maps maps
Definition store.h:33
std::atomic< Term > term_of_next_version
Definition store.h:44
void clear()
Definition store.h:63
ccf::pal::Mutex version_lock
Definition store.h:35
std::map< std::string, std::pair< ccf::kv::Version, std::shared_ptr< untyped::Map > > > Maps
Definition store.h:31
std::unordered_map< Version, std::tuple< std::unique_ptr< PendingTx >, bool > > pending_txs
Definition store.h:60
Version last_replicated
Definition store.h:52
ccf::pal::Mutex commit_lock
Definition store.h:41
Definition store.h:87
virtual bool flag_enabled(Flag f) override
Definition store.h:1274
ApplyResult deserialise_snapshot(const uint8_t *data, size_t size, ccf::kv::ConsensusHookPtrs &hooks, std::vector< Version > *view_history=nullptr, bool public_only=false) override
Definition store.h:384
ReservedTx create_reserved_tx(const TxID &tx_id)
Definition store.h:1256
ReadOnlyTx create_read_only_tx() override
Definition store.h:1231
std::shared_ptr< AbstractMap > get_map_unsafe(ccf::kv::Version v, const std::string &map_name) override
Definition store.h:242
size_t committable_gap() override
Definition store.h:1085
TxID next_txid() override
Definition store.h:1077
void unset_global_hook(const std::string &map_name)
Definition store.h:1220
bool must_force_ledger_chunk_unsafe(Version version) override
Definition store.h:1024
void initialise_term(Term t) override
Definition store.h:671
Store(const Store &that)=delete
std::shared_ptr< TxHistory > get_history() override
Definition store.h:197
void set_map_hook(const std::string &map_name, const ccf::kv::untyped::Map::MapHook &hook)
Definition store.h:1184
void unlock_map_set() override
Definition store.h:1046
void set_snapshotter(const SnapshotterPtr &snapshotter_)
Definition store.h:217
void unset_map_hook(const std::string &map_name)
Definition store.h:1196
void add_dynamic_map(ccf::kv::Version v, const std::shared_ptr< AbstractMap > &map_) override
Definition store.h:274
Version current_version() override
Definition store.h:838
void swap_private_maps(Store &store)
Definition store.h:1103
std::shared_ptr< AbstractMap > get_map(ccf::kv::Version v, const std::string &map_name) override
Definition store.h:235
std::tuple< Version, Version > next_version(bool commit_new_map) override
Definition store.h:1057
bool operator==(const Store &that) const
Definition store.h:809
EncryptorPtr get_encryptor() override
Definition store.h:212
std::vector< uint8_t > serialise_snapshot(std::unique_ptr< AbstractSnapshot > snapshot) override
Definition store.h:377
bool check_rollback_count(Version count) override
Definition store.h:1051
std::unique_ptr< CommittableTx > create_tx_ptr()
Definition store.h:1251
void set_consensus(const std::shared_ptr< Consensus > &consensus_)
Definition store.h:192
ccf::kv::TxID current_txid() override
Definition store.h:843
void rollback(const TxID &tx_id, Term term_of_next_version_) override
Definition store.h:584
std::unique_ptr< AbstractSnapshot > snapshot_unsafe_maps(Version v) override
Definition store.h:311
virtual bool flag_enabled_unsafe(Flag f) const override
Definition store.h:1290
void lock_map_set() override
Definition store.h:1041
Term commit_view() override
Definition store.h:868
void unlock_maps() override
Definition store.h:367
std::unique_ptr< ReadOnlyTx > create_read_only_tx_ptr() override
Definition store.h:1236
void lock_maps() override
Definition store.h:357
virtual void set_flag(Flag f) override
Definition store.h:1262
void set_encryptor(const EncryptorPtr &encryptor_)
Definition store.h:207
bool must_force_ledger_chunk(Version version) override
Definition store.h:1018
CommittableTx create_tx()
Definition store.h:1246
bool fill_maps(const std::vector< uint8_t > &data, bool public_only, ccf::kv::Version &v, ccf::kv::Term &view, OrderedChanges &changes, MapCollection &new_maps, ccf::ClaimsDigest &claims_digest, std::optional< ccf::crypto::Sha256Hash > &commit_evidence_digest, bool ignore_strict_versions=false) override
Definition store.h:689
Version next_version() override
Definition store.h:1071
Version compacted_version() override
Definition store.h:863
Store(bool strict_versions_=true, bool is_historical_=false)
Definition store.h:177
virtual void unset_flag(Flag f) override
Definition store.h:1268
void set_global_hook(const std::string &map_name, const ccf::kv::untyped::Map::CommitHook &hook)
Definition store.h:1207
std::pair< TxID, Term > current_txid_and_commit_term() override
Definition store.h:856
std::shared_ptr< Consensus > get_consensus() override
Definition store.h:184
std::unique_ptr< ccf::kv::AbstractExecutionWrapper > deserialize(const std::vector< uint8_t > &data, bool public_only=false, const std::optional< TxID > &expected_txid=std::nullopt) override
Definition store.h:799
void set_history(const std::shared_ptr< TxHistory > &history_)
Definition store.h:202
CommitResult commit(const TxID &txid, std::unique_ptr< PendingTx > pending_tx, bool globally_committable) override
Definition store.h:874
virtual void unset_flag_unsafe(Flag f) override
Definition store.h:1285
virtual void set_flag_unsafe(Flag f) override
Definition store.h:1280
ccf::TxID get_txid() override
Definition store.h:850
void compact(Version v) override
Definition store.h:528
std::shared_ptr< ccf::kv::untyped::Map > get_map_internal(ccf::kv::Version v, const std::string &map_name)
Definition store.h:248
TxDiff create_tx_diff() override
Definition store.h:1241
Definition tx.h:132
MapHook< Write > MapHook
Definition map.h:42
CommitHook< Write > CommitHook
Definition map.h:41
#define LOG_TRACE_FMT
Definition logger.h:378
#define LOG_DEBUG_FMT
Definition logger.h:380
#define LOG_FAIL_FMT
Definition logger.h:396
Definition app_interface.h:20
std::shared_ptr< AbstractTxEncryptor > EncryptorPtr
Definition kv_types.h:570
uint64_t Term
Definition kv_types.h:46
@ PRIVATE
Definition kv_types.h:255
@ PUBLIC
Definition kv_types.h:254
std::vector< std::tuple< Version, std::shared_ptr< std::vector< uint8_t > >, bool, std::shared_ptr< ConsensusHookPtrs > > > BatchVector
Definition kv_types.h:243
uint64_t Version
Definition version.h:8
CommitResult
Definition kv_types.h:246
@ FAIL_NO_REPLICATE
Definition kv_types.h:249
@ SUCCESS
Definition kv_types.h:247
std::shared_ptr< AbstractSnapshotter > SnapshotterPtr
Definition kv_types.h:581
GenericDeserialiseWrapper< RawReader > KvStoreDeserialiser
Definition serialiser_declare.h:21
std::map< std::string, std::shared_ptr< AbstractMap > > MapCollection
Definition apply_changes.h:16
ApplyResult
Definition kv_types.h:339
@ FAIL
Definition kv_types.h:348
@ PASS
Definition kv_types.h:340
std::map< std::string, MapChanges > OrderedChanges
Definition tx.h:42
std::shared_ptr< ccf::kv::Store > StorePtr
Definition store.h:1296
std::vector< ConsensusHookPtr > ConsensusHookPtrs
Definition hooks.h:22
std::mutex Mutex
Definition locking.h:17
view
Definition signatures.h:54
uint64_t View
Definition tx_id.h:23
Definition consensus_types.h:23
Definition map_serializers.h:11
Definition tx_id.h:44
Definition kv_types.h:50
Version version
Definition kv_types.h:52
Term term
Definition kv_types.h:51