CCF
Loading...
Searching...
No Matches
historical_queries.h
Go to the documentation of this file.
1// Copyright (c) Microsoft Corporation. All rights reserved.
2// Licensed under the Apache 2.0 License.
3#pragma once
4
6#include "ccf/pal/locking.h"
8#include "ds/ccf_assert.h"
9#include "kv/store.h"
10#include "node/cose_common.h"
11#include "node/encryptor.h"
12#include "node/history.h"
13#include "node/ledger_secrets.h"
17
18#include <list>
19#include <map>
20#include <memory>
21#include <set>
22
23#ifdef ENABLE_HISTORICAL_VERBOSE_LOGGING
24# define HISTORICAL_LOG(...) LOG_INFO_FMT(__VA_ARGS__)
25# include <ranges>
26#else
27# define HISTORICAL_LOG(...)
28#endif
29
30namespace ccf::historical
31{
32 enum class RequestNamespace : uint8_t
33 {
35 System,
36 };
37
38 using CompoundHandle = std::pair<RequestNamespace, RequestHandle>;
39};
40
41FMT_BEGIN_NAMESPACE
42template <>
44{
45 template <typename ParseContext>
46 constexpr auto parse(ParseContext& ctx)
47 {
48 return ctx.begin();
49 }
50
51 template <typename FormatContext>
52 auto format(
53 const ccf::historical::CompoundHandle& p, FormatContext& ctx) const
54 {
55 return format_to(
56 ctx.out(),
57 "[{}|{}]",
58 std::get<0>(p) == ccf::historical::RequestNamespace::Application ? "APP" :
59 "SYS",
60 std::get<1>(p));
61 }
62};
63FMT_END_NAMESPACE
64
65namespace ccf::historical
66{
67 static constexpr auto slow_fetch_threshold = std::chrono::milliseconds(1000);
68
69 static std::optional<ccf::PrimarySignature> get_signature(
70 const ccf::kv::StorePtr& sig_store)
71 {
72 auto tx = sig_store->create_read_only_tx();
73 auto* signatures = tx.ro<ccf::Signatures>(ccf::Tables::SIGNATURES);
74 return signatures->get();
75 }
76
77 static std::optional<ccf::CoseSignature> get_cose_signature(
78 const ccf::kv::StorePtr& sig_store)
79 {
80 auto tx = sig_store->create_read_only_tx();
81 auto* signatures = tx.ro<ccf::CoseSignatures>(ccf::Tables::COSE_SIGNATURES);
82 return signatures->get();
83 }
84
85 static std::optional<std::vector<uint8_t>> get_tree(
86 const ccf::kv::StorePtr& sig_store)
87 {
88 auto tx = sig_store->create_read_only_tx();
89 auto* tree =
90 tx.ro<ccf::SerialisedMerkleTree>(ccf::Tables::SERIALISED_MERKLE_TREE);
91 return tree->get();
92 }
93
95 {
96 protected:
98 std::shared_ptr<ccf::LedgerSecrets> source_ledger_secrets;
100
101 std::shared_ptr<ccf::LedgerSecrets> historical_ledger_secrets;
102 std::shared_ptr<ccf::NodeEncryptor> historical_encryptor;
103
104 // whether to keep all the writes so that we can build a diff later
106
// Progress of a single requested store.
enum class StoreStage : uint8_t
{
  // Ledger entries are only accepted for a seqno while its details are in
  // this stage.
  Fetching = 0,
  // Set once the entry has been deserialised; only stores in this stage are
  // handed back to callers.
  Trusted = 1,
};
112
113 using LedgerEntry = std::vector<uint8_t>;
114
116 {
117 if (earliest_secret_.secret == nullptr)
118 {
119 // Haven't worked out earliest known secret yet - work it out now
120 if (historical_ledger_secrets->is_empty())
121 {
123 !source_ledger_secrets->is_empty(),
124 "Source ledger secrets are empty");
126 }
127 else
128 {
131 historical_ledger_secrets->get_latest(tx).first <
132 source_ledger_secrets->get_first().first,
133 "Historical ledger secrets are not older than main ledger secrets");
134
136 }
137 }
138 }
139
141 {
142 std::chrono::milliseconds time_until_fetch = {};
147 bool is_signature = false;
151
152 std::optional<std::string> get_commit_evidence()
153 {
155 {
156 if (store == nullptr)
157 {
158 throw std::logic_error("Store pointer not set");
159 }
160 auto e = store->get_encryptor();
161 return e->get_commit_evidence(
163 }
164
165 return std::nullopt;
166 }
167 };
168 using StoreDetailsPtr = std::shared_ptr<StoreDetails>;
169 using RequestedStores = std::map<ccf::SeqNo, StoreDetailsPtr>;
170
171 using WeakStoreDetailsPtr = std::weak_ptr<StoreDetails>;
172 using AllRequestedStores = std::map<ccf::SeqNo, WeakStoreDetailsPtr>;
173
175 {
178
179 VersionedSecret() = default;
180 // NB: Can't use VersionedLedgerSecret directly because the first element
181 // is const, so the whole thing is non-copyable
183 valid_from(vls.first),
184 secret(vls.second)
185 {}
186 };
187
190
191 struct Request
192 {
194
196 std::chrono::milliseconds time_to_expiry{};
197
198 bool include_receipts = false;
199
200 // Entries from outside the requested range (such as the next signature)
201 // may be needed to produce receipts. They are stored here, distinct from
202 // user-requested stores.
204
205 // Only set when recovering ledger secrets
206 std::optional<ccf::SeqNo> awaiting_ledger_secrets = std::nullopt;
207
208 Request(AllRequestedStores& all_stores_) : all_stores(all_stores_) {}
209
211 {
212 auto it = all_stores.find(seqno);
213 if (it != all_stores.end())
214 {
215 return it->second.lock();
216 }
217
218 return nullptr;
219 }
220
221 [[nodiscard]] ccf::SeqNo first_requested_seqno() const
222 {
223 if (!my_stores.empty())
224 {
225 return my_stores.begin()->first;
226 }
227
228 return {};
229 }
230
231 std::pair<std::vector<SeqNo>, std::vector<SeqNo>> adjust_ranges(
232 const SeqNoCollection& new_seqnos,
233 bool should_include_receipts,
234 SeqNo earliest_ledger_secret_seqno)
235 {
236 std::vector<SeqNo> removed{};
237 std::vector<SeqNo> added{};
238
239 // If a seqno is earlier than the earliest known ledger secret, we will
240 // store that it was requested with a nullptr in `my_stores`, but not
241 // add it to `all_stores` to begin fetching until a sufficiently early
242 // secret has been retrieved. To avoid awkwardly sharding requests (and
243 // delaying the secret-fetch with a large request for a later range), we
244 // extend that to say that if _any_ seqno is too early, then _all_
245 // subsequent seqnos will be pending. This bool tracks that behaviour.
246 bool any_too_early = false;
247
248 {
249 auto prev_it = my_stores.begin();
250 auto new_it = new_seqnos.begin();
251 while (new_it != new_seqnos.end())
252 {
253 if (prev_it != my_stores.end() && *new_it == prev_it->first)
254 {
255 // Asking for a seqno which was also requested previously - do
256 // nothing and advance to compare next entries
257 ++new_it;
258 ++prev_it;
259 }
260 else if (prev_it != my_stores.end() && *new_it > prev_it->first)
261 {
262 // No longer looking for a seqno which was previously requested.
263 if (
264 supporting_signatures.find(prev_it->first) ==
266 {
267 removed.push_back(prev_it->first);
268 prev_it = my_stores.erase(prev_it);
269 }
270 else
271 {
272 ++prev_it;
273 }
274 }
275 else
276 {
277 // *new_it < prev_it->first
278 // Asking for a seqno which was not previously being fetched =>
279 // check if another request was fetching it, else create new
280 // details to track it
281 if (*new_it < earliest_ledger_secret_seqno || any_too_early)
282 {
283 // If this is too early for known secrets, just record that it
284 // was requested but don't add it to all_stores yet
285 added.push_back(*new_it);
286 prev_it = my_stores.insert_or_assign(prev_it, *new_it, nullptr);
287 any_too_early = true;
288 }
289 else
290 {
291 auto all_it = all_stores.find(*new_it);
292 auto details =
293 all_it == all_stores.end() ? nullptr : all_it->second.lock();
294 if (details == nullptr)
295 {
296 HISTORICAL_LOG("{} is newly requested", *new_it);
297 details = std::make_shared<StoreDetails>();
298 all_stores.insert_or_assign(all_it, *new_it, details);
299 }
300 added.push_back(*new_it);
301 prev_it = my_stores.insert_or_assign(prev_it, *new_it, details);
302 }
303 }
304 }
305
306 if (prev_it != my_stores.end())
307 {
308 // If we have a suffix of seqnos previously requested, now
309 // unrequested, purge them - but keep supporting signature entries
310 auto it = prev_it;
311 while (it != my_stores.end())
312 {
313 if (
314 supporting_signatures.find(it->first) ==
316 {
317 removed.push_back(it->first);
318 it = my_stores.erase(it);
319 }
320 else
321 {
322 ++it;
323 }
324 }
325 }
326 }
327
329 "Added seqnos: {}, removed seqnos: {}, supporting signatures: {}",
330 fmt::join(added, ","),
331 fmt::join(removed, ","),
332 fmt::join(std::views::keys(supporting_signatures), ","));
333
334 const bool any_diff = !removed.empty() || !added.empty();
335
336 if (!any_diff && (should_include_receipts == include_receipts))
337 {
338 HISTORICAL_LOG("Identical to previous request");
339 return {removed, added};
340 }
341
342 include_receipts = should_include_receipts;
343
345 "Clearing {} supporting signatures", supporting_signatures.size());
346 supporting_signatures.clear();
347 if (should_include_receipts)
348 {
349 // If requesting signatures, populate receipts for each entry that we
350 // already have. Normally this would be done when each entry was
351 // received, but in the case that we have the entries already and only
352 // request signatures now, we delay that work to now.
353
354 for (auto seqno : new_seqnos)
355 {
356 auto more_to_add = populate_receipts(seqno);
357 std::sort(added.begin(), added.end());
358 std::sort(more_to_add.begin(), more_to_add.end());
359
360 std::vector<SeqNo> together;
361 std::merge(
362 added.begin(),
363 added.end(),
364 more_to_add.begin(),
365 more_to_add.end(),
366 std::back_inserter(together));
367
368 if (more_to_add.size() + added.size() != together.size())
369 {
371 "Invariant violation in adjust_ranges: more_to_add({}) + "
372 "added({}) != together({})",
373 more_to_add.size(),
374 added.size(),
375 together.size());
376 assert(false);
377 }
378
379 std::swap(added, together);
380 }
381 }
382 return {removed, added};
383 }
384
385 std::vector<ccf::SeqNo> populate_receipts(ccf::SeqNo new_seqno)
386 {
388 "Looking at {}, and populating receipts from it", new_seqno);
389 auto new_details = get_store_details(new_seqno);
390 if (new_details != nullptr && new_details->store != nullptr)
391 {
392 if (new_details->is_signature)
393 {
394 HISTORICAL_LOG("{} is a signature", new_seqno);
395
396 fill_receipts_from_signature(new_details);
397 }
398 else
399 {
400 // This isn't a signature. To find the signature for this, we look
401 // through every subsequent transaction, until we find either a gap
402 // (a seqno that hasn't been fetched yet), or a signature. If it is
403 // a signature, and we've found a contiguous range of seqnos to it,
404 // then it must be a signature over this seqno. Else we find a gap
405 // first, and fetch it in case it is the signature. It's possible
406 // that we already have the later signature, and wastefully fill in
407 // the gaps, but this reduces the cases we have to consider so makes
408 // the code much simpler.
409
410 HISTORICAL_LOG("{} is not a signature", new_seqno);
411 supporting_signatures.erase(new_seqno);
412
413 if (new_details->receipt != nullptr)
414 {
416 "Already have a receipt for {}, so no need to populate more",
417 new_seqno);
418 return {};
419 }
420
421 std::vector<SeqNo> added;
422
423 auto next_seqno = new_seqno + 1;
424 while (true)
425 {
426 auto all_it = all_stores.find(next_seqno);
427 auto details =
428 all_it == all_stores.end() ? nullptr : all_it->second.lock();
429 if (details == nullptr)
430 {
432 "Looking for new supporting signature at {}", next_seqno);
433 details = std::make_shared<StoreDetails>();
434 auto my_it = my_stores.find(next_seqno);
435 if (my_it == my_stores.end())
436 {
438 "Tracking potential supporting signature for new seqno {} "
439 "at {}",
440 new_seqno,
441 next_seqno);
442 added.push_back(next_seqno);
443 my_stores.insert_or_assign(my_it, next_seqno, details);
444 }
445
446 all_stores.insert_or_assign(all_it, next_seqno, details);
447 }
448
449 if (details->store == nullptr)
450 {
451 // Whether we just started fetching or someone else was already
452 // looking for this, it's the first gap we've found so _may_ be
453 // our signature
455 "Assigning {} as potential signature for {}",
456 next_seqno,
457 new_seqno);
458 supporting_signatures[next_seqno] = details;
459 return added;
460 }
461
462 if (details->is_signature)
463 {
464 const auto filled_this =
465 fill_receipts_from_signature(details, new_seqno);
466
467 if (
468 !filled_this && my_stores.find(new_seqno) != my_stores.end())
469 {
470 throw std::logic_error(fmt::format(
471 "Unexpected: Found a signature at {}, and contiguous range "
472 "of transactions from {}, yet signature does not cover "
473 "this seqno!",
474 next_seqno,
475 new_seqno));
476 }
477
478 return added;
479 }
480
481 // This is a normal transaction, and it's already fetched.
482 // Nothing to do, consider the next.
483 ++next_seqno;
484 }
485
486 return added;
487 }
488 }
489 return {};
490 }
491
492 private:
493 bool fill_receipts_from_signature(
494 const std::shared_ptr<StoreDetails>& sig_details,
495 std::optional<ccf::SeqNo> should_fill = std::nullopt)
496 {
497 // Iterate through earlier indices. If this signature covers them
498 // then create a receipt for them
499 const auto sig = get_signature(sig_details->store);
500 const auto cose_sig = get_cose_signature(sig_details->store);
501 if (!sig.has_value() && !cose_sig.has_value())
502 {
503 return false;
504 }
505 const auto serialised_tree = get_tree(sig_details->store);
506 if (!serialised_tree.has_value())
507 {
508 return false;
509 }
510 ccf::MerkleTreeHistory tree(serialised_tree.value());
511
512 // This is either pointing at the sig itself, or the closest larger
513 // seqno we're holding
514 auto sig_lower_bound_it =
515 my_stores.lower_bound(sig_details->transaction_id.seqno);
516
517 if (sig_lower_bound_it != my_stores.begin()) // Skip empty map edge case
518 {
519 // Construct reverse iterator to search backwards from here
520 auto search_rit = std::reverse_iterator(sig_lower_bound_it);
521 while (search_rit != my_stores.rend())
522 {
523 auto seqno = search_rit->first;
524 if (tree.in_range(seqno))
525 {
526 auto details = search_rit->second;
527 if (details != nullptr && details->store != nullptr)
528 {
529 auto proof = tree.get_proof(seqno);
530
531 if (sig.has_value())
532 {
533 details->transaction_id = {sig->view, seqno};
534 details->receipt = std::make_shared<TxReceiptImpl>(
535 sig->sig,
536 cose_sig,
537 proof.get_root(),
538 proof.get_path(),
539 sig->node,
540 sig->cert,
541 details->entry_digest,
542 details->get_commit_evidence(),
543 details->claims_digest);
544 }
545 else
546 {
547 auto cose_receipt =
548 ccf::cose::decode_ccf_receipt(cose_sig.value(), false);
549 auto parsed_txid =
550 ccf::TxID::from_str(cose_receipt.phdr.ccf.txid);
551 if (!parsed_txid.has_value())
552 {
553 throw std::logic_error(fmt::format(
554 "Cannot parse CCF TxID: {}", cose_receipt.phdr.ccf.txid));
555 }
556
557 details->transaction_id = {parsed_txid->view, seqno};
558 details->receipt = std::make_shared<TxReceiptImpl>(
559 std::nullopt,
560 cose_sig,
561 proof.get_root(),
562 proof.get_path(),
563 ccf::NodeId{},
564 std::nullopt,
565 details->entry_digest,
566 details->get_commit_evidence(),
567 details->claims_digest);
568 }
569
571 "Assigned a receipt for {} after given signature at {}",
572 seqno,
573 sig_details->transaction_id.to_str());
574
575 if (should_fill.has_value() && seqno == *should_fill)
576 {
577 should_fill.reset();
578 }
579 }
580
581 ++search_rit;
582 }
583 else
584 {
585 // Found a seqno which this signature doesn't cover. It can't
586 // cover anything else, so break here
587 break;
588 }
589 }
590 }
591
592 return !should_fill.has_value();
593 }
594 };
595
596 // Guard all access to internal state with this lock
598
599 // Track all things currently requested by external callers
600 std::map<CompoundHandle, Request> requests;
601
602 // A map containing (weak pointers to) _all_ of the stores for active
603 // requests, allowing distinct requests for the same seqnos to share the
604 // same underlying state (and benefit from faster lookup)
606
607 ExpiryDuration default_expiry_duration = std::chrono::seconds(1800);
608
609 // These two combine into an effective O(log(N)) lookup/add/remove by
610 // handle.
611 std::list<CompoundHandle> lru_requests;
612 std::map<CompoundHandle, std::list<CompoundHandle>::iterator> lru_lookup;
613
614 // To maintain the estimated size consumed by all requests. Gets updated
615 // when ledger entries are fetched, and when requests are dropped.
616 std::unordered_map<SeqNo, std::set<CompoundHandle>> store_to_requests;
617 std::unordered_map<ccf::SeqNo, size_t> raw_store_sizes;
618
619 CacheSize soft_store_cache_limit{std::numeric_limits<size_t>::max()};
621
623 {
624 auto it = store_to_requests.find(seq);
625
626 if (it == store_to_requests.end())
627 {
628 store_to_requests.insert({seq, {handle}});
629 auto size = raw_store_sizes.find(seq);
630 if (size != raw_store_sizes.end())
631 {
632 estimated_store_cache_size += size->second;
633 }
634 }
635 else
636 {
637 it->second.insert(handle);
638 }
639 }
640
642 {
643 for (const auto& [seq, _] : requests.at(handle).my_stores)
644 {
645 add_request_ref(seq, handle);
646 }
647 }
648
650 {
651 auto it = store_to_requests.find(seq);
652 assert(it != store_to_requests.end());
653
654 it->second.erase(handle);
655 if (it->second.empty())
656 {
657 store_to_requests.erase(it);
658 auto size = raw_store_sizes.find(seq);
659 if (size != raw_store_sizes.end())
660 {
661 estimated_store_cache_size -= size->second;
662 raw_store_sizes.erase(size);
663 }
664 }
665 }
666
668 {
669 for (const auto& [seq, _] : requests.at(handle).my_stores)
670 {
671 remove_request_ref(seq, handle);
672 }
673 }
674
676 {
677 auto it = lru_lookup.find(handle);
678 if (it != lru_lookup.end())
679 {
680 lru_requests.erase(it->second);
681 it->second = lru_requests.insert(lru_requests.begin(), handle);
682 }
683 else
684 {
685 lru_lookup[handle] = lru_requests.insert(lru_requests.begin(), handle);
686 add_request_refs(handle);
687 }
688 }
689
690 void lru_shrink_to_fit(size_t threshold)
691 {
692 while (estimated_store_cache_size > threshold)
693 {
694 if (lru_requests.empty())
695 {
697 "LRU shrink to {} requested but cache is already empty", threshold);
698 return;
699 }
700
701 const auto handle = lru_requests.back();
703 "Cache size shrinking (reached {} / {}). Dropping {}",
705 threshold,
706 handle);
707
708 remove_request_refs(handle);
709 lru_lookup.erase(handle);
710
711 requests.erase(handle);
712 lru_requests.pop_back();
713 }
714 }
715
717 {
718 auto it = lru_lookup.find(handle);
719 if (it != lru_lookup.end())
720 {
721 remove_request_refs(handle);
722 lru_requests.erase(it->second);
723 lru_lookup.erase(it);
724 }
725 }
726
727 void update_store_raw_size(SeqNo seq, size_t new_size)
728 {
729 auto& stored_size = raw_store_sizes[seq];
730 assert(!stored_size || stored_size == new_size);
731
732 estimated_store_cache_size -= stored_size;
733 estimated_store_cache_size += new_size;
734 stored_size = new_size;
735 }
736
738 {
739 fetch_entries_range(seqno, seqno);
740 }
741
743 {
744 LOG_TRACE_FMT("fetch_entries_range({}, {})", from, to);
745
747 ::consensus::ledger_get_range,
748 to_host,
749 static_cast<::consensus::Index>(from),
750 static_cast<::consensus::Index>(to),
752 }
753
754 std::optional<ccf::SeqNo> fetch_supporting_secret_if_needed(
755 ccf::SeqNo seqno)
756 {
757 auto [earliest_ledger_secret_seqno, earliest_ledger_secret] =
760 earliest_ledger_secret != nullptr,
761 "Can't fetch without knowing earliest");
762
763 const auto too_early = seqno < earliest_ledger_secret_seqno;
764
765 auto previous_secret_stored_version =
766 earliest_ledger_secret->previous_secret_stored_version;
767 const auto is_next_secret =
768 previous_secret_stored_version.value_or(0) == seqno;
769
770 if (too_early || is_next_secret)
771 {
772 // Still need more secrets, fetch the next
773 if (!previous_secret_stored_version.has_value())
774 {
775 throw std::logic_error(fmt::format(
776 "Earliest known ledger secret at {} has no earlier secret stored "
777 "version ({})",
778 earliest_ledger_secret_seqno,
779 seqno));
780 }
781
782 const auto seqno_to_fetch = previous_secret_stored_version.value();
784 "Requesting historical entry at {} but first known ledger "
785 "secret is applicable from {}",
786 seqno,
787 earliest_ledger_secret_seqno);
788
789 auto it = all_stores.find(seqno_to_fetch);
790 auto details = it == all_stores.end() ? nullptr : it->second.lock();
791 if (details == nullptr)
792 {
793 LOG_TRACE_FMT("Requesting older secret at {} now", seqno_to_fetch);
794 details = std::make_shared<StoreDetails>();
795 all_stores.insert_or_assign(it, seqno_to_fetch, details);
796 fetch_entry_at(seqno_to_fetch);
797 }
798
799 next_secret_fetch_handle = details;
800
801 if (too_early)
802 {
803 return seqno_to_fetch;
804 }
805 }
806
807 return std::nullopt;
808 }
809
811 const StoreDetailsPtr& details,
812 const ccf::kv::StorePtr& store,
813 const ccf::crypto::Sha256Hash& entry_digest,
814 ccf::SeqNo seqno,
815 bool is_signature,
816 ccf::ClaimsDigest&& claims_digest,
817 bool has_commit_evidence)
818 {
819 // Deserialisation includes a GCM integrity check, so all entries
820 // have been verified by the time we get here.
821 details->current_stage = StoreStage::Trusted;
822 details->has_commit_evidence = has_commit_evidence;
823
824 details->entry_digest = entry_digest;
825 if (!claims_digest.empty())
826 {
827 details->claims_digest = std::move(claims_digest);
828 }
829
831 details->store == nullptr,
832 "Cache already has store for seqno {}",
833 seqno);
834 details->store = store;
835
836 details->is_signature = is_signature;
837 if (is_signature)
838 {
839 // Construct a signature receipt.
840 // We do this whether it was requested or not, because we have all
841 // the state to do so already, and it's simpler than constructing
842 // the receipt _later_ for an already-fetched signature
843 // transaction.
844 const auto sig = get_signature(details->store);
845 const auto cose_sig = get_cose_signature(details->store);
846 if (sig.has_value())
847 {
848 details->transaction_id = {sig->view, sig->seqno};
849 details->receipt = std::make_shared<TxReceiptImpl>(
850 sig->sig, cose_sig, sig->root.h, nullptr, sig->node, sig->cert);
851 }
852 else if (cose_sig.has_value())
853 {
854 auto as_receipt =
855 ccf::cose::decode_ccf_receipt(cose_sig.value(), false);
856 const auto& txid = as_receipt.phdr.ccf.txid;
857 auto parsed_txid = ccf::TxID::from_str(txid);
858
859 if (!parsed_txid.has_value())
860 {
861 throw std::logic_error(
862 fmt::format("Cannot parse CCF TxID: {}", txid));
863 }
864 details->transaction_id = parsed_txid.value();
865 details->receipt = std::make_shared<TxReceiptImpl>(
866 std::nullopt,
867 cose_sig,
868 std::nullopt,
869 nullptr,
870 ccf::NodeId{},
871 std::nullopt);
872 }
873 else
874 {
875 throw std::logic_error(
876 fmt::format("Seqno {} is a signature of an unknown type", seqno));
877 }
878 }
879
880 auto request_it = requests.begin();
881 while (request_it != requests.end())
882 {
883 auto& [handle, request] = *request_it;
884
885 // If this request was still waiting for a ledger secret, and this is
886 // that secret
887 if (
888 request.awaiting_ledger_secrets.has_value() &&
889 request.awaiting_ledger_secrets.value() == seqno)
890 {
892 "{} is a ledger secret seqno this request was waiting for", seqno);
893
894 request.awaiting_ledger_secrets =
895 fetch_supporting_secret_if_needed(request.first_requested_seqno());
896 if (!request.awaiting_ledger_secrets.has_value())
897 {
898 // Newly have all required secrets - begin fetching the actual
899 // entries. Note this is adding them to `all_stores`, from where
900 // they'll be requested on the next tick.
901 auto my_stores_it = request.my_stores.begin();
902 while (my_stores_it != request.my_stores.end())
903 {
904 auto [store_seqno, _] = *my_stores_it;
905 auto it = all_stores.find(store_seqno);
906 auto store_details =
907 it == all_stores.end() ? nullptr : it->second.lock();
908
909 if (store_details == nullptr)
910 {
911 store_details = std::make_shared<StoreDetails>();
912 all_stores.insert_or_assign(it, store_seqno, store_details);
913 }
914
915 my_stores_it->second = store_details;
916 ++my_stores_it;
917 }
918 }
919
920 // In either case, done with this request, try the next
921 ++request_it;
922 continue;
923 }
924
925 if (request.include_receipts)
926 {
927 const bool seqno_in_this_request =
928 (request.my_stores.find(seqno) != request.my_stores.end() ||
929 request.supporting_signatures.find(seqno) !=
930 request.supporting_signatures.end());
931 if (seqno_in_this_request)
932 {
933 auto added = request.populate_receipts(seqno);
934 for (auto seq : added)
935 {
936 add_request_ref(seq, handle);
937 }
938 }
939 }
940
941 ++request_it;
942 }
943 }
944
946 const ccf::kv::StorePtr& store, LedgerSecretPtr encrypting_secret)
947 {
948 // Read encrypted secrets from store
949 auto tx = store->create_read_only_tx();
950 auto* encrypted_past_ledger_secret_handle =
952 ccf::Tables::ENCRYPTED_PAST_LEDGER_SECRET);
953 if (encrypted_past_ledger_secret_handle == nullptr)
954 {
955 return false;
956 }
957
958 auto encrypted_past_ledger_secret =
959 encrypted_past_ledger_secret_handle->get();
960 if (!encrypted_past_ledger_secret.has_value())
961 {
962 return false;
963 }
964
965 // Construct description and decrypted secret
966 auto previous_ledger_secret =
967 encrypted_past_ledger_secret->previous_ledger_secret;
968 if (!previous_ledger_secret.has_value())
969 {
970 // The only write to this table that should not contain a previous
971 // secret is the initial service open
973 encrypted_past_ledger_secret->next_version.has_value() &&
974 encrypted_past_ledger_secret->next_version.value() == 1,
975 "Write to ledger secrets table at {} should contain a next_version "
976 "of 1",
977 store->current_version());
978 return true;
979 }
980
981 if (previous_ledger_secret->version >= earliest_secret_.valid_from)
982 {
984 "Skipping redundant ledger secret with version of {} when the "
985 "earliest known secret is from {}",
986 previous_ledger_secret->version,
988 return true;
989 }
990
991 auto recovered_ledger_secret = std::make_shared<LedgerSecret>(
993 encrypting_secret, previous_ledger_secret->encrypted_data),
994 previous_ledger_secret->previous_secret_stored_version);
995
996 // Add recovered secret to historical secrets
997 historical_ledger_secrets->set_secret(
998 previous_ledger_secret->version, std::move(recovered_ledger_secret));
999
1000 // Update earliest_secret
1001 CCF_ASSERT(
1002 previous_ledger_secret->version < earliest_secret_.valid_from, "");
1004
1005 return true;
1006 }
1007
1009 ccf::SeqNo start_seqno, ccf::SeqNo end_seqno)
1010 {
1011 if (end_seqno < start_seqno)
1012 {
1013 throw std::logic_error(fmt::format(
1014 "Invalid range for historical query: end {} is before start {}",
1015 end_seqno,
1016 start_seqno));
1017 }
1018
1019 SeqNoCollection c(start_seqno, end_seqno - start_seqno);
1020 return c;
1021 }
1022
1023 std::vector<StatePtr> get_states_internal(
1024 const CompoundHandle& handle,
1025 const SeqNoCollection& seqnos,
1026 ExpiryDuration seconds_until_expiry,
1027 bool include_receipts)
1028 {
1029 if (seqnos.empty())
1030 {
1031 throw std::logic_error(
1032 "Invalid range for historical query: Cannot request empty range");
1033 }
1034
1035 std::lock_guard<ccf::pal::Mutex> guard(requests_lock);
1036
1037 const auto ms_until_expiry =
1038 std::chrono::duration_cast<std::chrono::milliseconds>(
1039 seconds_until_expiry);
1040
1041 auto it = requests.find(handle);
1042 if (it == requests.end())
1043 {
1044 // This is a new handle - insert a newly created Request for it
1045 it = requests.emplace_hint(it, handle, Request(all_stores));
1046 HISTORICAL_LOG("First time I've seen handle {}", handle);
1047 }
1048
1049 lru_promote(handle);
1050
1051 Request& request = it->second;
1052
1054
1055 // Update this Request to represent the currently requested ranges
1057 "Adjusting handle {} to cover {} seqnos starting at {} "
1058 "(include_receipts={})",
1059 handle,
1060 seqnos.size(),
1061 *seqnos.begin(),
1062 include_receipts);
1063 auto [removed, added] = request.adjust_ranges(
1064 seqnos, include_receipts, earliest_secret_.valid_from);
1065
1066 for (auto seq : removed)
1067 {
1068 remove_request_ref(seq, handle);
1069 }
1070 for (auto seq : added)
1071 {
1072 add_request_ref(seq, handle);
1073 }
1074
1075 // If the earliest target entry cannot be deserialised with the earliest
1076 // known ledger secret, record the target seqno and begin fetching the
1077 // previous historical ledger secret.
1078 request.awaiting_ledger_secrets =
1080
1081 // Reset the expiry timer as this has just been requested
1082 request.time_to_expiry = ms_until_expiry;
1083
1084 std::vector<StatePtr> trusted_states;
1085
1086 for (auto seqno : seqnos)
1087 {
1088 auto target_details = request.get_store_details(seqno);
1089 if (
1090 target_details != nullptr &&
1091 target_details->current_stage == StoreStage::Trusted &&
1092 (!request.include_receipts || target_details->receipt != nullptr))
1093 {
1094 // Have this store, associated txid and receipt and trust it - add
1095 // it to return list
1096 StatePtr state = std::make_shared<State>(
1097 target_details->store,
1098 target_details->receipt,
1099 target_details->transaction_id);
1100 trusted_states.push_back(state);
1101 }
1102 else
1103 {
1104 // Still fetching this store or don't trust it yet, so range is
1105 // incomplete - return empty vector
1106 return {};
1107 }
1108 }
1109
1110 return trusted_states;
1111 }
1112
1113 // Used when we received an invalid entry, to drop any requests which were
1114 // asking for it
1116 {
1117 auto request_it = requests.begin();
1118 while (request_it != requests.end())
1119 {
1120 if (request_it->second.get_store_details(seqno) != nullptr)
1121 {
1122 lru_evict(request_it->first);
1123 request_it = requests.erase(request_it);
1124 }
1125 else
1126 {
1127 ++request_it;
1128 }
1129 }
1130 }
1131
1132 std::vector<ccf::kv::ReadOnlyStorePtr> states_to_stores(
1133 const std::vector<StatePtr>& states)
1134 {
1135 std::vector<ccf::kv::ReadOnlyStorePtr> stores;
1136 stores.reserve(states.size());
1137 for (const auto& state : states)
1138 {
1139 stores.push_back(state->store);
1140 }
1141 return stores;
1142 }
1143
1144 public:
1146 ccf::kv::Store& store,
1147 const std::shared_ptr<ccf::LedgerSecrets>& secrets,
1148 ringbuffer::WriterPtr host_writer) :
1149 source_store(store),
1150 source_ledger_secrets(secrets),
1151 to_host(std::move(host_writer)),
1155 {}
1156
1158 const CompoundHandle& handle,
1159 ccf::SeqNo seqno,
1160 ExpiryDuration seconds_until_expiry)
1161 {
1162 auto range = get_store_range(handle, seqno, seqno, seconds_until_expiry);
1163 if (range.empty())
1164 {
1165 return nullptr;
1166 }
1167
1168 return range[0];
1169 }
1170
1172 const CompoundHandle& handle, ccf::SeqNo seqno)
1173 {
1174 return get_store_at(handle, seqno, default_expiry_duration);
1175 }
1176
1178 const CompoundHandle& handle,
1179 ccf::SeqNo seqno,
1180 ExpiryDuration seconds_until_expiry)
1181 {
1182 auto range = get_state_range(handle, seqno, seqno, seconds_until_expiry);
1183 if (range.empty())
1184 {
1185 return nullptr;
1186 }
1187
1188 return range[0];
1189 }
1190
1192 {
1193 return get_state_at(handle, seqno, default_expiry_duration);
1194 }
1195
1196 std::vector<ccf::kv::ReadOnlyStorePtr> get_store_range(
1197 const CompoundHandle& handle,
1198 ccf::SeqNo start_seqno,
1199 ccf::SeqNo end_seqno,
1200 ExpiryDuration seconds_until_expiry)
1201 {
1203 handle,
1204 collection_from_single_range(start_seqno, end_seqno),
1205 seconds_until_expiry,
1206 false));
1207 }
1208
1209 std::vector<ccf::kv::ReadOnlyStorePtr> get_store_range(
1210 const CompoundHandle& handle,
1211 ccf::SeqNo start_seqno,
1212 ccf::SeqNo end_seqno)
1213 {
1214 return get_store_range(
1215 handle, start_seqno, end_seqno, default_expiry_duration);
1216 }
1217
1218 std::vector<StatePtr> get_state_range(
1219 const CompoundHandle& handle,
1220 ccf::SeqNo start_seqno,
1221 ccf::SeqNo end_seqno,
1222 ExpiryDuration seconds_until_expiry)
1223 {
1224 return get_states_internal(
1225 handle,
1226 collection_from_single_range(start_seqno, end_seqno),
1227 seconds_until_expiry,
1228 true);
1229 }
1230
1231 std::vector<StatePtr> get_state_range(
1232 const CompoundHandle& handle,
1233 ccf::SeqNo start_seqno,
1234 ccf::SeqNo end_seqno)
1235 {
1236 return get_state_range(
1237 handle, start_seqno, end_seqno, default_expiry_duration);
1238 }
1239
1240 std::vector<ccf::kv::ReadOnlyStorePtr> get_stores_for(
1241 const CompoundHandle& handle,
1242 const SeqNoCollection& seqnos,
1243 ExpiryDuration seconds_until_expiry)
1244 {
1245 return states_to_stores(
1246 get_states_internal(handle, seqnos, seconds_until_expiry, false));
1247 }
1248
1249 std::vector<ccf::kv::ReadOnlyStorePtr> get_stores_for(
1250 const CompoundHandle& handle, const SeqNoCollection& seqnos)
1251 {
1252 return get_stores_for(handle, seqnos, default_expiry_duration);
1253 }
1254
1255 std::vector<StatePtr> get_states_for(
1256 const CompoundHandle& handle,
1257 const SeqNoCollection& seqnos,
1258 ExpiryDuration seconds_until_expiry)
1259 {
1260 if (seqnos.empty())
1261 {
1262 throw std::runtime_error("Cannot request empty range");
1263 }
1264 return get_states_internal(handle, seqnos, seconds_until_expiry, true);
1265 }
1266
1267 std::vector<StatePtr> get_states_for(
1268 const CompoundHandle& handle, const SeqNoCollection& seqnos)
1269 {
1270 return get_states_for(handle, seqnos, default_expiry_duration);
1271 }
1272
1274 {
1275 default_expiry_duration = duration;
1276 }
1277
1279 {
1280 soft_store_cache_limit = cache_limit;
1281 }
1282
// NOTE(review): both the declaration and the single statement of this method
// are absent from this listing, leaving only the braces. The symbol index at
// the end of the page names it `void track_deletes_on_missing_keys(bool track)`
// (historical_queries.h:1283); presumably it stores `track` in
// track_deletes_on_missing_keys_v - restore from the upstream header.
1284 {
1286 }
1287
1289 {
1290 std::lock_guard<ccf::pal::Mutex> guard(requests_lock);
1291 lru_evict(handle);
1292 const auto erased_count = requests.erase(handle);
1293 return erased_count > 0;
1294 }
1295
1296 bool handle_ledger_entry(ccf::SeqNo seqno, const std::vector<uint8_t>& data)
1297 {
1298 return handle_ledger_entry(seqno, data.data(), data.size());
1299 }
1300
// Process one serialised ledger entry delivered for `seqno`.
//
// Returns false (entry ignored) when:
//  - no live store in all_stores is in the Fetching stage for this seqno,
//  - deserialisation reports ApplyResult::FAIL,
//  - the deserialised store's TxID has a different seqno,
//  - the source store has no consensus, or
//  - consensus reports a different view for this seqno (entry is from a fork).
// Otherwise the entry is handed on together with its SHA-256 digest, its raw
// size is recorded, and true is returned.
//
// NOTE(review): several statement lines of this function are absent from this
// listing (gaps in the embedded line numbers); comments below flag them.
1301 bool handle_ledger_entry(ccf::SeqNo seqno, const uint8_t* data, size_t size)
1302 {
// Held until the function returns.
1303 std::lock_guard<ccf::pal::Mutex> guard(requests_lock);
1304 const auto it = all_stores.find(seqno);
// all_stores holds weak pointers: lock() yields nullptr if the store details
// have already been released.
1305 auto details = it == all_stores.end() ? nullptr : it->second.lock();
1306 if (details == nullptr || details->current_stage != StoreStage::Fetching)
1307 {
1308 // Unexpected entry, we already have it or weren't asking for it -
1309 // ignore this resubmission
1310 return false;
1311 }
1312
// NOTE(review): `deserialise_result` is declared on listing line 1313, which
// is missing here. It is filled in by deserialise_ledger_entry() below.
1314 ccf::ClaimsDigest claims_digest;
1315 bool has_commit_evidence = false;
1316 auto store = deserialise_ledger_entry(
1317 seqno,
1318 data,
1319 size,
1320 deserialise_result,
1321 claims_digest,
1322 has_commit_evidence);
1323
1324 if (deserialise_result == ccf::kv::ApplyResult::FAIL)
1325 {
1326 return false;
1327 }
1328
1329 {
1330 // Confirm this entry is from a precursor of the current state, and not
1331 // a fork
1332 const auto tx_id = store->current_txid();
1333 if (tx_id.seqno != seqno)
1334 {
// NOTE(review): the logging call which takes these arguments (listing line
// 1335) is missing here.
1336 "Corrupt ledger entry received - claims to be {} but is actually "
1337 "{}.{}",
1338 seqno,
1339 tx_id.view,
1340 tx_id.seqno);
1341 return false;
1342 }
1343
// NOTE(review): `consensus` is obtained on listing line 1344, which is
// missing here - presumably from source_store; confirm upstream.
1345 if (consensus == nullptr)
1346 {
1347 LOG_FAIL_FMT("No consensus on source store");
1348 return false;
1349 }
1350
// Compare the view recorded in the entry against the view consensus reports
// for this seqno.
1351 const auto actual_view = consensus->get_view(seqno);
1352 if (actual_view != tx_id.view)
1353 {
// NOTE(review): the logging call which takes these arguments (listing line
// 1354) is missing here.
1355 "Ledger entry comes from fork - contains {}.{} but this service "
1356 "expected {}.{}",
1357 tx_id.view,
1358 tx_id.seqno,
1359 actual_view,
1360 seqno);
1361 return false;
1362 }
1363 }
1364
1365 const auto is_signature =
1366 deserialise_result == ccf::kv::ApplyResult::PASS_SIGNATURE;
1367
// NOTE(review): listing line 1368 is missing here.
1369
1370 auto [valid_from, secret] = earliest_secret_;
1371
// If the earliest known secret records that its predecessor was stored at
// exactly this seqno, this entry is treated as a past ledger secret.
1372 if (secret != nullptr)
1373 {
1374 const auto& prev_version = secret->previous_secret_stored_version;
1375 if (prev_version.has_value() && *prev_version == seqno)
1376 {
// NOTE(review): the logging call which takes these arguments (listing line
// 1377) and the statement on listing line 1383 are missing here.
1378 "Handling past ledger secret. Current earliest is valid from {}, "
1379 "now "
1380 "processing secret stored at {}",
1381 valid_from,
1382 seqno);
1384 next_secret_fetch_handle = nullptr;
1385 }
1386 }
1387
// NOTE(review): the logging call which takes these arguments (listing line
// 1388) is missing here.
1389 "Processing historical store at {} ({})",
1390 seqno,
1391 (size_t)deserialise_result);
// Digest is computed over the raw serialised bytes of the entry.
1392 const auto entry_digest = ccf::crypto::Sha256Hash({data, size});
// NOTE(review): the call which takes these arguments (listing line 1393) is
// missing here; the argument list matches process_deserialised_store() as
// listed in the symbol index - confirm upstream.
1394 details,
1395 store,
1396 entry_digest,
1397 seqno,
1398 is_signature,
1399 std::move(claims_digest),
1400 has_commit_evidence);
1401
1402 update_store_raw_size(seqno, size);
1403 return true;
1404 }
1405
1407 ccf::SeqNo from_seqno, ccf::SeqNo to_seqno, const LedgerEntry& data)
1408 {
1409 return handle_ledger_entries(
1410 from_seqno, to_seqno, data.data(), data.size());
1411 }
1412
// Process a buffer holding consecutive serialised ledger entries, the first
// of which is treated as `from_seqno`.
//
// Each entry is split off using the size in its SerialisedEntryHeader and
// passed to handle_ledger_entry() with an incrementing seqno. Returns true
// only if every entry was accepted.
//
// NOTE(review): the first line of the declaration (listing line 1413) is
// missing here; the symbol index gives
// `bool handle_ledger_entries(ccf::SeqNo, ccf::SeqNo, const uint8_t*, size_t)`.
1414 ccf::SeqNo from_seqno,
1415 ccf::SeqNo to_seqno,
1416 const uint8_t* data,
1417 size_t size)
1418 {
1419 LOG_TRACE_FMT("handle_ledger_entries({}, {})", from_seqno, to_seqno);
1420
1421 auto seqno = from_seqno;
1422 bool all_accepted = true;
1423 while (size > 0)
1424 {
// peek reads the header without advancing data/size; they are advanced
// manually below by the whole entry size.
1425 const auto header =
1426 serialized::peek<ccf::kv::SerialisedEntryHeader>(data, size);
1427 const auto whole_size =
1428 header.size + ccf::kv::serialised_entry_header_size;
// Non-short-circuiting &=: later entries are still processed after a
// rejection.
1429 all_accepted &= handle_ledger_entry(seqno, data, whole_size);
1430 data += whole_size;
// NOTE(review): size is a size_t and whole_size is not checked against it
// here, so a header claiming more than the remaining bytes would wrap this
// subtraction - confirm the caller guarantees well-formed buffers.
1431 size -= whole_size;
1432 ++seqno;
1433 }
1434
// After the loop seqno is one past the last entry consumed; a mismatch with
// to_seqno is only reported, it does not affect the return value.
1435 if (seqno != to_seqno + 1)
1436 {
// NOTE(review): the logging call which takes these arguments (listing line
// 1437) is missing here. The format text closes the first range with ')' and
// the second with ']'.
1438 "Claimed ledger entries: [{}, {}), actual [{}, {}]",
1439 from_seqno,
1440 to_seqno,
1441 from_seqno,
1442 seqno);
1443 }
1444
1445 return all_accepted;
1446 }
1447
1449 {
1450 handle_no_entry_range(seqno, seqno);
1451 }
1452
// Report that no ledger entries are available for [from_seqno, to_seqno]
// (inclusive). Every seqno in the range which has an entry in all_stores is
// removed from it.
//
// NOTE(review): the declaration (listing line 1453) is missing here; the
// symbol index gives
// `void handle_no_entry_range(ccf::SeqNo from_seqno, ccf::SeqNo to_seqno)`.
1454 {
// Held until the function returns.
1455 std::lock_guard<ccf::pal::Mutex> guard(requests_lock);
1456
1457 LOG_TRACE_FMT("handle_no_entry_range({}, {})", from_seqno, to_seqno);
1458
1459 for (auto seqno = from_seqno; seqno <= to_seqno; ++seqno)
1460 {
1461 // The host failed or refused to give this entry. Currently just
1462 // forget about it and drop any requests which were looking for it -
1463 // don't have a mechanism for remembering this failure and reporting it
1464 // to users.
1465 const auto fetches_it = all_stores.find(seqno);
1466 if (fetches_it != all_stores.end())
1467 {
// NOTE(review): listing line 1468 is missing here - presumably the call which
// drops the requests interested in this seqno; confirm upstream.
1469
1470 all_stores.erase(fetches_it);
1471 }
1472 }
1473 }
1474
// Deserialise a raw ledger entry into a freshly created store.
//
// seqno: sequence number the entry was requested for; selects the encryptor
//   and decides whether the entry is read in public-only mode.
// data, size: the serialised entry.
// result (out): set to the value returned by applying the entry.
// claims_digest (out): receives the claims digest consumed from the entry.
// has_commit_evidence (out): true if the entry carried a commit evidence
//   digest.
//
// Returns the new store, or nullptr when store->deserialize() yields no
// execution wrapper.
//
// NOTE(review): the first line of the declaration (listing line 1475) and
// lines 1519, 1532 and 1536 are missing from this listing, so the behaviour on
// the failure paths is not fully visible here.
1476 ccf::SeqNo seqno,
1477 const uint8_t* data,
1478 size_t size,
1479 ccf::kv::ApplyResult& result,
1480 ccf::ClaimsDigest& claims_digest,
1481 bool& has_commit_evidence)
1482 {
1483 // Create a new store and try to deserialise this entry into it
1484 ccf::kv::StorePtr store = std::make_shared<ccf::kv::Store>(
1485 false /* Do not start from very first seqno */,
1486 true /* Make use of historical secrets */);
1487
1488 // If this is older than the node's currently known ledger secrets, use
1489 // the historical encryptor (which should have older secrets)
1490 if (seqno < source_ledger_secrets->get_first().first)
1491 {
1492 store->set_encryptor(historical_encryptor);
1493 }
1494 else
1495 {
1496 store->set_encryptor(source_store.get_encryptor());
1497 }
1498
1499 try
1500 {
1501 // Encrypted ledger secrets are deserialised in public-only mode. Their
1502 // Merkle tree integrity is not verified: even if the recovered ledger
1503 // secret was bogus, the deserialisation of subsequent ledger entries
1504 // would fail.
1505 bool public_only = false;
// Public-only mode is chosen when any outstanding request is waiting on a
// ledger secret stored at exactly this seqno.
1506 for (const auto& [_, request] : requests)
1507 {
1508 const auto& als = request.awaiting_ledger_secrets;
1509 if (als.has_value() && als.value() == seqno)
1510 {
1511 public_only = true;
1512 break;
1513 }
1514 }
1515
1516 auto exec = store->deserialize({data, data + size}, public_only);
1517 if (exec == nullptr)
1518 {
// NOTE(review): listing line 1519 is missing here.
1520 return nullptr;
1521 }
1522
1523 result = exec->apply(track_deletes_on_missing_keys_v);
1524 claims_digest = std::move(exec->consume_claims_digest());
1525
1526 auto commit_evidence_digest =
1527 std::move(exec->consume_commit_evidence_digest());
// Only the presence of the digest is reported; its value is not returned.
1528 has_commit_evidence = commit_evidence_digest.has_value();
1529 }
1530 catch (const std::exception& e)
1531 {
// NOTE(review): the logging call which takes these arguments (listing line
// 1532) and the statement on listing line 1536 are missing here.
1533 "Exception while attempting to deserialise entry {}: {}",
1534 seqno,
1535 e.what());
1537 }
1538
1539 return store;
1540 }
1541
// NOTE(review): the declaration (listing line 1542) and the return statement
// (listing line 1545) are missing here. The symbol index gives
// `size_t get_estimated_store_cache_size()`; presumably it returns
// estimated_store_cache_size - confirm upstream.
1543 {
// Held until the function returns.
1544 std::lock_guard<ccf::pal::Mutex> guard(requests_lock);
1546 }
1547
// Periodic maintenance, driven by the amount of time elapsed since the
// previous call.
//
// 1. Drops every request whose remaining time_to_expiry is not greater than
//    elapsed_ms, and decrements the remaining time of the others.
// 2. Walks all_stores: erases entries whose store details have been released,
//    and re-submits fetches for stores still in the Fetching stage whose
//    time_until_fetch has run out.
//
// NOTE(review): listing lines 1559 and 1572 are missing here.
1548 void tick(const std::chrono::milliseconds& elapsed_ms)
1549 {
// Held until the function returns.
1550 std::lock_guard<ccf::pal::Mutex> guard(requests_lock);
1551
1552 {
1553 auto it = requests.begin();
1554 while (it != requests.end())
1555 {
1556 auto& request = it->second;
1557 if (elapsed_ms >= request.time_to_expiry)
1558 {
// NOTE(review): the logging call which takes these arguments (listing line
// 1559) is missing here.
1560 "Dropping expired historical query with handle {}", it->first);
1561 lru_evict(it->first);
// erase() returns the iterator following the removed element, so the loop
// continues without a separate increment.
1562 it = requests.erase(it);
1563 }
1564 else
1565 {
1566 request.time_to_expiry -= elapsed_ms;
1567 ++it;
1568 }
1569 }
1570 }
1571
// NOTE(review): listing line 1572 is missing here.
1573
1574 {
1575 auto it = all_stores.begin();
// Contiguous run of seqnos due for re-fetch which has not been submitted yet.
// all_stores is an ordered map keyed by seqno, so seqnos are visited in
// ascending order and adjacent ones can be merged into one range.
1576 std::optional<std::pair<ccf::SeqNo, ccf::SeqNo>> range_to_request =
1577 std::nullopt;
1578 while (it != all_stores.end())
1579 {
1580 auto details = it->second.lock();
1581 if (details == nullptr)
1582 {
// The weak pointer has expired: remove the stale entry.
1583 it = all_stores.erase(it);
1584 }
1585 else
1586 {
1587 if (details->current_stage == StoreStage::Fetching)
1588 {
1589 details->time_until_fetch -= elapsed_ms;
1590 if (details->time_until_fetch.count() <= 0)
1591 {
// Restart the countdown before re-requesting this entry.
1592 details->time_until_fetch = slow_fetch_threshold;
1593
1594 const auto seqno = it->first;
1595 if (auto range_val = range_to_request; range_val.has_value())
1596 {
1597 auto range = range_val.value();
1598 if (range.second + 1 == seqno)
1599 {
// seqno directly follows the tracked range: extend it. `range` is a local
// copy, so it is written back explicitly.
1600 range.second = seqno;
1601 range_to_request = range;
1602 }
1603 else
1604 {
1605 // Submit fetch for previously tracked range
1606 fetch_entries_range(range.first, range.second);
1607 // Track new range
1608 range_to_request = std::make_pair(seqno, seqno);
1609 }
1610 }
1611 else
1612 {
1613 // Track new range
1614 range_to_request = std::make_pair(seqno, seqno);
1615 }
1616 }
1617 }
1618
1619 ++it;
1620 }
1621 }
1622
1623 if (auto range_val = range_to_request; range_val.has_value())
1624 {
1625 // Submit fetch for final tracked range
1626 auto range = range_val.value();
1627 fetch_entries_range(range.first, range.second);
1628 }
1629 }
1630 }
1631 };
1632
// StateCache: wrapper whose constructor forwards all arguments to
// StateCacheImpl and whose methods (all marked `override`) take a plain
// RequestHandle, converting it with make_compound_handle(handle) before
// passing the remaining arguments on unchanged.
//
// NOTE(review): this listing is missing the class header (listing line 1633),
// the body of make_compound_handle (1636-1639), the first line of several
// declarations, the line naming the callee in every forwarding method, and
// two whole methods (1751-1754 and 1771-1774; the symbol index lists
// set_default_expiry_duration and get_estimated_store_cache_size there).
// The callees are presumably the same-named StateCacheImpl members - restore
// from the upstream header.
1634 {
1635 protected:
// NOTE(review): make_compound_handle(RequestHandle) is defined here upstream
// (symbol index: historical_queries.h:1636); its body is not visible.
1640
1641 public:
// Perfect-forwards every constructor argument to the StateCacheImpl base.
1642 template <typename... Ts>
1643 StateCache(Ts&&... ts) : StateCacheImpl(std::forward<Ts>(ts)...)
1644 {}
1645
// get_store_at(handle, seqno, seconds_until_expiry)
1647 RequestHandle handle,
1648 ccf::SeqNo seqno,
1649 ExpiryDuration seconds_until_expiry) override
1650 {
1652 make_compound_handle(handle), seqno, seconds_until_expiry);
1653 }
1654
// get_store_at(handle, seqno) - forwarding statement missing from listing.
1656 RequestHandle handle, ccf::SeqNo seqno) override
1657 {
1659 }
1660
// get_state_at(handle, seqno, seconds_until_expiry)
1662 RequestHandle handle,
1663 ccf::SeqNo seqno,
1664 ExpiryDuration seconds_until_expiry) override
1665 {
1667 make_compound_handle(handle), seqno, seconds_until_expiry);
1668 }
1669
// get_state_at(handle, seqno) - declaration and forwarding statement missing
// from listing.
1671 {
1673 }
1674
1675 std::vector<ccf::kv::ReadOnlyStorePtr> get_store_range(
1676 RequestHandle handle,
1677 ccf::SeqNo start_seqno,
1678 ccf::SeqNo end_seqno,
1679 ExpiryDuration seconds_until_expiry) override
1680 {
1682 make_compound_handle(handle),
1683 start_seqno,
1684 end_seqno,
1685 seconds_until_expiry);
1686 }
1687
1688 std::vector<ccf::kv::ReadOnlyStorePtr> get_store_range(
1689 RequestHandle handle,
1690 ccf::SeqNo start_seqno,
1691 ccf::SeqNo end_seqno) override
1692 {
1694 make_compound_handle(handle), start_seqno, end_seqno);
1695 }
1696
1697 std::vector<StatePtr> get_state_range(
1698 RequestHandle handle,
1699 ccf::SeqNo start_seqno,
1700 ccf::SeqNo end_seqno,
1701 ExpiryDuration seconds_until_expiry) override
1702 {
1704 make_compound_handle(handle),
1705 start_seqno,
1706 end_seqno,
1707 seconds_until_expiry);
1708 }
1709
1710 std::vector<StatePtr> get_state_range(
1711 RequestHandle handle,
1712 ccf::SeqNo start_seqno,
1713 ccf::SeqNo end_seqno) override
1714 {
1716 make_compound_handle(handle), start_seqno, end_seqno);
1717 }
1718
1719 std::vector<ccf::kv::ReadOnlyStorePtr> get_stores_for(
1720 RequestHandle handle,
1721 const SeqNoCollection& seqnos,
1722 ExpiryDuration seconds_until_expiry) override
1723 {
1725 make_compound_handle(handle), seqnos, seconds_until_expiry);
1726 }
1727
1728 std::vector<ccf::kv::ReadOnlyStorePtr> get_stores_for(
1729 RequestHandle handle, const SeqNoCollection& seqnos) override
1730 {
1732 make_compound_handle(handle), seqnos);
1733 }
1734
1735 std::vector<StatePtr> get_states_for(
1736 RequestHandle handle,
1737 const SeqNoCollection& seqnos,
1738 ExpiryDuration seconds_until_expiry) override
1739 {
1741 make_compound_handle(handle), seqnos, seconds_until_expiry);
1742 }
1743
1744 std::vector<StatePtr> get_states_for(
1745 RequestHandle handle, const SeqNoCollection& seqnos) override
1746 {
1748 make_compound_handle(handle), seqnos);
1749 }
1750
// NOTE(review): set_default_expiry_duration(ExpiryDuration) override is
// defined here upstream (listing lines 1751-1754, missing).
1755
// The three methods below take no handle; their forwarding statements are
// missing from this listing.
1756 void set_soft_cache_limit(CacheSize cache_limit) override
1757 {
1759 }
1760
1761 void track_deletes_on_missing_keys(bool track) override
1762 {
1764 }
1765
1766 bool drop_cached_states(RequestHandle handle) override
1767 {
1769 }
1770
// NOTE(review): get_estimated_store_cache_size() override is defined here
// upstream (listing lines 1771-1774, missing).
1775 };
1776}
#define CCF_ASSERT_FMT(expr,...)
Definition ccf_assert.h:10
#define CCF_ASSERT(expr, msg)
Definition ccf_assert.h:14
Definition claims_digest.h:10
Definition ledger_secrets.h:25
Definition history.h:436
Definition sha256_hash.h:16
Definition contiguous_set.h:19
ConstIterator end() const
Definition contiguous_set.h:517
bool empty() const
Definition contiguous_set.h:344
ConstIterator begin() const
Definition contiguous_set.h:512
size_t size() const
Definition contiguous_set.h:336
Definition historical_queries_interface.h:67
Definition historical_queries.h:95
std::map< CompoundHandle, std::list< CompoundHandle >::iterator > lru_lookup
Definition historical_queries.h:612
void update_store_raw_size(SeqNo seq, size_t new_size)
Definition historical_queries.h:727
void process_deserialised_store(const StoreDetailsPtr &details, const ccf::kv::StorePtr &store, const ccf::crypto::Sha256Hash &entry_digest, ccf::SeqNo seqno, bool is_signature, ccf::ClaimsDigest &&claims_digest, bool has_commit_evidence)
Definition historical_queries.h:810
std::vector< StatePtr > get_states_for(const CompoundHandle &handle, const SeqNoCollection &seqnos)
Definition historical_queries.h:1267
std::map< CompoundHandle, Request > requests
Definition historical_queries.h:600
std::list< CompoundHandle > lru_requests
Definition historical_queries.h:611
std::vector< ccf::kv::ReadOnlyStorePtr > states_to_stores(const std::vector< StatePtr > &states)
Definition historical_queries.h:1132
ExpiryDuration default_expiry_duration
Definition historical_queries.h:607
bool track_deletes_on_missing_keys_v
Definition historical_queries.h:105
ccf::kv::Store & source_store
Definition historical_queries.h:97
std::map< ccf::SeqNo, WeakStoreDetailsPtr > AllRequestedStores
Definition historical_queries.h:172
ringbuffer::WriterPtr to_host
Definition historical_queries.h:99
void add_request_refs(CompoundHandle handle)
Definition historical_queries.h:641
std::vector< StatePtr > get_state_range(const CompoundHandle &handle, ccf::SeqNo start_seqno, ccf::SeqNo end_seqno, ExpiryDuration seconds_until_expiry)
Definition historical_queries.h:1218
std::unordered_map< SeqNo, std::set< CompoundHandle > > store_to_requests
Definition historical_queries.h:616
std::vector< StatePtr > get_state_range(const CompoundHandle &handle, ccf::SeqNo start_seqno, ccf::SeqNo end_seqno)
Definition historical_queries.h:1231
StoreStage
Definition historical_queries.h:108
AllRequestedStores all_stores
Definition historical_queries.h:605
void set_soft_cache_limit(CacheSize cache_limit)
Definition historical_queries.h:1278
void remove_request_ref(SeqNo seq, CompoundHandle handle)
Definition historical_queries.h:649
CacheSize estimated_store_cache_size
Definition historical_queries.h:620
StatePtr get_state_at(const CompoundHandle &handle, ccf::SeqNo seqno, ExpiryDuration seconds_until_expiry)
Definition historical_queries.h:1177
StateCacheImpl(ccf::kv::Store &store, const std::shared_ptr< ccf::LedgerSecrets > &secrets, ringbuffer::WriterPtr host_writer)
Definition historical_queries.h:1145
bool drop_cached_states(const CompoundHandle &handle)
Definition historical_queries.h:1288
void handle_no_entry(ccf::SeqNo seqno)
Definition historical_queries.h:1448
void remove_request_refs(CompoundHandle handle)
Definition historical_queries.h:667
void track_deletes_on_missing_keys(bool track)
Definition historical_queries.h:1283
std::vector< StatePtr > get_states_for(const CompoundHandle &handle, const SeqNoCollection &seqnos, ExpiryDuration seconds_until_expiry)
Definition historical_queries.h:1255
std::vector< ccf::kv::ReadOnlyStorePtr > get_stores_for(const CompoundHandle &handle, const SeqNoCollection &seqnos, ExpiryDuration seconds_until_expiry)
Definition historical_queries.h:1240
std::map< ccf::SeqNo, StoreDetailsPtr > RequestedStores
Definition historical_queries.h:169
void handle_no_entry_range(ccf::SeqNo from_seqno, ccf::SeqNo to_seqno)
Definition historical_queries.h:1453
bool handle_encrypted_past_ledger_secret(const ccf::kv::StorePtr &store, LedgerSecretPtr encrypting_secret)
Definition historical_queries.h:945
ccf::kv::ReadOnlyStorePtr get_store_at(const CompoundHandle &handle, ccf::SeqNo seqno)
Definition historical_queries.h:1171
StatePtr get_state_at(const CompoundHandle &handle, ccf::SeqNo seqno)
Definition historical_queries.h:1191
std::shared_ptr< ccf::NodeEncryptor > historical_encryptor
Definition historical_queries.h:102
CacheSize soft_store_cache_limit
Definition historical_queries.h:619
void add_request_ref(SeqNo seq, CompoundHandle handle)
Definition historical_queries.h:622
void fetch_entries_range(ccf::SeqNo from, ccf::SeqNo to)
Definition historical_queries.h:742
std::vector< uint8_t > LedgerEntry
Definition historical_queries.h:113
bool handle_ledger_entry(ccf::SeqNo seqno, const uint8_t *data, size_t size)
Definition historical_queries.h:1301
void fetch_entry_at(ccf::SeqNo seqno)
Definition historical_queries.h:737
ccf::kv::ReadOnlyStorePtr get_store_at(const CompoundHandle &handle, ccf::SeqNo seqno, ExpiryDuration seconds_until_expiry)
Definition historical_queries.h:1157
StoreDetailsPtr next_secret_fetch_handle
Definition historical_queries.h:189
void update_earliest_known_ledger_secret()
Definition historical_queries.h:115
ccf::pal::Mutex requests_lock
Definition historical_queries.h:597
std::shared_ptr< StoreDetails > StoreDetailsPtr
Definition historical_queries.h:168
bool handle_ledger_entry(ccf::SeqNo seqno, const std::vector< uint8_t > &data)
Definition historical_queries.h:1296
void tick(const std::chrono::milliseconds &elapsed_ms)
Definition historical_queries.h:1548
VersionedSecret earliest_secret_
Definition historical_queries.h:188
std::optional< ccf::SeqNo > fetch_supporting_secret_if_needed(ccf::SeqNo seqno)
Definition historical_queries.h:754
std::unordered_map< ccf::SeqNo, size_t > raw_store_sizes
Definition historical_queries.h:617
std::shared_ptr< ccf::LedgerSecrets > source_ledger_secrets
Definition historical_queries.h:98
ccf::kv::StorePtr deserialise_ledger_entry(ccf::SeqNo seqno, const uint8_t *data, size_t size, ccf::kv::ApplyResult &result, ccf::ClaimsDigest &claims_digest, bool &has_commit_evidence)
Definition historical_queries.h:1475
std::shared_ptr< ccf::LedgerSecrets > historical_ledger_secrets
Definition historical_queries.h:101
std::vector< ccf::kv::ReadOnlyStorePtr > get_stores_for(const CompoundHandle &handle, const SeqNoCollection &seqnos)
Definition historical_queries.h:1249
std::vector< StatePtr > get_states_internal(const CompoundHandle &handle, const SeqNoCollection &seqnos, ExpiryDuration seconds_until_expiry, bool include_receipts)
Definition historical_queries.h:1023
void delete_all_interested_requests(ccf::SeqNo seqno)
Definition historical_queries.h:1115
void lru_shrink_to_fit(size_t threshold)
Definition historical_queries.h:690
bool handle_ledger_entries(ccf::SeqNo from_seqno, ccf::SeqNo to_seqno, const uint8_t *data, size_t size)
Definition historical_queries.h:1413
size_t get_estimated_store_cache_size()
Definition historical_queries.h:1542
void set_default_expiry_duration(ExpiryDuration duration)
Definition historical_queries.h:1273
bool handle_ledger_entries(ccf::SeqNo from_seqno, ccf::SeqNo to_seqno, const LedgerEntry &data)
Definition historical_queries.h:1406
std::vector< ccf::kv::ReadOnlyStorePtr > get_store_range(const CompoundHandle &handle, ccf::SeqNo start_seqno, ccf::SeqNo end_seqno)
Definition historical_queries.h:1209
std::weak_ptr< StoreDetails > WeakStoreDetailsPtr
Definition historical_queries.h:171
SeqNoCollection collection_from_single_range(ccf::SeqNo start_seqno, ccf::SeqNo end_seqno)
Definition historical_queries.h:1008
std::vector< ccf::kv::ReadOnlyStorePtr > get_store_range(const CompoundHandle &handle, ccf::SeqNo start_seqno, ccf::SeqNo end_seqno, ExpiryDuration seconds_until_expiry)
Definition historical_queries.h:1196
void lru_promote(CompoundHandle handle)
Definition historical_queries.h:675
void lru_evict(CompoundHandle handle)
Definition historical_queries.h:716
Definition historical_queries.h:1634
StatePtr get_state_at(RequestHandle handle, ccf::SeqNo seqno, ExpiryDuration seconds_until_expiry) override
Definition historical_queries.h:1661
std::vector< StatePtr > get_state_range(RequestHandle handle, ccf::SeqNo start_seqno, ccf::SeqNo end_seqno, ExpiryDuration seconds_until_expiry) override
Definition historical_queries.h:1697
std::vector< StatePtr > get_states_for(RequestHandle handle, const SeqNoCollection &seqnos, ExpiryDuration seconds_until_expiry) override
Definition historical_queries.h:1735
CompoundHandle make_compound_handle(RequestHandle rh)
Definition historical_queries.h:1636
std::vector< ccf::kv::ReadOnlyStorePtr > get_store_range(RequestHandle handle, ccf::SeqNo start_seqno, ccf::SeqNo end_seqno) override
Definition historical_queries.h:1688
ccf::kv::ReadOnlyStorePtr get_store_at(RequestHandle handle, ccf::SeqNo seqno) override
Definition historical_queries.h:1655
std::vector< ccf::kv::ReadOnlyStorePtr > get_stores_for(RequestHandle handle, const SeqNoCollection &seqnos) override
Definition historical_queries.h:1728
std::vector< ccf::kv::ReadOnlyStorePtr > get_stores_for(RequestHandle handle, const SeqNoCollection &seqnos, ExpiryDuration seconds_until_expiry) override
Definition historical_queries.h:1719
StatePtr get_state_at(RequestHandle handle, ccf::SeqNo seqno) override
Definition historical_queries.h:1670
void set_soft_cache_limit(CacheSize cache_limit) override
Definition historical_queries.h:1756
std::vector< StatePtr > get_state_range(RequestHandle handle, ccf::SeqNo start_seqno, ccf::SeqNo end_seqno) override
Definition historical_queries.h:1710
size_t get_estimated_store_cache_size() override
Definition historical_queries.h:1771
ccf::kv::ReadOnlyStorePtr get_store_at(RequestHandle handle, ccf::SeqNo seqno, ExpiryDuration seconds_until_expiry) override
Definition historical_queries.h:1646
void set_default_expiry_duration(ExpiryDuration duration) override
Definition historical_queries.h:1751
bool drop_cached_states(RequestHandle handle) override
Definition historical_queries.h:1766
std::vector< StatePtr > get_states_for(RequestHandle handle, const SeqNoCollection &seqnos) override
Definition historical_queries.h:1744
void track_deletes_on_missing_keys(bool track) override
Definition historical_queries.h:1761
std::vector< ccf::kv::ReadOnlyStorePtr > get_store_range(RequestHandle handle, ccf::SeqNo start_seqno, ccf::SeqNo end_seqno, ExpiryDuration seconds_until_expiry) override
Definition historical_queries.h:1675
StateCache(Ts &&... ts)
Definition historical_queries.h:1643
Definition store.h:88
ReadOnlyTx create_read_only_tx() override
Definition store.h:1296
EncryptorPtr get_encryptor() override
Definition store.h:221
std::shared_ptr< Consensus > get_consensus() override
Definition store.h:183
Definition encryptor.h:15
Definition value.h:32
#define HISTORICAL_LOG(...)
Definition historical_queries.h:27
#define LOG_INFO_FMT
Definition internal_logger.h:15
#define LOG_TRACE_FMT
Definition internal_logger.h:13
#define LOG_DEBUG_FMT
Definition internal_logger.h:14
#define LOG_FAIL_FMT
Definition internal_logger.h:16
Definition historical_queries_adapter.h:17
std::chrono::seconds ExpiryDuration
Definition historical_queries_interface.h:50
std::shared_ptr< State > StatePtr
Definition historical_queries_interface.h:41
RequestNamespace
Definition historical_queries.h:33
std::pair< RequestNamespace, RequestHandle > CompoundHandle
Definition historical_queries.h:38
size_t RequestHandle
Definition historical_queries_interface.h:48
size_t CacheSize
Definition historical_queries_interface.h:52
std::shared_ptr< ReadOnlyStore > ReadOnlyStorePtr
Definition read_only_store.h:23
std::shared_ptr< ccf::kv::Store > StorePtr
Definition store.h:1363
ApplyResult
Definition kv_types.h:305
@ PASS_SIGNATURE
Definition kv_types.h:307
@ FAIL
Definition kv_types.h:314
std::mutex Mutex
Definition locking.h:12
Definition app_interface.h:13
LedgerSecretsMap::value_type VersionedLedgerSecret
Definition ledger_secrets.h:22
std::shared_ptr< TxReceiptImpl > TxReceiptImplPtr
Definition receipt.h:133
std::shared_ptr< LedgerSecret > LedgerSecretPtr
Definition ledger_secret.h:84
uint64_t SeqNo
Definition tx_id.h:36
std::vector< uint8_t > decrypt_previous_ledger_secret_raw(const LedgerSecretPtr &ledger_secret, const std::vector< uint8_t > &encrypted_previous_secret_raw)
Definition ledger_secret.h:92
Definition consensus_types.h:23
uint64_t Index
Definition ledger_enclave_types.h:11
@ HistoricalQuery
Definition ledger_enclave_types.h:16
std::shared_ptr< AbstractWriter > WriterPtr
Definition ring_buffer_types.h:154
STL namespace.
#define RINGBUFFER_WRITE_MESSAGE(MSG,...)
Definition ring_buffer_types.h:259
Definition tx_id.h:44
SeqNo seqno
Definition tx_id.h:46
View view
Definition tx_id.h:45
static std::optional< TxID > from_str(const std::string_view &sv)
Definition tx_id.h:53
Definition historical_queries.h:192
bool include_receipts
Definition historical_queries.h:198
std::vector< ccf::SeqNo > populate_receipts(ccf::SeqNo new_seqno)
Definition historical_queries.h:385
StoreDetailsPtr get_store_details(ccf::SeqNo seqno) const
Definition historical_queries.h:210
Request(AllRequestedStores &all_stores_)
Definition historical_queries.h:208
std::pair< std::vector< SeqNo >, std::vector< SeqNo > > adjust_ranges(const SeqNoCollection &new_seqnos, bool should_include_receipts, SeqNo earliest_ledger_secret_seqno)
Definition historical_queries.h:231
std::optional< ccf::SeqNo > awaiting_ledger_secrets
Definition historical_queries.h:206
AllRequestedStores & all_stores
Definition historical_queries.h:193
std::chrono::milliseconds time_to_expiry
Definition historical_queries.h:196
RequestedStores supporting_signatures
Definition historical_queries.h:203
ccf::SeqNo first_requested_seqno() const
Definition historical_queries.h:221
RequestedStores my_stores
Definition historical_queries.h:195
Definition historical_queries.h:141
std::chrono::milliseconds time_until_fetch
Definition historical_queries.h:142
std::optional< std::string > get_commit_evidence()
Definition historical_queries.h:152
bool has_commit_evidence
Definition historical_queries.h:150
ccf::kv::StorePtr store
Definition historical_queries.h:146
ccf::ClaimsDigest claims_digest
Definition historical_queries.h:145
ccf::crypto::Sha256Hash entry_digest
Definition historical_queries.h:144
ccf::TxID transaction_id
Definition historical_queries.h:149
TxReceiptImplPtr receipt
Definition historical_queries.h:148
StoreStage current_stage
Definition historical_queries.h:143
bool is_signature
Definition historical_queries.h:147
Definition historical_queries.h:175
ccf::LedgerSecretPtr secret
Definition historical_queries.h:177
ccf::SeqNo valid_from
Definition historical_queries.h:176
VersionedSecret(const ccf::VersionedLedgerSecret &vls)
Definition historical_queries.h:182
constexpr auto parse(ParseContext &ctx)
Definition historical_queries.h:46
auto format(const ccf::historical::CompoundHandle &p, FormatContext &ctx) const
Definition historical_queries.h:52