CCF
Loading...
Searching...
No Matches
store.h
Go to the documentation of this file.
1// Copyright (c) Microsoft Corporation. All rights reserved.
2// Licensed under the Apache 2.0 License.
3#pragma once
4
5#include "apply_changes.h"
7#include "ccf/pal/locking.h"
8#include "deserialise.h"
10#include "kv/committable_tx.h"
12#include "kv/snapshot.h"
13#include "kv/untyped_map.h"
14#include "kv_serialiser.h"
15#include "kv_types.h"
16
17#define FMT_HEADER_ONLY
18#include <atomic>
19#include <fmt/format.h>
20#include <memory>
21
22namespace ccf::kv
23{
25 {
26 protected:
27 // All collections of Map must be ordered so that we lock their contained
28 // maps in a stable order. The order here is by map name. The version
29 // indicates the version at which the Map was created.
30 using Maps = std::map<
31 std::string,
32 std::pair<ccf::kv::Version, std::shared_ptr<untyped::Map>>>;
35
37 std::atomic<Version> version = 0;
38 Version last_new_map = ccf::kv::NoVersion;
39 std::atomic<Version> compacted = 0;
40
41 // Calls to Store::commit are made atomic by taking this lock.
43
44 // Term at which write future transactions should be committed.
45 std::atomic<Term> term_of_next_version = 0;
46
47 // Term at which the last entry was committed. Further transactions
48 // should read in that term. Note that it is assumed that the history of
49 // terms of past transactions is kept track of by and specified by the
50 // caller on rollback
52
54 // Version of the latest committable entry committed in this term and by
55 // _this_ store.
57
59
60 std::unordered_map<Version, std::tuple<std::unique_ptr<PendingTx>, bool>>
62
63 public:
64 void clear()
65 {
66 std::scoped_lock<ccf::pal::Mutex, ccf::pal::Mutex> mguard(
68
69 maps.clear();
70 pending_txs.clear();
71
72 version = 0;
73 last_new_map = ccf::kv::NoVersion;
74 compacted = 0;
77
81 }
82 };
83
84 class Store : public AbstractStore,
85 public StoreState,
87 public ReadOnlyStore
88 {
89 private:
90 using Hooks = std::map<std::string, ccf::kv::untyped::Map::CommitHook>;
91 using MapHooks = std::map<std::string, ccf::kv::untyped::Map::MapHook>;
92 Hooks global_hooks;
93 MapHooks map_hooks;
94
95 std::shared_ptr<Consensus> consensus = nullptr;
96 std::shared_ptr<TxHistory> history = nullptr;
97 std::shared_ptr<ILedgerChunker> chunker = nullptr;
98 EncryptorPtr encryptor = nullptr;
99 SnapshotterPtr snapshotter = nullptr;
100
101 // Generally we will only accept deserialised views if they are contiguous -
102 // at Version N we reject everything but N+1. The exception is when a Store
103 // is used for historical queries, where it may deserialise arbitrary
104 // transactions. In this case the Store is a useful container for a set of
105 // Tables, but its versioning invariants are ignored.
106 const bool strict_versions = true;
107
108 // If true, use historical ledger secrets to deserialise entries
109 const bool is_historical = false;
110
111 // Store-level flags (AbstractStore::StoreFlag) influencing behaviour such
112 // as snapshot and ledger chunk decisions. Atomic because _unsafe accessors
113 // may be called from different threads without a common lock.
114 std::atomic<uint8_t> flags = 0;
115
116 bool commit_deserialised(
117 OrderedChanges& changes,
118 Version v,
119 Term term,
120 const MapCollection& new_maps,
122 bool track_deletes_on_missing_keys) override
123 {
124 auto c = apply_changes(
125 changes,
126 [v](bool) { return std::make_tuple(v, v - 1); },
127 hooks,
128 new_maps,
129 std::nullopt,
130 track_deletes_on_missing_keys);
131 if (!c.has_value())
132 {
133 LOG_FAIL_FMT("Failed to commit deserialised Tx at version {}", v);
134 return false;
135 }
136 {
137 std::lock_guard<ccf::pal::Mutex> vguard(version_lock);
138 version = v;
141 }
142 return true;
143 }
144
145 bool has_map_internal(const std::string& name)
146 {
147 return maps.contains(name);
148 }
149
150 Version next_version_unsafe()
151 {
152 // Get the next global version
153 ++version;
154
155 // Version was previously signed, with negative values representing
156 // deletions. Maintain this restriction for compatibility with old code.
157 if (version > std::numeric_limits<int64_t>::max())
158 {
159 LOG_FAIL_FMT("KV version too large - wrapping to 0");
160 version = 0;
161 }
162
163 // Further transactions should read in the commit term
165
166 return version;
167 }
168
169 TxID current_txid_unsafe()
170 {
171 // version_lock should be first acquired
173 }
174
175 public:
176 Store(bool strict_versions_ = true, bool is_historical_ = false) :
177 strict_versions(strict_versions_),
178 is_historical(is_historical_)
179 {}
180
181 Store(const Store& that) = delete;
182
183 std::shared_ptr<Consensus> get_consensus() override
184 {
185 // We need to use std::atomic_load<std::shared_ptr<T>>
186 // after clang supports it.
187 // https://en.cppreference.com/w/Template:cpp/compiler_support/20
188 return std::atomic_load(&consensus);
189 }
190
191 void set_consensus(const std::shared_ptr<Consensus>& consensus_)
192 {
193 std::atomic_store(&consensus, consensus_);
194 }
195
196 std::shared_ptr<TxHistory> get_history() override
197 {
198 return history;
199 }
200
201 void set_history(const std::shared_ptr<TxHistory>& history_)
202 {
203 history = history_;
204 }
205
206 std::shared_ptr<ILedgerChunker> get_chunker() override
207 {
208 return chunker;
209 }
210
211 void set_chunker(const std::shared_ptr<ILedgerChunker>& chunker_)
212 {
213 chunker = chunker_;
214 }
215
216 void set_encryptor(const EncryptorPtr& encryptor_)
217 {
218 encryptor = encryptor_;
219 }
220
222 {
223 return encryptor;
224 }
225
226 void set_snapshotter(const SnapshotterPtr& snapshotter_)
227 {
228 snapshotter = snapshotter_;
229 }
230
244 std::shared_ptr<AbstractMap> get_map(
245 ccf::kv::Version v, const std::string& map_name) override
246 {
247 std::lock_guard<ccf::pal::Mutex> mguard(maps_lock);
248 return get_map_internal(v, map_name);
249 }
250
251 std::shared_ptr<AbstractMap> get_map_unsafe(
252 ccf::kv::Version v, const std::string& map_name) override
253 {
254 return get_map_internal(v, map_name);
255 }
256
257 std::shared_ptr<ccf::kv::untyped::Map> get_map_internal(
258 ccf::kv::Version v, const std::string& map_name)
259 {
260 auto search = maps.find(map_name);
261 if (search != maps.end())
262 {
263 const auto& [map_creation_version, map_ptr] = search->second;
264 if (v >= map_creation_version || map_creation_version == NoVersion)
265 {
266 return map_ptr;
267 }
268 }
269
270 return nullptr;
271 }
272
284 ccf::kv::Version v, const std::shared_ptr<AbstractMap>& map_) override
285 {
286 auto map = std::dynamic_pointer_cast<ccf::kv::untyped::Map>(map_);
287 if (map == nullptr)
288 {
289 throw std::logic_error(fmt::format(
290 "Can't add dynamic map - {} is not of expected type",
291 map_->get_name()));
292 }
293
294 const auto map_name = map->get_name();
295 if (get_map_unsafe(v, map_name) != nullptr)
296 {
297 throw std::logic_error(fmt::format(
298 "Can't add dynamic map - already have a map named {}", map_name));
299 }
300
301 LOG_DEBUG_FMT("Adding newly created map '{}' at version {}", map_name, v);
302 maps[map_name] = std::make_pair(v, map);
303
304 {
305 // If we have any hooks for the given map name, set them on this new map
306 const auto global_it = global_hooks.find(map_name);
307 if (global_it != global_hooks.end())
308 {
309 map->set_global_hook(global_it->second);
310 }
311
312 const auto map_it = map_hooks.find(map_name);
313 if (map_it != map_hooks.end())
314 {
315 map->set_map_hook(map_it->second);
316 }
317 }
318 }
319
320 std::unique_ptr<AbstractSnapshot> snapshot_unsafe_maps(Version v) override
321 {
322 auto cv = compacted_version();
323 if (v < cv)
324 {
325 throw std::logic_error(fmt::format(
326 "Cannot snapshot at version {} which is earlier than last "
327 "compacted version {} ",
328 v,
329 cv));
330 }
331
332 if (v > current_version())
333 {
334 throw std::logic_error(fmt::format(
335 "Cannot snapshot at version {} which is later than current "
336 "version {} ",
337 v,
338 current_version()));
339 }
340
341 auto snapshot = std::make_unique<StoreSnapshot>(v);
342
343 {
344 for (auto& it : maps)
345 {
346 auto& [_, map] = it.second;
347 snapshot->add_map_snapshot(map->snapshot(v));
348 }
349
350 auto h = get_history();
351 if (h)
352 {
353 snapshot->add_hash_at_snapshot(h->get_raw_leaf(v));
354 }
355
356 auto c = get_consensus();
357 if (c)
358 {
359 snapshot->add_view_history(c->get_view_history(v));
360 }
361 }
362
363 return snapshot;
364 }
365
366 void lock_maps() override
367 {
368 maps_lock.lock();
369 for (auto& it : maps)
370 {
371 auto& [_, map] = it.second;
372 map->lock();
373 }
374 }
375
376 void unlock_maps() override
377 {
378 for (auto& it : maps)
379 {
380 auto& [_, map] = it.second;
381 map->unlock();
382 }
383 maps_lock.unlock();
384 }
385
386 std::vector<uint8_t> serialise_snapshot(
387 std::unique_ptr<AbstractSnapshot> snapshot) override
388 {
389 auto e = get_encryptor();
390 return snapshot->serialise(e);
391 }
392
394 const uint8_t* data,
395 size_t size,
397 std::vector<Version>* view_history = nullptr,
398 bool public_only = false) override
399 {
400 auto e = get_encryptor();
401 auto d = KvStoreDeserialiser(
402 e,
403 public_only ? ccf::kv::SecurityDomain::PUBLIC :
404 std::optional<ccf::kv::SecurityDomain>());
405
406 ccf::kv::Term term = 0;
407 ccf::kv::EntryFlags entry_flags = {};
408 auto v_ = d.init(data, size, term, entry_flags, is_historical);
409 if (!v_.has_value())
410 {
411 LOG_FAIL_FMT("Initialisation of deserialise object failed");
412 return ApplyResult::FAIL;
413 }
414 auto v = v_.value();
415 std::shared_ptr<TxHistory> h = nullptr;
416 std::vector<uint8_t> hash_at_snapshot;
417 std::vector<Version> view_history_;
418 {
419 std::lock_guard<ccf::pal::Mutex> mguard(maps_lock);
420
421 for (auto& it : maps)
422 {
423 auto& [_, map] = it.second;
424 map->lock();
425 }
426
427 h = get_history();
428 if (h)
429 {
430 hash_at_snapshot = d.deserialise_raw();
431 }
432
433 if (view_history != nullptr)
434 {
435 view_history_ = d.deserialise_view_history();
436 }
437
438 OrderedChanges changes;
439 MapCollection new_maps;
440
441 for (auto r = d.start_map(); r.has_value(); r = d.start_map())
442 {
443 const auto map_name = r.value();
444
445 std::shared_ptr<ccf::kv::untyped::Map> map = nullptr;
446
447 auto search = maps.find(map_name);
448 if (search == maps.end())
449 {
450 map = std::make_shared<ccf::kv::untyped::Map>(
451 this, map_name, get_security_domain(map_name));
452 new_maps[map_name] = map;
454 "Creating map {} while deserialising snapshot at version {}",
455 map_name,
456 v);
457 }
458 else
459 {
460 map = search->second.second;
461 }
462
463 auto changes_search = changes.find(map_name);
464 if (changes_search != changes.end())
465 {
466 LOG_FAIL_FMT("Failed to deserialise snapshot at version {}", v);
467 LOG_DEBUG_FMT("Multiple writes on map {}", map_name);
468 return ApplyResult::FAIL;
469 }
470
471 auto deserialised_snapshot_changes =
472 map->deserialise_snapshot_changes(d);
473
474 // Take ownership of the produced change set, store it to be committed
475 // later
476 changes.emplace_hint(
477 changes_search,
478 std::piecewise_construct,
479 std::forward_as_tuple(map_name),
480 std::forward_as_tuple(
481 map, std::move(deserialised_snapshot_changes)));
482 }
483
484 for (auto& it : maps)
485 {
486 auto& [_, map] = it.second;
487 map->unlock();
488 }
489
490 if (!d.end())
491 {
492 LOG_FAIL_FMT("Unexpected content in snapshot at version {}", v);
493 return ApplyResult::FAIL;
494 }
495
496 // Each map is committed at a different version, independently of the
497 // overall snapshot version. The commit versions for each map are
498 // contained in the snapshot and applied when the snapshot is committed.
499 bool track_deletes_on_missing_keys = false;
500 auto r = apply_changes(
501 changes,
502 [](bool) { return std::make_tuple(NoVersion, NoVersion); },
503 hooks,
504 new_maps,
505 std::nullopt,
506 false,
507 track_deletes_on_missing_keys);
508 if (!r.has_value())
509 {
511 "Failed to commit deserialised snapshot at version {}", v);
512 return ApplyResult::FAIL;
513 }
514
515 {
516 std::lock_guard<ccf::pal::Mutex> vguard(version_lock);
517 version = v;
518 last_replicated = v;
519 }
520 }
521
522 if (h)
523 {
524 if (!h->init_from_snapshot(hash_at_snapshot))
525 {
526 return ApplyResult::FAIL;
527 }
528 }
529
530 if (view_history != nullptr)
531 {
532 *view_history = std::move(view_history_);
533 }
534
535 return ApplyResult::PASS;
536 }
537
538 void compact(Version v) override
539 {
540 // This is called when the store will never be rolled back to any
541 // state before the specified version.
542 // No transactions can be prepared or committed during compaction.
543
544 if (snapshotter)
545 {
546 auto c = get_consensus();
547 bool generate_snapshot = c && c->is_primary();
548 snapshotter->commit(v, generate_snapshot);
549 }
550
551 if (chunker)
552 {
553 chunker->compacted_to(v);
554 }
555
556 std::lock_guard<ccf::pal::Mutex> mguard(maps_lock);
557
558 if (v > current_version())
559 {
560 return;
561 }
562
563 for (auto& it : maps)
564 {
565 auto& [_, map] = it.second;
566 map->lock();
567 }
568
569 for (auto& it : maps)
570 {
571 auto& [_, map] = it.second;
572 map->compact(v);
573 }
574
575 for (auto& it : maps)
576 {
577 auto& [_, map] = it.second;
578 map->unlock();
579 }
580
581 {
582 std::lock_guard<ccf::pal::Mutex> vguard(version_lock);
583 compacted = v;
584
585 auto h = get_history();
586 if (h)
587 {
588 h->compact(v);
589 }
590 }
591
592 for (auto& it : maps)
593 {
594 auto& [_, map] = it.second;
595 map->post_compact();
596 }
597 }
598
599 void rollback(const TxID& tx_id, Term term_of_next_version_) override
600 {
601 // This is called to roll the store back to the state it was in
602 // at the specified version.
603 // No transactions can be prepared or committed during rollback.
604
605 if (snapshotter)
606 {
607 snapshotter->rollback(tx_id.seqno);
608 }
609
610 if (chunker)
611 {
612 chunker->rolled_back_to(tx_id.seqno);
613 }
614
615 std::lock_guard<ccf::pal::Mutex> mguard(maps_lock);
616
617 {
618 std::lock_guard<ccf::pal::Mutex> vguard(version_lock);
619 if (tx_id.seqno < compacted)
620 {
621 throw std::logic_error(fmt::format(
622 "Attempting rollback to {}, earlier than commit version {}",
623 tx_id.seqno,
624 compacted));
625 }
626
627 // The term should always be updated on rollback() when passed
628 // regardless of whether version needs to be updated or not
629 term_of_next_version = term_of_next_version_;
631
632 // History must be informed of the term_of_last_version change, even if
633 // no actual rollback is required
634 auto h = get_history();
635 if (h)
636 {
637 h->rollback(tx_id, term_of_next_version);
638 }
639
640 if (tx_id.seqno >= version)
641 {
642 return;
643 }
644
645 version = tx_id.seqno;
646 last_replicated = tx_id.seqno;
647 // In practice rollback is only called at signature seqnos, so
648 // clamping here restores the latest committable entry
649 last_committable = std::min(last_committable, tx_id.seqno);
652 pending_txs.clear();
653 auto e = get_encryptor();
654 if (e)
655 {
656 e->rollback(tx_id.seqno);
657 }
658 }
659
660 for (auto& it : maps)
661 {
662 auto& [_, map] = it.second;
663 map->lock();
664 }
665
666 auto it = maps.begin();
667 while (it != maps.end())
668 {
669 auto& [map_creation_version, map] = it->second;
670 // Rollback this map whether we're forgetting about it or not. Anyone
671 // else still holding it should see it has rolled back
672 map->rollback(tx_id.seqno);
673 if (map_creation_version > tx_id.seqno)
674 {
675 // Map was created more recently; its creation is being forgotten.
676 // Erase our knowledge of it
677 map->unlock();
678 it = maps.erase(it);
679 }
680 else
681 {
682 ++it;
683 }
684 }
685
686 for (auto& map_it : maps)
687 {
688 auto& [_, map] = map_it.second;
689 map->unlock();
690 }
691 }
692
693 void initialise_term(Term t) override
694 {
695 // Note: This should only be called once, when the store is first
696 // initialised. term_of_next_version is later updated via rollback.
697 std::lock_guard<ccf::pal::Mutex> vguard(version_lock);
698 if (term_of_next_version != 0)
699 {
700 throw std::logic_error("term_of_next_version is already initialised");
701 }
702
704 auto h = get_history();
705 if (h)
706 {
707 h->set_term(term_of_next_version);
708 }
709 }
710
712 const std::vector<uint8_t>& data,
713 bool public_only,
715 ccf::kv::Term& view,
716 ccf::kv::EntryFlags& entry_flags,
717 OrderedChanges& changes,
718 MapCollection& new_maps,
719 ccf::ClaimsDigest& claims_digest,
720 std::optional<ccf::crypto::Sha256Hash>& commit_evidence_digest,
721 bool ignore_strict_versions = false) override
722 {
723 // This will return FAILED if the serialised transaction is being
724 // applied out of order.
725 // Processing transactions locally and also deserialising to the
726 // same store will result in a store version mismatch and
727 // deserialisation will then fail.
728 auto e = get_encryptor();
729
730 auto d = KvStoreDeserialiser(
731 e,
732 public_only ? ccf::kv::SecurityDomain::PUBLIC :
733 std::optional<ccf::kv::SecurityDomain>());
734
735 auto v_ =
736 d.init(data.data(), data.size(), view, entry_flags, is_historical);
737 if (!v_.has_value())
738 {
739 LOG_FAIL_FMT("Initialisation of deserialise object failed");
740 return false;
741 }
742 v = v_.value();
743
744 claims_digest = std::move(d.consume_claims_digest());
746 "Deserialised claim digest {} {}",
747 claims_digest.value(),
748 claims_digest.empty());
749
750 commit_evidence_digest = std::move(d.consume_commit_evidence_digest());
751 if (commit_evidence_digest.has_value())
752 {
754 "Deserialised commit evidence digest {}",
755 commit_evidence_digest.value());
756 }
757
758 // Throw away any local commits that have not propagated via the
759 // consensus.
761
762 if (strict_versions && !ignore_strict_versions)
763 {
764 // Make sure this is the next transaction.
765 auto cv = current_version();
766 if (cv != (v - 1))
767 {
769 "Tried to deserialise {} but current_version is {}", v, cv);
770 return false;
771 }
772 }
773
774 // Deserialised transactions express read dependencies as versions,
775 // rather than with the actual value read. As a result, they don't
776 // need snapshot isolation on the map state, and so do not need to
777 // lock each of the maps before creating the transaction.
778 std::lock_guard<ccf::pal::Mutex> mguard(maps_lock);
779
780 for (auto r = d.start_map(); r.has_value(); r = d.start_map())
781 {
782 const auto map_name = r.value();
783
784 auto map = get_map_internal(v, map_name);
785 if (map == nullptr)
786 {
787 auto new_map = std::make_shared<ccf::kv::untyped::Map>(
788 this, map_name, get_security_domain(map_name));
789 map = new_map;
790 new_maps[map_name] = new_map;
792 "Creating map '{}' while deserialising transaction at version {}",
793 map_name,
794 v);
795 }
796
797 auto change_search = changes.find(map_name);
798 if (change_search != changes.end())
799 {
800 LOG_FAIL_FMT("Failed to deserialise transaction at version {}", v);
801 LOG_DEBUG_FMT("Multiple writes on map {}", map_name);
802 return false;
803 }
804
805 auto deserialised_changes = map->deserialise_changes(d, v);
806
807 // Take ownership of the produced change set, store it to be applied
808 // later
809 changes.emplace_hint(
810 change_search,
811 std::piecewise_construct,
812 std::forward_as_tuple(map_name),
813 std::forward_as_tuple(map, std::move(deserialised_changes)));
814 }
815
816 if (!d.end())
817 {
818 LOG_FAIL_FMT("Unexpected content in transaction at version {}", v);
819 return false;
820 }
821
822 return true;
823 }
824
825 std::unique_ptr<ccf::kv::AbstractExecutionWrapper> deserialize(
826 const std::vector<uint8_t>& data,
827 bool public_only = false,
828 const std::optional<TxID>& expected_txid = std::nullopt) override
829 {
830 auto exec = std::make_unique<CFTExecutionWrapper>(
831 this, get_history(), get_chunker(), data, public_only, expected_txid);
832 return exec;
833 }
834
835 bool operator==(const Store& that) const
836 {
837 // Only used for debugging, not thread safe.
838 if (version != that.version)
839 {
840 return false;
841 }
842
843 if (maps.size() != that.maps.size())
844 {
845 return false;
846 }
847
848 return std::ranges::all_of(maps, [&that](const auto& entry) {
849 const auto& [map_name, map_pair] = entry;
850 auto search = that.maps.find(map_name);
851
852 if (search == that.maps.end())
853 {
854 return false;
855 }
856
857 const auto& [this_v, this_map] = map_pair;
858 const auto& [that_v, that_map] = search->second;
859
860 if (this_v != that_v)
861 {
862 return false;
863 }
864
865 if (*this_map != *that_map)
866 {
867 return false;
868 }
869 return true;
870 });
871 }
872
874 {
875 return version;
876 }
877
879 {
880 // Must lock in case the version or read term is being incremented.
881 std::lock_guard<ccf::pal::Mutex> vguard(version_lock);
882 return current_txid_unsafe();
883 }
884
885 std::pair<TxID, Term> current_txid_and_commit_term() override
886 {
887 // Must lock in case the version or commit term is being incremented.
888 std::lock_guard<ccf::pal::Mutex> vguard(version_lock);
889 return {current_txid_unsafe(), term_of_next_version};
890 }
891
893 {
894 return compacted;
895 }
896
897 Term commit_view() override
898 {
899 // Must lock in case the commit_view is being incremented.
901 }
902
904 const TxID& txid,
905 std::unique_ptr<PendingTx> pending_tx,
906 bool globally_committable) override
907 {
908 auto c = get_consensus();
909 if (!c)
910 {
912 }
913
914 std::lock_guard<ccf::pal::Mutex> cguard(commit_lock);
915
917 "Store::commit {}{}",
918 txid.seqno,
919 (globally_committable ? " globally_committable" : ""));
920
921 BatchVector batch;
922 Version previous_last_replicated = 0;
923 Version next_last_replicated = 0;
924 Version previous_rollback_count = 0;
925 ccf::View replication_view = 0;
926
927 std::vector<std::tuple<std::unique_ptr<PendingTx>, bool>>
928 contiguous_pending_txs;
929 auto h = get_history();
930
931 {
932 std::lock_guard<ccf::pal::Mutex> vguard(version_lock);
933 if (txid.view != term_of_next_version && get_consensus()->is_primary())
934 {
935 // This can happen when a transaction started before a view change,
936 // but tries to commit after the view change is complete.
938 "Want to commit for term {} but term is {}",
939 txid.view,
941
943 }
944
945 if (globally_committable && txid.seqno > last_committable)
946 {
947 last_committable = txid.seqno;
948 }
949
950 pending_txs.insert(
951 {txid.seqno,
952 std::make_tuple(std::move(pending_tx), globally_committable)});
953
954 LOG_TRACE_FMT("Inserting pending tx at {}", txid.seqno);
955
956 for (Version offset = 1; true; ++offset)
957 {
958 auto search = pending_txs.find(last_replicated + offset);
959 if (search == pending_txs.end())
960 {
962 "Couldn't find {} = {} + {}, giving up on batch while committing "
963 "{}.{}",
964 last_replicated + offset,
966 offset,
967 txid.view,
968 txid.seqno);
969 break;
970 }
971
972 contiguous_pending_txs.emplace_back(std::move(search->second));
973 pending_txs.erase(search);
974 }
975
976 previous_rollback_count = rollback_count;
977 previous_last_replicated = last_replicated;
978 next_last_replicated = last_replicated + contiguous_pending_txs.size();
979
980 replication_view = term_of_next_version;
981 }
982 // Release version lock
983
984 if (contiguous_pending_txs.empty())
985 {
987 }
988
989 size_t offset = 1;
990 for (auto& [pending_tx_, committable_] : contiguous_pending_txs)
991 {
992 auto
993 [success_, data_, claims_digest_, commit_evidence_digest_, hooks_] =
994 pending_tx_->call();
995 auto data_shared =
996 std::make_shared<std::vector<uint8_t>>(std::move(data_));
997 auto hooks_shared =
998 std::make_shared<ccf::kv::ConsensusHookPtrs>(std::move(hooks_));
999
1000 // A pending tx may fail here if rollback invalidated a reserved
1001 // signature tx after it was dequeued from pending_txs.
1002 if (success_ == CommitResult::FAIL_NO_REPLICATE)
1003 {
1005 "Failed Tx commit {}", previous_last_replicated + offset);
1006 return success_;
1007 }
1008 // We should never fail from here, as normal txs have already succeeded
1009 // and reserved txs only fail with FAIL_NO_REPLICATE
1010 if (success_ != CommitResult::SUCCESS)
1011 {
1013 "Unexpected failure reason {} during commit of {}.{}",
1014 static_cast<int>(success_),
1015 txid.view,
1016 txid.seqno);
1017 }
1018
1019 if (h)
1020 {
1021 h->append_entry(
1022 ccf::entry_leaf(
1023 *data_shared, commit_evidence_digest_, claims_digest_),
1024 replication_view);
1025 }
1026
1027 if (chunker)
1028 {
1029 chunker->append_entry_size(data_shared->size());
1030 }
1031
1033 "Batching {} ({}) during commit of {}.{}",
1034 previous_last_replicated + offset,
1035 data_shared->size(),
1036 txid.view,
1037 txid.seqno);
1038
1039 batch.emplace_back(
1040 previous_last_replicated + offset,
1041 data_shared,
1042 committable_,
1043 hooks_shared);
1044
1045 offset++;
1046 }
1047
1048 if (c->replicate(batch, replication_view))
1049 {
1050 std::lock_guard<ccf::pal::Mutex> vguard(version_lock);
1051 if (
1052 last_replicated == previous_last_replicated &&
1053 previous_rollback_count == rollback_count)
1054 {
1055 last_replicated = next_last_replicated;
1056 }
1057 return CommitResult::SUCCESS;
1058 }
1059
1060 LOG_DEBUG_FMT("Failed to replicate");
1062 }
1063
1065 {
1066 std::lock_guard<ccf::pal::Mutex> vguard(version_lock);
1067 if (snapshotter)
1068 {
1069 return snapshotter->should_schedule_snapshot(last_committable);
1070 }
1071 return false;
1072 }
1073
1075 {
1076 std::lock_guard<ccf::pal::Mutex> vguard(version_lock);
1078 }
1079
1081 {
1082 // Note that snapshotter->record_committable, and therefore this function,
1083 // assumes that `version` is a committable entry/signature.
1084
1085 bool r = false;
1086
1087 if (chunker)
1088 {
1089 r |= chunker->is_chunk_end_requested(version);
1090 }
1091
1092 if (snapshotter)
1093 {
1094 r |= snapshotter->record_committable(version);
1095 }
1096 else
1097 {
1098 // This branch is required to ensure that when there is no snapshotter,
1099 // that this still triggers chunk ends if the flag is set.
1101 }
1102
1103 return r;
1104 }
1105
1106 void lock_map_set() override
1107 {
1108 maps_lock.lock();
1109 }
1110
1111 void unlock_map_set() override
1112 {
1113 maps_lock.unlock();
1114 }
1115
1116 bool check_rollback_count(Version count) override
1117 {
1118 std::lock_guard<ccf::pal::Mutex> vguard(version_lock);
1119 return rollback_count == count;
1120 }
1121
1122 std::tuple<Version, Version> next_version(bool commit_new_map) override
1123 {
1124 std::lock_guard<ccf::pal::Mutex> vguard(version_lock);
1125 Version v = next_version_unsafe();
1126
1127 auto previous_last_new_map = last_new_map;
1128 if (commit_new_map)
1129 {
1130 last_new_map = v;
1131 }
1132
1133 return std::make_tuple(v, previous_last_new_map);
1134 }
1135
1137 {
1138 std::lock_guard<ccf::pal::Mutex> vguard(version_lock);
1139 return next_version_unsafe();
1140 }
1141
1142 TxID next_txid() override
1143 {
1144 std::lock_guard<ccf::pal::Mutex> vguard(version_lock);
1145 next_version_unsafe();
1146
1147 return {term_of_next_version, version};
1148 }
1149
1150 size_t committable_gap() override
1151 {
1152 std::lock_guard<ccf::pal::Mutex> vguard(version_lock);
1153 return version - last_committable;
1154 }
1155
1169 {
1170 {
1171 const auto source_version = store.current_version();
1172 const auto target_version = current_version();
1173 if (source_version > target_version)
1174 {
1175 throw std::runtime_error(fmt::format(
1176 "Invalid call to swap_private_maps. Source is at version {} while "
1177 "target is at {}",
1178 source_version,
1179 target_version));
1180 }
1181 }
1182
1183 std::scoped_lock<ccf::pal::Mutex, ccf::pal::Mutex> guard_both_store_maps(
1184 maps_lock, store.maps_lock);
1185
1186 // Each entry is (Name, MyMap, TheirMap)
1187 using MapEntry = std::tuple<std::string, AbstractMap*, AbstractMap*>;
1188 std::vector<MapEntry> entries;
1189
1190 // Get the list of private maps from the source store
1191 for (auto& [name, pair] : store.maps)
1192 {
1193 auto& [_, map] = pair;
1194 if (map->get_security_domain() == SecurityDomain::PRIVATE)
1195 {
1196 map->lock();
1197 entries.emplace_back(name, nullptr, map.get());
1198 }
1199 }
1200
1201 // For each source map, either create it or, where it already exists,
1202 // confirm it is PRIVATE. Lock it and store it in entries
1203 auto entry = entries.begin();
1204 while (entry != entries.end())
1205 {
1206 const auto& [name, _, their_map] = *entry;
1207 std::shared_ptr<AbstractMap> map = nullptr;
1208 const auto it = maps.find(name);
1209 if (it == maps.end())
1210 {
1211 // NB: We lose the creation version from the original map, but assume
1212 // it is irrelevant - its creation should no longer be at risk of
1213 // rollback
1214 auto new_map = std::make_pair(
1215 NoVersion,
1216 std::make_shared<ccf::kv::untyped::Map>(
1217 this, name, SecurityDomain::PRIVATE));
1218 maps[name] = new_map;
1219 map = new_map.second;
1220 }
1221 else
1222 {
1223 map = it->second.second;
1224 if (map->get_security_domain() != SecurityDomain::PRIVATE)
1225 {
1226 throw std::logic_error(fmt::format(
1227 "Swap mismatch - map {} is private in source but not in target",
1228 name));
1229 }
1230 }
1231
1232 std::get<1>(*entry) = map.get();
1233 map->lock();
1234 ++entry;
1235 }
1236
1237 for (auto& [name, lhs, rhs] : entries)
1238 {
1239 lhs->swap(rhs);
1240 }
1241
1242 for (auto& [name, lhs, rhs] : entries)
1243 {
1244 lhs->unlock();
1245 rhs->unlock();
1246 }
1247 }
1248
1250 const std::string& map_name, const ccf::kv::untyped::Map::MapHook& hook)
1251 {
1252 map_hooks[map_name] = hook;
1253
1254 const auto it = maps.find(map_name);
1255 if (it != maps.end())
1256 {
1257 it->second.second->set_map_hook(hook);
1258 }
1259 }
1260
1261 void unset_map_hook(const std::string& map_name)
1262 {
1263 map_hooks.erase(map_name);
1264
1265 const auto it = maps.find(map_name);
1266 if (it != maps.end())
1267 {
1268 it->second.second->unset_map_hook();
1269 }
1270 }
1271
1273 const std::string& map_name,
1275 {
1276 global_hooks[map_name] = hook;
1277
1278 const auto it = maps.find(map_name);
1279 if (it != maps.end())
1280 {
1281 it->second.second->set_global_hook(hook);
1282 }
1283 }
1284
1285 void unset_global_hook(const std::string& map_name)
1286 {
1287 global_hooks.erase(map_name);
1288
1289 const auto it = maps.find(map_name);
1290 if (it != maps.end())
1291 {
1292 it->second.second->unset_global_hook();
1293 }
1294 }
1295
1297 {
1298 return {this};
1299 }
1300
1301 std::unique_ptr<ReadOnlyTx> create_read_only_tx_ptr() override
1302 {
1303 return std::make_unique<ReadOnlyTx>(this);
1304 }
1305
1307 {
1308 return {this};
1309 }
1310
1312 {
1313 return {this};
1314 }
1315
1316 std::unique_ptr<CommittableTx> create_tx_ptr()
1317 {
1318 return std::make_unique<CommittableTx>(this);
1319 }
1320
1322 {
1323 std::lock_guard<ccf::pal::Mutex> vguard(version_lock);
1324 return {this, term_of_last_version, tx_id, rollback_count};
1325 }
1326
1327 void set_flag(StoreFlag f) override
1328 {
1329 std::lock_guard<ccf::pal::Mutex> vguard(version_lock);
1330 set_flag_unsafe(f);
1331 }
1332
1333 void unset_flag(StoreFlag f) override
1334 {
1335 std::lock_guard<ccf::pal::Mutex> vguard(version_lock);
1337 }
1338
1339 bool flag_enabled(StoreFlag f) override
1340 {
1341 std::lock_guard<ccf::pal::Mutex> vguard(version_lock);
1342 return flag_enabled_unsafe(f);
1343 }
1344
1346 {
1347 this->flags.fetch_or(static_cast<uint8_t>(f), std::memory_order_relaxed);
1348 }
1349
1351 {
1352 this->flags.fetch_and(
1353 ~static_cast<uint8_t>(f), std::memory_order_relaxed);
1354 }
1355
1356 [[nodiscard]] bool flag_enabled_unsafe(StoreFlag f) const override
1357 {
1358 return (flags.load(std::memory_order_relaxed) &
1359 static_cast<uint8_t>(f)) != 0;
1360 }
1361 };
1362
1363 using StorePtr = std::shared_ptr<ccf::kv::Store>;
1364}
Definition claims_digest.h:10
const Digest & value() const
Definition claims_digest.h:38
bool empty() const
Definition claims_digest.h:33
Definition kv_types.h:632
StoreFlag
Definition kv_types.h:701
Definition committable_tx.h:19
Definition deserialise.h:19
Definition read_only_store.h:13
Definition tx.h:159
Definition committable_tx.h:370
Definition store.h:25
Term term_of_last_version
Definition store.h:51
std::atomic< Version > compacted
Definition store.h:39
std::atomic< Version > version
Definition store.h:37
ccf::pal::Mutex maps_lock
Definition store.h:33
Version last_committable
Definition store.h:56
Version rollback_count
Definition store.h:58
Version last_new_map
Definition store.h:38
Maps maps
Definition store.h:34
std::atomic< Term > term_of_next_version
Definition store.h:45
void clear()
Definition store.h:64
ccf::pal::Mutex version_lock
Definition store.h:36
std::map< std::string, std::pair< ccf::kv::Version, std::shared_ptr< untyped::Map > > > Maps
Definition store.h:32
std::unordered_map< Version, std::tuple< std::unique_ptr< PendingTx >, bool > > pending_txs
Definition store.h:61
Version last_replicated
Definition store.h:53
ccf::pal::Mutex commit_lock
Definition store.h:42
Definition store.h:88
bool should_schedule_snapshot()
Definition store.h:1064
bool should_create_ledger_chunk_unsafe(Version version) override
Definition store.h:1080
ApplyResult deserialise_snapshot(const uint8_t *data, size_t size, ccf::kv::ConsensusHookPtrs &hooks, std::vector< Version > *view_history=nullptr, bool public_only=false) override
Definition store.h:393
ReservedTx create_reserved_tx(const TxID &tx_id)
Definition store.h:1321
ReadOnlyTx create_read_only_tx() override
Definition store.h:1296
void set_flag_unsafe(StoreFlag f) override
Definition store.h:1345
std::shared_ptr< AbstractMap > get_map_unsafe(ccf::kv::Version v, const std::string &map_name) override
Definition store.h:251
size_t committable_gap() override
Definition store.h:1150
TxID next_txid() override
Definition store.h:1142
void unset_global_hook(const std::string &map_name)
Definition store.h:1285
void initialise_term(Term t) override
Definition store.h:693
Store(const Store &that)=delete
void unset_flag_unsafe(StoreFlag f) override
Definition store.h:1350
std::shared_ptr< TxHistory > get_history() override
Definition store.h:196
void set_map_hook(const std::string &map_name, const ccf::kv::untyped::Map::MapHook &hook)
Definition store.h:1249
void unlock_map_set() override
Definition store.h:1111
void set_snapshotter(const SnapshotterPtr &snapshotter_)
Definition store.h:226
void unset_map_hook(const std::string &map_name)
Definition store.h:1261
void add_dynamic_map(ccf::kv::Version v, const std::shared_ptr< AbstractMap > &map_) override
Definition store.h:283
Version current_version() override
Definition store.h:873
void swap_private_maps(Store &store)
Definition store.h:1168
std::shared_ptr< AbstractMap > get_map(ccf::kv::Version v, const std::string &map_name) override
Definition store.h:244
std::tuple< Version, Version > next_version(bool commit_new_map) override
Definition store.h:1122
bool flag_enabled(StoreFlag f) override
Definition store.h:1339
bool operator==(const Store &that) const
Definition store.h:835
std::shared_ptr< ILedgerChunker > get_chunker() override
Definition store.h:206
EncryptorPtr get_encryptor() override
Definition store.h:221
std::vector< uint8_t > serialise_snapshot(std::unique_ptr< AbstractSnapshot > snapshot) override
Definition store.h:386
bool check_rollback_count(Version count) override
Definition store.h:1116
std::unique_ptr< CommittableTx > create_tx_ptr()
Definition store.h:1316
void set_consensus(const std::shared_ptr< Consensus > &consensus_)
Definition store.h:191
void rollback(const TxID &tx_id, Term term_of_next_version_) override
Definition store.h:599
void unset_flag(StoreFlag f) override
Definition store.h:1333
std::unique_ptr< AbstractSnapshot > snapshot_unsafe_maps(Version v) override
Definition store.h:320
void lock_map_set() override
Definition store.h:1106
Term commit_view() override
Definition store.h:897
void unlock_maps() override
Definition store.h:376
std::unique_ptr< ReadOnlyTx > create_read_only_tx_ptr() override
Definition store.h:1301
void lock_maps() override
Definition store.h:366
void set_encryptor(const EncryptorPtr &encryptor_)
Definition store.h:216
CommittableTx create_tx()
Definition store.h:1311
bool should_create_ledger_chunk(Version version) override
Definition store.h:1074
Version next_version() override
Definition store.h:1136
Version compacted_version() override
Definition store.h:892
Store(bool strict_versions_=true, bool is_historical_=false)
Definition store.h:176
void set_global_hook(const std::string &map_name, const ccf::kv::untyped::Map::CommitHook &hook)
Definition store.h:1272
std::pair< TxID, Term > current_txid_and_commit_term() override
Definition store.h:885
std::shared_ptr< Consensus > get_consensus() override
Definition store.h:183
void set_flag(StoreFlag f) override
Definition store.h:1327
std::unique_ptr< ccf::kv::AbstractExecutionWrapper > deserialize(const std::vector< uint8_t > &data, bool public_only=false, const std::optional< TxID > &expected_txid=std::nullopt) override
Definition store.h:825
void set_chunker(const std::shared_ptr< ILedgerChunker > &chunker_)
Definition store.h:211
void set_history(const std::shared_ptr< TxHistory > &history_)
Definition store.h:201
CommitResult commit(const TxID &txid, std::unique_ptr< PendingTx > pending_tx, bool globally_committable) override
Definition store.h:903
bool fill_maps(const std::vector< uint8_t > &data, bool public_only, ccf::kv::Version &v, ccf::kv::Term &view, ccf::kv::EntryFlags &entry_flags, OrderedChanges &changes, MapCollection &new_maps, ccf::ClaimsDigest &claims_digest, std::optional< ccf::crypto::Sha256Hash > &commit_evidence_digest, bool ignore_strict_versions=false) override
Definition store.h:711
ccf::TxID current_txid() override
Definition store.h:878
void compact(Version v) override
Definition store.h:538
std::shared_ptr< ccf::kv::untyped::Map > get_map_internal(ccf::kv::Version v, const std::string &map_name)
Definition store.h:257
TxDiff create_tx_diff() override
Definition store.h:1306
bool flag_enabled_unsafe(StoreFlag f) const override
Definition store.h:1356
Definition tx.h:130
MapHook< Write > MapHook
Definition map.h:42
CommitHook< Write > CommitHook
Definition map.h:41
#define LOG_TRACE_FMT
Definition internal_logger.h:13
#define LOG_DEBUG_FMT
Definition internal_logger.h:14
#define LOG_FAIL_FMT
Definition internal_logger.h:16
Definition app_interface.h:18
std::shared_ptr< AbstractTxEncryptor > EncryptorPtr
Definition kv_types.h:527
uint64_t Term
Definition kv_types.h:46
std::vector< std::tuple< Version, std::shared_ptr< std::vector< uint8_t > >, bool, std::shared_ptr< ConsensusHookPtrs > > > BatchVector
Definition kv_types.h:209
CommitResult
Definition kv_types.h:212
@ FAIL_NO_REPLICATE
Definition kv_types.h:215
@ SUCCESS
Definition kv_types.h:213
@ PRIVATE
Definition kv_types.h:221
@ PUBLIC
Definition kv_types.h:220
uint64_t Version
Definition version.h:10
std::shared_ptr< AbstractSnapshotter > SnapshotterPtr
Definition kv_types.h:539
GenericDeserialiseWrapper< RawReader > KvStoreDeserialiser
Definition serialiser_declare.h:21
std::map< std::string, std::shared_ptr< AbstractMap > > MapCollection
Definition apply_changes.h:16
EntryFlags
Definition serialised_entry_format.h:15
std::map< std::string, MapChanges > OrderedChanges
Definition tx.h:41
std::shared_ptr< ccf::kv::Store > StorePtr
Definition store.h:1363
ApplyResult
Definition kv_types.h:305
@ FAIL
Definition kv_types.h:314
@ PASS
Definition kv_types.h:306
std::vector< ConsensusHookPtr > ConsensusHookPtrs
Definition hooks.h:22
std::mutex Mutex
Definition locking.h:12
uint64_t View
Definition tx_id.h:23
Definition consensus_types.h:23
Definition map_serializers.h:11
Definition tx_id.h:44
SeqNo seqno
Definition tx_id.h:46
View view
Definition tx_id.h:45