// NOTE(review): this is a gapped excerpt - the enclosing function signature,
// braces and several statements are not shown here, so the comments below
// describe only the visible lines.
//
// Final status for the transaction when it can be decided right away; it is
// tested below to choose between queueing and invoking the callback.
// Presumably filled in from `status` in lines not shown - confirm.
37 std::optional<ccf::FinalTxStatus> immediate_status;
// Take callbacks_mutex for the lifetime of `guard`.
40 std::lock_guard<std::mutex> guard(callbacks_mutex);
// A status is only evaluated once a commit point has been recorded.
42 if (known_commit.has_value())
// View recorded in known_view_history for this transaction's seqno.
44 const auto local_view = known_view_history.
view_at(tx_id.
seqno);
// Arguments to evaluate_tx_status are not visible in this excerpt.
45 const auto status = ccf::evaluate_tx_status(
// No status available yet: park the (tx_id, callback) pair under the
// transaction's seqno in pending_callbacks.
58 if (!immediate_status.has_value())
60 pending_callbacks[tx_id.
seqno].emplace_back(
61 std::make_pair(tx_id, std::move(callback)));
// Status already known: invoke the callback directly with it.
// NOTE(review): whether this runs while `guard` is still held cannot be
// determined from the visible lines - confirm against the full file.
68 callback(tx_id, immediate_status.value());
// NOTE(review): this is a gapped excerpt - the function signature, braces and
// several conditions are not shown here, so the comments below describe only
// the visible lines.
//
// Raised when a precondition fails; the condition itself is not visible, but
// the message states that set_consensus() must be called first.
76 throw std::logic_error(
77 "trigger_callbacks() called before set_consensus()");
// Tail of a declaration (presumably the `ReadyCallback` alias used below -
// confirm): a tuple of transaction id, final status and callback.
82 std::tuple<ccf::TxID, ccf::FinalTxStatus, CommitCallback>;
// Callbacks collected during the scan below, invoked by the final loop.
83 std::vector<ReadyCallback>
ready;
// Take callbacks_mutex for the lifetime of `guard`.
86 std::lock_guard<std::mutex> guard(callbacks_mutex);
// Record the supplied commit point and view history.
88 known_commit = committed;
89 known_view_history = view_history;
// Walk pending_callbacks with an explicit iterator because entries are erased
// during the traversal.
91 auto it = pending_callbacks.begin();
92 while (it != pending_callbacks.end())
94 auto& [seqno, callbacks] = *it;
// Entries whose seqno lies beyond the commit point get separate handling; the
// body of this branch is not visible (presumably stops or skips - confirm).
95 if (seqno > committed.
seqno)
// Evaluate every callback queued under this seqno.
100 for (
auto& [tx_id, callback] : callbacks)
// View recorded in the supplied view_history for this transaction's seqno.
102 const auto local_view = view_history.
view_at(tx_id.seqno);
// Arguments to evaluate_tx_status are not visible in this excerpt.
103 const auto status = ccf::evaluate_tx_status(
// Raised when the evaluated status is not terminal, per the message; the
// guarding condition is not visible in this excerpt.
112 throw std::logic_error(fmt::format(
113 "Expected transaction {} evaluated against commit point {} to "
114 "return terminal TxStatus, instead returned {}",
117 nlohmann::json(status).dump()));
// Move the callback into `ready` together with its final status
// (`final_status` is derived in lines not shown).
121 ready.emplace_back(tx_id, final_status, std::move(callback));
// Drop the processed entry; erase() yields the iterator to continue from.
124 it = pending_callbacks.erase(it);
// Invoke every collected callback with its transaction id and final status.
// NOTE(review): presumably this runs after the lock scope has ended, which
// would explain collecting into `ready` first - braces are not shown; confirm.
129 for (
auto& [tx_id, final_status, callback] :
ready)
131 callback(tx_id, final_status);