CCF
Loading...
Searching...
No Matches
commit_callback_subsystem.h
Go to the documentation of this file.
1// Copyright (c) Microsoft Corporation. All rights reserved.
2// Licensed under the Apache 2.0 License.
3#pragma once
4
6#include "kv/kv_types.h"
8
9#include <map>
10#include <mutex>
11
12namespace ccf
13{
15 {
16 private:
17 using Callbacks = std::vector<std::pair<ccf::TxID, CommitCallback>>;
18 std::map<ccf::SeqNo, Callbacks> pending_callbacks;
19
20 std::optional<ccf::TxID> known_commit = std::nullopt;
21 aft::ViewHistory known_view_history;
22
23 std::mutex callbacks_mutex;
24
26
27 public:
29
31 {
32 consensus = c;
33 }
34
35 void add_callback(ccf::TxID tx_id, CommitCallback&& callback) override
36 {
37 std::optional<ccf::FinalTxStatus> immediate_status;
38
39 {
40 std::lock_guard<std::mutex> guard(callbacks_mutex);
41
42 if (known_commit.has_value())
43 {
44 const auto local_view = known_view_history.view_at(tx_id.seqno);
45 const auto status = ccf::evaluate_tx_status(
46 tx_id.view,
47 tx_id.seqno,
48 local_view,
49 known_commit->view,
50 known_commit->seqno);
51
52 if (status == TxStatus::Committed || status == TxStatus::Invalid)
53 {
54 immediate_status = static_cast<ccf::FinalTxStatus>(status);
55 }
56 }
57
58 if (!immediate_status.has_value())
59 {
60 pending_callbacks[tx_id.seqno].emplace_back(
61 std::make_pair(tx_id, std::move(callback)));
62 return;
63 }
64 }
65
66 // Terminal status determined from cached state - execute callback
67 // outside the lock
68 callback(tx_id, immediate_status.value());
69 }
70
72 ccf::TxID committed, const aft::ViewHistory& view_history)
73 {
74 if (consensus == nullptr)
75 {
76 throw std::logic_error(
77 "trigger_callbacks() called before set_consensus()");
78 }
79
80 // Collect callbacks to invoke, under the lock
81 using ReadyCallback =
82 std::tuple<ccf::TxID, ccf::FinalTxStatus, CommitCallback>;
83 std::vector<ReadyCallback> ready;
84
85 {
86 std::lock_guard<std::mutex> guard(callbacks_mutex);
87
88 known_commit = committed;
89 known_view_history = view_history;
90
91 auto it = pending_callbacks.begin();
92 while (it != pending_callbacks.end())
93 {
94 auto& [seqno, callbacks] = *it;
95 if (seqno > committed.seqno)
96 {
97 break;
98 }
99
100 for (auto& [tx_id, callback] : callbacks)
101 {
102 const auto local_view = view_history.view_at(tx_id.seqno);
103 const auto status = ccf::evaluate_tx_status(
104 tx_id.view,
105 tx_id.seqno,
106 local_view,
107 committed.view,
108 committed.seqno);
109
110 if (status != TxStatus::Committed && status != TxStatus::Invalid)
111 {
112 throw std::logic_error(fmt::format(
113 "Expected transaction {} evaluated against commit point {} to "
114 "return terminal TxStatus, instead returned {}",
115 tx_id.to_str(),
116 committed.to_str(),
117 nlohmann::json(status).dump()));
118 }
119
120 const auto final_status = static_cast<ccf::FinalTxStatus>(status);
121 ready.emplace_back(tx_id, final_status, std::move(callback));
122 }
123
124 it = pending_callbacks.erase(it);
125 }
126 }
127
128 // Execute callbacks outside the lock
129 for (auto& [tx_id, final_status, callback] : ready)
130 {
131 callback(tx_id, final_status);
132 }
133 }
134 };
135}
Definition state.h:19
ccf::View view_at(ccf::kv::Version idx) const
Definition state.h:58
Definition commit_callback_interface.h:16
Definition commit_callback_subsystem.h:15
void add_callback(ccf::TxID tx_id, CommitCallback &&callback) override
Definition commit_callback_subsystem.h:35
void set_consensus(ccf::kv::Consensus *c)
Definition commit_callback_subsystem.h:30
void trigger_callbacks(ccf::TxID committed, const aft::ViewHistory &view_history)
Definition commit_callback_subsystem.h:71
Definition kv_types.h:367
Definition app_interface.h:13
FinalTxStatus
Definition tx_status.h:36
@ ready
Definition tls_session.h:19
std::function< void(ccf::TxID, ccf::FinalTxStatus)> CommitCallback
Definition commit_callback_interface.h:13
Definition consensus_types.h:23
Definition tx_id.h:44
SeqNo seqno
Definition tx_id.h:46
View view
Definition tx_id.h:45
std::string to_str() const
Definition tx_id.h:48