CCF
Loading...
Searching...
No Matches
apply_changes.h
Go to the documentation of this file.
1// Copyright (c) Microsoft Corporation. All rights reserved.
2// Licensed under the Apache 2.0 License.
3#pragma once
4
5#include "ccf/tx.h"
7#include "kv_types.h"
8
9#include <functional>
10#include <map>
11
12namespace ccf::kv
13{
14 // All collections of Map must be ordered so that we lock their contained
15 // maps in a stable order. The order here is by map name
16 using MapCollection = std::map<std::string, std::shared_ptr<AbstractMap>>;
17
19 {
20 virtual ~AbstractChangeContainer() = default;
21 virtual void set_change_list(OrderedChanges&& change_list, Term term) = 0;
22 };
23
// Atomically checks for conflicts, then applies the writes in the given
// change sets to their underlying Maps. Calls version_resolver_fn at most
// once, and only when every check has passed and the change sets contain
// writes, to retrieve a unique Version for the write set together with the
// max version which can have a conflict with the transaction.
//
// The track_read_versions parameter tells the store whether it needs to track
// the last read version for every key. This is required for backup execution,
// as described at the top of tx.h.
32
34 using VersionResolver = std::function<std::tuple<Version, VersionLastNewMap>(
35 bool tx_contains_new_map)>;
36
37 static inline std::optional<Version> apply_changes(
38 OrderedChanges& changes,
39 VersionResolver version_resolver_fn,
41 const MapCollection& new_maps,
42 const std::optional<Version>& new_maps_conflict_version,
43 bool track_read_versions,
44 bool track_deletes_on_missing_keys,
45 const std::optional<Version>& expected_rollback_count = std::nullopt)
46 {
47 // All maps with pending writes are locked, transactions are prepared
48 // and possibly committed, and then all maps with pending writes are
49 // unlocked. This is to prevent transactions from being committed in an
50 // interleaved fashion.
51 Version version = NoVersion;
52 bool has_writes = false;
53
54 std::map<std::string, std::unique_ptr<AbstractCommitter>> views;
55 for (const auto& [map_name, mc] : changes)
56 {
57 views[map_name] = mc.map->create_committer(mc.changeset.get());
58 }
59
60 for (auto it = changes.begin(); it != changes.end(); ++it)
61 {
62 bool changeset_has_writes = it->second.changeset->has_writes();
63 if (changeset_has_writes)
64 {
65 has_writes = true;
66 }
67 if (changeset_has_writes || track_read_versions)
68 {
69 it->second.map->lock();
70 }
71 }
72
73 bool ok = true;
74
75 if (expected_rollback_count.has_value() && !changes.empty())
76 {
77 // expected_rollback_count is only set on signature transactions
78 // which always contain some writes, and on which all the maps
79 // point to the same store.
80 auto store = changes.begin()->second.map->get_store();
81 if (store != nullptr)
82 {
83 // Note that this is done when holding the lock on at least some maps
84 // through the combination of the changes not being empty, and the
85 // acquisition of the map locks on line 69. This guarantees atomicity
86 // with respect to rollbacks, which would acquire the map lock on all
87 // maps at once to truncate their roll. The net result is that the
88 // transaction becomes a noop if a rollback occurred between it being
89 // committed, and the side effects being applied.
90 ok = store->check_rollback_count(expected_rollback_count.value());
91 }
92 }
93
94 if (ok && has_writes)
95 {
96 for (auto it = views.begin(); it != views.end(); ++it)
97 {
98 if (!it->second->prepare(track_read_versions))
99 {
100 ok = false;
101 break;
102 }
103 }
104 }
105
106 for (const auto& [map_name, map_ptr] : new_maps)
107 {
108 // Check that none of these pending maps have already been created.
109 // It is possible for non-conflicting other transactions to commit here
110 // and increment the version, so we may ask this question at different
111 // versions. This is fine - none can create maps (ie - change their
112 // conflict set with this operation) while we hold the store lock. Assume
113 // that the caller is currently holding store->lock()
114 auto store = map_ptr->get_store();
115
116 // This is to avoid recursively locking version_lock by calling
117 // current_version() in the commit_reserved case.
118 ccf::kv::Version current_v;
119 if (new_maps_conflict_version.has_value())
120 {
121 current_v = *new_maps_conflict_version;
122 }
123 else
124 {
125 current_v = store->current_version();
126 }
127
128 if (store->get_map_unsafe(current_v, map_name) != nullptr)
129 {
130 ok = false;
131 break;
132 }
133 }
134
135 if (ok && has_writes)
136 {
137 // Get the version number to be used for this commit.
138 ccf::kv::Version version_last_new_map;
139 std::tie(version, version_last_new_map) =
140 version_resolver_fn(!new_maps.empty());
141
142 if (!track_read_versions)
143 {
144 // Transfer ownership of these new maps to their target stores, iff we
145 // have writes to them
146 for (const auto& [map_name, map_ptr] : new_maps)
147 {
148 const auto it = views.find(map_name);
149 if (it != views.end() && it->second->has_writes())
150 {
151 map_ptr->get_store()->add_dynamic_map(version, map_ptr);
152 }
153 }
154
155 for (auto it = views.begin(); it != views.end(); ++it)
156 {
157 it->second->commit(
158 version, track_read_versions, track_deletes_on_missing_keys);
159 }
160
161 // Collect ConsensusHooks
162 for (auto it = views.begin(); it != views.end(); ++it)
163 {
164 auto hook_ptr = it->second->post_commit();
165 if (hook_ptr != nullptr)
166 {
167 hooks.push_back(std::move(hook_ptr));
168 }
169 }
170 }
171 else
172 {
173 // A linearizability violation was detected
174 ok = false;
175 }
176 }
177
178 for (auto it = changes.begin(); it != changes.end(); ++it)
179 {
180 if (it->second.changeset->has_writes() || track_read_versions)
181 {
182 it->second.map->unlock();
183 }
184 }
185
186 if (!ok)
187 {
188 return std::nullopt;
189 }
190
191 return version;
192 }
193}
Definition app_interface.h:20
uint64_t Term
Definition kv_types.h:46
uint64_t Version
Definition version.h:8
std::function< std::tuple< Version, VersionLastNewMap >(bool tx_contains_new_map)> VersionResolver
Definition apply_changes.h:35
Version VersionLastNewMap
Definition apply_changes.h:33
std::map< std::string, std::shared_ptr< AbstractMap > > MapCollection
Definition apply_changes.h:16
std::map< std::string, MapChanges > OrderedChanges
Definition tx.h:42
std::vector< ConsensusHookPtr > ConsensusHookPtrs
Definition hooks.h:22
Definition apply_changes.h:19
virtual ~AbstractChangeContainer()=default
virtual void set_change_list(OrderedChanges &&change_list, Term term)=0