Source code for ccf.ledger

# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the Apache 2.0 License.

import struct
import os
from enum import Enum, IntEnum, Flag, auto

import json
from dataclasses import dataclass

from cryptography.x509 import load_pem_x509_certificate
from cryptography.hazmat.backends import default_backend

from ccf.merkletree import MerkleTree
from ccf.tx_id import TxID
import ccf.cose
import ccf.receipt

import warnings
from hashlib import sha256

from ccf import signatures

# Names that used to live in ``ccf.ledger`` and now live in ``ccf.signatures``.
# Accessing them through ``ccf.ledger`` emits a DeprecationWarning; import
# directly from ``ccf.signatures`` instead.
_DEPRECATED_SIGNATURE_REEXPORTS = frozenset(
    {
        "COSE_SIGNATURE_TX_TABLE_NAME",
        "InvalidRootCoseSignatureException",
        "InvalidRootException",
        "InvalidRootSignatureException",
        "SIGNATURE_TX_TABLE_NAME",
        "UntrustedNodeException",
        "WELL_KNOWN_SINGLETON_TABLE_KEY",
        "spki_from_cert",
    }
)


def __getattr__(name: str):
    """
    Module-level attribute fallback (PEP 562).

    Names listed in ``_DEPRECATED_SIGNATURE_REEXPORTS`` are forwarded to
    ``ccf.signatures`` after emitting a DeprecationWarning attributed to the
    caller (``stacklevel=2``).

    :raises AttributeError: for any name that is not a deprecated re-export.
    """
    if name not in _DEPRECATED_SIGNATURE_REEXPORTS:
        raise AttributeError(f"module {__name__!r} has no attribute {name!r}")

    warnings.warn(
        f"ccf.ledger.{name} is deprecated; "
        f"import {name} from ccf.signatures instead.",
        DeprecationWarning,
        stacklevel=2,
    )
    return getattr(signatures, name)


# Fixed field sizes, in bytes, used when decoding ledger entries.
GCM_SIZE_TAG = 16  # AES-GCM authentication tag
GCM_SIZE_IV = 12  # AES-GCM IV (seqno + view are encoded in it)
LEDGER_DOMAIN_SIZE = 8  # uint64 size prefix of the public domain
LEDGER_HEADER_SIZE = 8  # chunk header holding the offset-table position

# Public table names as defined in CCF.
TREE_TABLE_NAME = "public:ccf.internal.tree"
NODES_TABLE_NAME = "public:ccf.gov.nodes.info"
ENDORSED_NODE_CERTIFICATES_TABLE_NAME = "public:ccf.gov.nodes.endorsed_certificates"
SERVICE_INFO_TABLE_NAME = "public:ccf.gov.service.info"

# Suffixes that may be appended to ledger chunk / snapshot file names.
COMMITTED_FILE_SUFFIX = ".committed"
RECOVERY_FILE_SUFFIX = ".recovery"
IGNORED_FILE_SUFFIX = ".ignored"

# Length in bytes of a SHA-256 digest.
SHA256_DIGEST_SIZE = sha256().digest_size
# Leading byte of a CBOR-tagged COSE_Sign1 message (CBOR tag 18: 0xC0 | 18).
ENCODED_COSE_SIGN1_TAG = 0xD2
ENCODED_COSE_SIGN1_TAG = 0xD2


class NodeStatus(Enum):
    """Membership status of a node, as stored in the nodes table JSON."""

    PENDING = "Pending"  # joined but not yet trusted
    TRUSTED = "Trusted"
    RETIRED = "Retired"


class EntryType(Enum):
    """
    One-byte entry type found at the start of each public domain.

    The type determines which optional digests precede the serialised
    tables and whether the entry is a snapshot.
    """

    WRITE_SET = 0
    SNAPSHOT = 1
    WRITE_SET_WITH_CLAIMS = 2
    WRITE_SET_WITH_COMMIT_EVIDENCE = 3
    WRITE_SET_WITH_COMMIT_EVIDENCE_AND_CLAIMS = 4

    def has_claims(self):
        """Return True if entries of this type carry a claims digest."""
        return (
            self is EntryType.WRITE_SET_WITH_CLAIMS
            or self is EntryType.WRITE_SET_WITH_COMMIT_EVIDENCE_AND_CLAIMS
        )

    def has_commit_evidence(self):
        """Return True if entries of this type carry a commit-evidence digest."""
        return (
            self is EntryType.WRITE_SET_WITH_COMMIT_EVIDENCE
            or self is EntryType.WRITE_SET_WITH_COMMIT_EVIDENCE_AND_CLAIMS
        )

    def is_deprecated(self):
        """Return True for legacy write-set formats superseded by newer ones."""
        return (
            self is EntryType.WRITE_SET
            or self is EntryType.WRITE_SET_WITH_CLAIMS
            or self is EntryType.WRITE_SET_WITH_COMMIT_EVIDENCE
        )


class TransactionFlags(Flag):
    """
    Bit flags stored in the one-byte flags field of a transaction header.

    ``auto()`` assigns successive powers of two in declaration order.
    """

    FORCE_CHUNK_AFTER = auto()  # bit value 1
    FORCE_CHUNK_BEFORE = auto()  # bit value 2


class VerificationLevel(IntEnum):
    """
    Ledger verification levels, ordered from least to most verification.

    Being an ``IntEnum``, levels compare numerically, so "at least level X"
    is written ``level >= VerificationLevel.X``.

    - NONE: No verification, just parse the ledger structure
    - OFFSETS: Validate offset table consistency
    - HEADERS: Validate transaction headers (size, version, flags)
    - MERKLE: Validate merkle tree consistency (trust first signature)
    - FULL: Full cryptographic verification including signatures
    """

    # Values 0..4, in increasing order of strictness.
    NONE, OFFSETS, HEADERS, MERKLE, FULL = range(5)
def to_uint_64(buffer): return struct.unpack("@Q", buffer)[0] def is_ledger_chunk_committed(file_name): return file_name.endswith(COMMITTED_FILE_SUFFIX) def is_snapshot_file_committed(file_name): return file_name.endswith(COMMITTED_FILE_SUFFIX) def digest(data): return sha256(data).digest() def unpack(stream, fmt): size = struct.calcsize(fmt) buf = stream.read(size) if not buf: raise EOFError # Reached end of stream return struct.unpack(fmt, buf)[0] def unpack_array(buf, fmt): unpack_iter = struct.iter_unpack(fmt, buf) ret = [] while True: try: ret.append(next(unpack_iter)[0]) except StopIteration: break return ret def range_from_filename(filename: str) -> tuple[int, int | None]: elements = ( os.path.basename(filename) .replace(COMMITTED_FILE_SUFFIX, "") .replace(RECOVERY_FILE_SUFFIX, "") .replace("ledger_", "") .split("-") ) if len(elements) == 2: return (int(elements[0]), int(elements[1])) elif len(elements) == 1: return (int(elements[0]), None) else: raise ValueError(f"Could not read seqno range from ledger file {filename}") def get_range_from_file(ledger_file_path: str) -> tuple[int, int | None]: start, end = range_from_filename(ledger_file_path) if end is not None: return start, end # Open-ended ledger chunks may be observed while they are still being # created, for instance during shutdown. Treat any failure to infer an end # from the offset table as an incomplete chunk. 
try: with open(ledger_file_path, "rb") as ledger_file: header = ledger_file.read(LEDGER_HEADER_SIZE) if len(header) != LEDGER_HEADER_SIZE: raise ValueError( f"Invalid ledger chunk {ledger_file_path}: expected " f"{LEDGER_HEADER_SIZE} header bytes, found {len(header)}" ) table_offset = int.from_bytes(header, byteorder="little") if table_offset == 0: return start, None file_size = os.path.getsize(ledger_file_path) if table_offset > file_size: raise ValueError( f"Invalid ledger chunk {ledger_file_path}: positions table offset " f"{table_offset} is beyond file size {file_size}" ) positions_size = file_size - table_offset if positions_size % 4 != 0: raise ValueError( f"Invalid ledger chunk {ledger_file_path}: positions table has " f"unexpected size {positions_size}" ) positions_count = positions_size // 4 if positions_count == 0: return start, None return start, start + positions_count - 1 except (OSError, ValueError): return start, None def snapshot_index_from_filename(filename: str) -> tuple[int, int]: elements = ( os.path.basename(filename) .replace(COMMITTED_FILE_SUFFIX, "") .replace("snapshot_", "") .split("_") ) if len(elements) == 2: return (int(elements[0]), int(elements[1])) else: raise ValueError(f"Could not read snapshot index from file name {filename}") class GcmHeader: view: int seqno: int def __init__(self, buffer): if len(buffer) < GcmHeader.size(): raise ValueError("Corrupt GCM header") # _gcm_tag = buffer[:GCM_SIZE_TAG] # Unused _gcm_iv = buffer[GCM_SIZE_TAG : GCM_SIZE_TAG + GCM_SIZE_IV] self.seqno = struct.unpack("@Q", _gcm_iv[:8])[0] self.view = struct.unpack("@I", _gcm_iv[8:])[0] & 0x7FFFFFFF @staticmethod def size(): return GCM_SIZE_TAG + GCM_SIZE_IV
class PublicDomain:
    """
    All public tables within a :py:class:`ccf.ledger.Transaction`.

    The buffer is parsed eagerly on construction. It contains a 1-byte entry
    type, an int64 version, optional claims / commit-evidence digests
    (depending on the entry type), an int64 max conflict version, an extra
    header for snapshots, and then the serialised tables.
    """

    _buffer: bytes
    _cursor: int
    _entry_type: EntryType
    _claims_digest: bytes
    _version: int
    _max_conflict_version: int
    _tables: dict

    def __init__(self, buffer: bytes):
        self._entry_type = EntryType(buffer[0])
        # Already read a 1-byte entry-type, so start from 1 not 0
        self._cursor = 1
        self._buffer = buffer
        self._version = self._read_int64()

        if self._entry_type.has_claims():
            self._claims_digest = self._read_buffer(SHA256_DIGEST_SIZE)
        if self._entry_type.has_commit_evidence():
            self._commit_evidence_digest = self._read_buffer(SHA256_DIGEST_SIZE)

        self._max_conflict_version = self._read_int64()

        if self._entry_type == EntryType.SNAPSHOT:
            self._read_snapshot_header()

        self._tables = {}
        self._read()

    def _read_buffer(self, size):
        """Return the next *size* bytes and advance the cursor past them."""
        prev_cursor = self._cursor
        self._cursor += size
        return self._buffer[prev_cursor : self._cursor]

    def _read8(self):
        return self._read_buffer(8)

    def _read_int64(self):
        return struct.unpack("<q", self._read8())[0]

    def _read_uint64(self):
        return struct.unpack("<Q", self._read8())[0]

    def is_deprecated(self):
        return self._entry_type.is_deprecated()

    def get_version_size(self):
        return 8

    def _read_versioned_value(self, size):
        """
        Read a ``(version, value)`` pair occupying *size* bytes in total.

        The version is decoded as a signed int64: tombstoned (deleted)
        entries are recognised by a negative version, which an unsigned
        decode could never produce.
        """
        if size < self.get_version_size():
            raise ValueError(f"Invalid versioned value of size {size}")
        return (self._read_int64(), self._read_buffer(size - self.get_version_size()))

    def _read_next_entry(self):
        """Read a uint64 length prefix followed by that many bytes."""
        size = self._read_uint64()
        return self._read_buffer(size)

    def _read_string(self):
        return self._read_next_entry().decode()

    def _read_snapshot_header(self):
        # read hash of entry at snapshot
        hash_size = self._read_uint64()
        buffer = self._read_buffer(hash_size)
        self._hash_at_snapshot = buffer.hex()

        # read view history
        view_history_size = self._read_uint64()
        self._view_history = unpack_array(self._read_buffer(view_history_size), "<Q")

    def _read_snapshot_entry_padding(self, size):
        padding = -size % 8  # Padded to 8 bytes
        self._cursor += padding

    def _read_snapshot_key(self):
        size = self._read_uint64()
        key = self._read_buffer(size)
        self._read_snapshot_entry_padding(size)
        return key

    def _read_snapshot_versioned_value(self):
        """Read one snapshot value; tombstones (negative version) map to None."""
        size = self._read_uint64()
        ver, value = self._read_versioned_value(size)
        if ver < 0:
            assert (
                len(value) == 0
            ), f"Expected empty value for tombstone deletion at {ver}"
            value = None
        self._read_snapshot_entry_padding(size)
        return value

    def _read(self):
        """Parse every serialised table into ``self._tables``."""
        buffer_size = len(self._buffer)
        while self._cursor < buffer_size:
            map_name = self._read_string()

            records = {}
            self._tables[map_name] = records

            if self._entry_type == EntryType.SNAPSHOT:
                # map snapshot version
                self._read8()

                # size of map entry
                map_size = self._read_uint64()
                start_map_pos = self._cursor

                while self._cursor - start_map_pos < map_size:
                    k = self._read_snapshot_key()
                    val = self._read_snapshot_versioned_value()
                    records[k] = val
            else:
                # read_version
                self._read8()

                # read_count
                # Note: Read keys are not currently included in ledger transactions
                read_count = self._read_uint64()
                assert read_count == 0, f"Unexpected read count: {read_count}"

                write_count = self._read_uint64()
                for _ in range(write_count):
                    k = self._read_next_entry()
                    val = self._read_next_entry()
                    records[k] = val

                # Removed keys are recorded with a None value
                remove_count = self._read_uint64()
                for _ in range(remove_count):
                    k = self._read_next_entry()
                    records[k] = None

    def get_tables(self) -> dict:
        """
        Return a dictionary of all public tables (with their content) in a :py:class:`ccf.ledger.Transaction`.

        :return: Dictionary of public tables with their content.
        """
        return self._tables

    def get_seqno(self) -> int:
        """
        Return the sequence number at which the transaction was recorded in the ledger.
        """
        return self._version

    def get_claims_digest(self) -> bytes | None:
        """
        Return the claims digest when there is one
        """
        return self._claims_digest if self._entry_type.has_claims() else None

    def get_commit_evidence_digest(self) -> bytes | None:
        """
        Return the commit evidence digest when there is one
        """
        return (
            self._commit_evidence_digest
            if self._entry_type.has_commit_evidence()
            else None
        )
class SimpleBuffer:
    """
    Minimal in-memory, file-like view over a ``bytes`` object.

    Supports the subset of the file API used here (``read``, ``seek``,
    ``tell``). Positions are clamped to the end of the buffer, so reads past
    the end return short (possibly empty) results instead of raising.
    """

    def __init__(self, name: str, buffer: bytes, at_loc: int = 0):
        self.name = name
        self._buffer = buffer
        self._loc = at_loc
        self._len = len(self._buffer)

    def _safe_loc(self, loc):
        return min(loc, self._len)

    def tell(self):
        return self._loc

    def read(self, size: int | None = None):
        """
        Read up to *size* bytes from the current position and advance it.

        As with regular file objects, ``None`` or a negative *size* reads to
        the end of the buffer.
        """
        start = self._loc
        end = self._len
        if size is not None and size >= 0:
            end = self._safe_loc(start + size)
        self._loc = end
        return self._buffer[start:end]

    def seek(self, loc):
        self._loc = self._safe_loc(loc)
        return self._loc

    def clone(self, at_loc: int = 0):
        """Return an independent cursor over the same underlying bytes."""
        sb = SimpleBuffer(self.name, self._buffer, at_loc)
        return sb

    @staticmethod
    def from_file(filename):
        with open(filename, "rb") as f:
            return SimpleBuffer(filename, f.read())


def _byte_read_safe(file: SimpleBuffer, num_of_bytes):
    """Read exactly *num_of_bytes* bytes, raising ValueError on a short read."""
    offset = file.tell()
    ret = file.read(num_of_bytes)
    if len(ret) != num_of_bytes:
        raise ValueError(
            f"Failed to read precise number of bytes at offset {offset}: {len(ret)}/{num_of_bytes}"
        )
    return ret


def _peek(file: SimpleBuffer, num_bytes, pos=None):
    """
    Read exactly *num_bytes* (from *pos* if given) without moving the cursor.

    The original cursor position is restored even if the read fails.
    """
    save_pos = file.tell()
    try:
        if pos is not None:
            file.seek(pos)
        return _byte_read_safe(file, num_bytes)
    finally:
        file.seek(save_pos)


def _peek_all(file: SimpleBuffer, pos=None):
    """Return everything from *pos* (or the cursor) to the end, without moving the cursor."""
    save_pos = file.tell()
    try:
        if pos is not None:
            file.seek(pos)
        return file.read()
    finally:
        file.seek(save_pos)


class LedgerValidator:
    """
    Ledger Validator contains the logic to verify that the ledger hasn't been tampered with.
    It has the ability to take transactions and it maintains a MerkleTree data structure similar to CCF.

    The ledger is valid and hasn't been tampered with if the following conditions are met:
        1) The merkle proof is signed by a Trusted node in the given network
        2) The merkle root and signature are verified with the node cert
        3) The merkle proof is correct for each set of transactions
    """

    accept_deprecated_entry_types: bool = True
    signature_count: int = 0
    transaction_count: int = 0
    verification_level: VerificationLevel
    first_signature_seen: bool = False

    def __init__(
        self,
        accept_deprecated_entry_types: bool = True,
        verification_level: VerificationLevel = VerificationLevel.FULL,
    ):
        self.node_certificates: dict[str, str] = {}
        # node_id -> (status, seqno of last status change, retired_committed)
        self.node_activity_status: dict[str, tuple[str, int, bool]] = {}
        self.accept_deprecated_entry_types = accept_deprecated_entry_types
        self.verification_level = verification_level

        # Start with empty bytes array. CCF MerkleTree uses an empty array as the first leaf of its merkle tree.
        # Don't hash empty bytes array.
        self.merkle = MerkleTree()
        empty_bytes_array = bytearray(SHA256_DIGEST_SIZE)
        self.merkle.add_leaf(bytes(empty_bytes_array), do_hash=False)

        self.last_verified_seqno = 0
        self.last_verified_view = 0

        self.service_status = None
        self.service_cert = None

    def last_verified_txid(self) -> TxID:
        return TxID(self.last_verified_view, self.last_verified_seqno)

    @staticmethod
    def validate_offsets(positions: list[int], file_size: int, file: "SimpleBuffer"):
        """
        Validate that offset table entries point to valid transaction boundaries.

        Note: this moves the cursor of *file*.

        Raises ValueError if offsets are invalid.
        """
        if not positions:
            return  # Empty positions list is valid for empty chunks

        # Check positions are sorted and within file bounds
        for i, pos in enumerate(positions):
            if pos < LEDGER_HEADER_SIZE:
                raise ValueError(
                    f"Invalid offset at index {i}: {pos} is before end of header"
                )
            if pos >= file_size:
                raise ValueError(
                    f"Invalid offset at index {i}: {pos} exceeds file size {file_size}"
                )
            if i > 0 and pos <= positions[i - 1]:
                raise ValueError(
                    f"Invalid offset at index {i}: {pos} is not greater than previous offset {positions[i - 1]}"
                )

        # Validate each offset points to a valid transaction header
        for i, pos in enumerate(positions):
            try:
                file.seek(pos)
                buffer = _byte_read_safe(file, TransactionHeader.get_size())
                header = TransactionHeader(buffer)

                # Check if this transaction would extend beyond file bounds
                tx_end = pos + TransactionHeader.get_size() + header.size
                if tx_end > file_size:
                    raise ValueError(
                        f"Transaction at offset {pos} (index {i}) extends beyond file size: "
                        f"ends at {tx_end} but file is {file_size} bytes"
                    )

                # Check if next position (if exists) aligns with end of this transaction
                if i + 1 < len(positions):
                    expected_next_pos = tx_end
                    actual_next_pos = positions[i + 1]
                    if actual_next_pos != expected_next_pos:
                        raise ValueError(
                            f"Offset mismatch: transaction at {pos} ends at {expected_next_pos} "
                            f"but next offset is {actual_next_pos}"
                        )
            except Exception as e:
                raise ValueError(
                    f"Failed to validate transaction at offset {pos} (index {i}): {e}"
                ) from e

    @staticmethod
    def validate_transaction_header(header: "TransactionHeader"):
        """
        Validate transaction header has valid version and flags.

        Raises ValueError if header is invalid.
        """
        # Check version is a known EntryType
        try:
            _ = EntryType(header.version)
        except ValueError:
            raise ValueError(
                f"Invalid transaction version: {header.version}. "
                f"Valid versions are: {[e.value for e in EntryType]}"
            )

        # Check flags are valid (only known flags bits should be set)
        valid_flags_mask = 0
        for flag in TransactionFlags:
            valid_flags_mask |= flag.value
        if header.flags & ~valid_flags_mask:
            raise ValueError(
                f"Invalid transaction flags: {header.flags:#x}. "
                f"Unknown flag bits set."
            )

        # Check size is reasonable (not zero, not too large)
        if header.size == 0:
            raise ValueError("Invalid transaction header: size is 0")

        # Max size check - 1GB seems like a reasonable maximum
        MAX_TX_SIZE = 1024 * 1024 * 1024
        if header.size > MAX_TX_SIZE:
            raise ValueError(
                f"Invalid transaction header: size {header.size} exceeds maximum {MAX_TX_SIZE}"
            )

    def add_transaction(self, transaction):
        """
        To validate the ledger, ledger transactions need to be added via this method.
        Depending on the tables that were part of the transaction, it does different things.
        When a transaction contains the signature table, it starts the verification process and verifies that the root of merkle tree was signed by a node which was part of the network.
        It also matches the root of the merkle tree that this class maintains with the one extracted from the ledger.
        Further, it validates all service status transitions.
        If any of the above checks fail, this method throws.
        """
        self.transaction_count += 1

        # Validate transaction header for HEADERS level and above
        if self.verification_level >= VerificationLevel.HEADERS:
            self.validate_transaction_header(transaction.get_transaction_header())

        transaction_public_domain = transaction.get_public_domain()
        if not self.accept_deprecated_entry_types:
            assert not transaction_public_domain.is_deprecated()
        tables = transaction_public_domain.get_tables()

        # Add contributing nodes certs and update nodes network trust status for verification
        # Only needed for FULL verification
        node_certs = {}
        if (
            self.verification_level >= VerificationLevel.FULL
            and NODES_TABLE_NAME in tables
        ):
            node_table = tables[NODES_TABLE_NAME]
            for node_id, node_info in node_table.items():
                node_id = node_id.decode()
                if node_info is None:
                    # Node has been removed from the store
                    self.node_activity_status.pop(node_id)
                    continue
                node_info = json.loads(node_info)
                # Add the self-signed node certificate (only available in 1.x,
                # refer to node endorsed certificates table otherwise)
                if "cert" in node_info:
                    node_certs[node_id] = node_info["cert"].encode()
                    self.node_certificates[node_id] = node_certs[node_id]
                # Update node trust status
                # Also record the seqno at which the node status changed to
                # track when a primary node should stop issuing signatures
                self.node_activity_status[node_id] = (
                    node_info["status"],
                    transaction_public_domain.get_seqno(),
                    node_info.get("retired_committed", False),
                )

        if (
            self.verification_level >= VerificationLevel.FULL
            and ENDORSED_NODE_CERTIFICATES_TABLE_NAME in tables
        ):
            node_endorsed_certificates_tables = tables[
                ENDORSED_NODE_CERTIFICATES_TABLE_NAME
            ]
            for (
                node_id,
                endorsed_node_cert,
            ) in node_endorsed_certificates_tables.items():
                node_id = node_id.decode()
                assert (
                    node_id not in node_certs
                ), f"Only one of node self-signed certificate and endorsed certificate should be recorded for node {node_id}"

                if endorsed_node_cert is None:
                    # Node has been removed from the store
                    self.node_certificates.pop(node_id)
                else:
                    self.node_certificates[node_id] = endorsed_node_cert

        if (
            self.verification_level >= VerificationLevel.FULL
            and SERVICE_INFO_TABLE_NAME in tables
        ):
            service_table = tables[SERVICE_INFO_TABLE_NAME]
            updated_service = service_table.get(
                signatures.WELL_KNOWN_SINGLETON_TABLE_KEY
            )
            updated_service_json = json.loads(updated_service)
            updated_status = updated_service_json["status"]
            # Check the service status transition is a permitted one
            if updated_status == "Opening":
                # DR can happen at any point, so a transition to "Opening" is always valid
                pass
            elif self.service_status == updated_status:
                pass
            elif self.service_status == "Opening":
                assert updated_status in [
                    "Open",
                    "WaitingForRecoveryShares",
                ], updated_status
            elif self.service_status == "Recovering":
                assert updated_status in ["WaitingForRecoveryShares"], updated_status
            elif self.service_status == "WaitingForRecoveryShares":
                assert updated_status in ["Open"], updated_status
            elif self.service_status == "Open":
                assert updated_status in ["Recovering"], updated_status
            else:
                assert self.service_status is None, self.service_status
            self.service_status = updated_status
            self.service_cert = updated_service_json["cert"]

        is_signature_tx = signatures.is_signature_transaction(tables)
        if is_signature_tx:
            self.signature_count += 1

        if is_signature_tx and self.verification_level >= VerificationLevel.MERKLE:
            if self.verification_level >= VerificationLevel.FULL:
                # At least one of the raw / COSE signature blobs must verify
                verified_count = 0

                payload = signatures.parse_raw_signature_from_tx(tables)
                if payload is not None:
                    if (
                        payload.view != transaction.gcm_header.view
                        or payload.seqno != transaction.gcm_header.seqno
                    ):
                        raise ValueError(
                            f"Signature payload position "
                            f"{payload.view}.{payload.seqno} does not match "
                            f"transaction header position "
                            f"{transaction.gcm_header.view}.{transaction.gcm_header.seqno}"
                        )
                    self._verify_signing_node_status(payload.signing_node)
                    cert = self.node_certificates[payload.signing_node]
                    if payload.embedded_cert is not None:
                        assert signatures.spki_from_cert(
                            cert
                        ) == signatures.spki_from_cert(
                            payload.embedded_cert
                        ), f"Mismatch in public key for node {payload.signing_node}"
                    signatures.verify_raw_root_signature(
                        cert, payload.root, payload.signature
                    )
                    signatures.verify_merkle_root(self.merkle, payload.root)
                    verified_count += 1

                cose_sign1 = signatures.parse_cose_signature_from_tx(tables)
                if cose_sign1 is not None:
                    assert (
                        self.service_cert is not None
                    ), "Cannot verify COSE root signature without a known service certificate"
                    signatures.verify_cose_root_signature(
                        self.service_cert,
                        self.merkle.get_merkle_root(),
                        cose_sign1,
                    )
                    verified_count += 1

                if verified_count == 0:
                    raise ValueError(
                        f"Signature transaction {transaction.gcm_header.view}."
                        f"{transaction.gcm_header.seqno} contained no verifiable "
                        "signature blob"
                    )

                self.last_verified_seqno = transaction.gcm_header.seqno
                self.last_verified_view = transaction.gcm_header.view
            else:
                # MERKLE level: trust the first signature, then verify
                # subsequent ones against the embedded mini-tree's root.
                # Uses TREE_TABLE rather than the raw-signature payload
                # so this also works on COSE-only ledgers.
                tree_table = tables.get(TREE_TABLE_NAME)
                if (
                    tree_table is not None
                    and signatures.WELL_KNOWN_SINGLETON_TABLE_KEY in tree_table
                ):
                    tree_data = tree_table[signatures.WELL_KNOWN_SINGLETON_TABLE_KEY]
                    embedded_tree = MerkleTree()
                    embedded_tree.deserialise(tree_data)
                    if not self.first_signature_seen:
                        self.merkle = embedded_tree
                        self.first_signature_seen = True
                    else:
                        signatures.verify_merkle_root(
                            self.merkle, embedded_tree.get_merkle_root()
                        )
                    self.last_verified_seqno = transaction.gcm_header.seqno
                    self.last_verified_view = transaction.gcm_header.view

        # Checks complete, add this transaction to tree (for MERKLE and above)
        if self.verification_level >= VerificationLevel.MERKLE:
            # For MERKLE level on isolated chunks: only add leaves after first signature
            # For FULL level: always add leaves (we have full context)
            if self.verification_level == VerificationLevel.MERKLE:
                if self.first_signature_seen:
                    self.merkle.add_leaf(transaction.get_tx_digest(), False)
            else:
                self.merkle.add_leaf(transaction.get_tx_digest(), False)

    def _verify_signing_node_status(self, signing_node: str) -> None:
        """Verify that ``signing_node`` is a known, non-pending member of the network.

        Retired nodes may legitimately still issue signatures to keep a
        reconfiguring network live until the retirement is committed; only
        ``Pending`` nodes are rejected here.
        """
        if signing_node not in self.node_activity_status:
            raise signatures.UntrustedNodeException(
                f"The signing node {signing_node} is not part of the network"
            )
        node_info = self.node_activity_status[signing_node]
        node_status = NodeStatus(node_info[0])
        if node_status == NodeStatus.PENDING:
            raise signatures.UntrustedNodeException(
                f"The signing node {signing_node} has unexpected status {node_status.value}"
            )


@dataclass
class TransactionHeader:
    """
    Fixed-size header preceding each ledger entry, decoded little-endian.
    """

    VERSION_LENGTH = 1
    FLAGS_LENGTH = 1
    SIZE_LENGTH = 6

    # 1-byte entry version
    version: int

    # 1-byte flags
    flags: int

    # 6-byte transaction size
    size: int

    def __init__(self, buffer):
        if len(buffer) != TransactionHeader.get_size():
            raise ValueError("Incomplete transaction header")

        self.version = int.from_bytes(
            buffer[: TransactionHeader.VERSION_LENGTH], byteorder="little"
        )

        end_of_flags = TransactionHeader.VERSION_LENGTH + TransactionHeader.FLAGS_LENGTH
        self.flags = int.from_bytes(
            buffer[TransactionHeader.VERSION_LENGTH : end_of_flags],
            byteorder="little",
        )

        end_of_size = end_of_flags + TransactionHeader.SIZE_LENGTH
        self.size = int.from_bytes(buffer[end_of_flags:end_of_size], byteorder="little")

    @staticmethod
    def get_size():
        return (
            TransactionHeader.VERSION_LENGTH
            + TransactionHeader.FLAGS_LENGTH
            + TransactionHeader.SIZE_LENGTH
        )


class Entry:
    """
    Abstract base for ledger entries (transactions and snapshots).

    Subclasses call :py:meth:`_read_header` to decode the transaction header,
    the GCM header and the public-domain size from the current cursor.
    """

    _file: SimpleBuffer
    _header: TransactionHeader
    _public_domain_size: int = 0
    _public_domain: PublicDomain | None = None
    _file_size: int = 0
    gcm_header: GcmHeader | None = None

    def __init__(self, file: SimpleBuffer):
        if type(self) is Entry:
            raise TypeError("Entry is not instantiable")
        self._file = file

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        pass

    def _read_header(self):
        """
        Decode the headers at the cursor; return the position just past the
        transaction header (where the entry body starts).
        """
        # read the transaction header
        buffer = _byte_read_safe(self._file, TransactionHeader.get_size())
        self._header = TransactionHeader(buffer)
        entry_start_pos = self._file.tell()

        # read the AES GCM header
        buffer = _byte_read_safe(self._file, GcmHeader.size())
        self.gcm_header = GcmHeader(buffer)

        # read the size of the public domain
        buffer = _byte_read_safe(self._file, LEDGER_DOMAIN_SIZE)
        self._public_domain_size = to_uint_64(buffer)

        return entry_start_pos

    def get_txid(self) -> str:
        assert self.gcm_header is not None
        return f"{self.gcm_header.view}.{self.gcm_header.seqno}"

    def get_public_domain(self) -> PublicDomain:
        """
        Retrieve the public (i.e. non-encrypted) domain for that entry.

        Note: Even if the entry is private-only, an empty :py:class:`ccf.ledger.PublicDomain` object is returned.

        :return: :py:class:`ccf.ledger.PublicDomain`
        """
        # Parsed lazily on first access (reads from the current cursor),
        # then cached.
        # NOTE(review): PublicDomain indexes buffer[0] unconditionally, so a
        # zero-size public domain would raise IndexError. Confirm that
        # private-only entries always carry a non-empty public header.
        if self._public_domain is None:
            buffer = _byte_read_safe(self._file, self._public_domain_size)
            self._public_domain = PublicDomain(buffer)
        return self._public_domain

    def get_private_domain_size(self) -> int:
        """
        Retrieve the size of the private (i.e. encrypted) domain for that transaction.
        """
        return self._header.size - (
            GcmHeader.size() + LEDGER_DOMAIN_SIZE + self._public_domain_size
        )

    def get_transaction_header(self) -> TransactionHeader:
        return self._header
class Transaction(Entry):
    """
    A transaction represents one entry in the CCF ledger.

    Headers are decoded on construction. The raw bytes can be re-read later
    from the recorded start offset without disturbing the cursor.
    """

    _tx_offset: int = 0

    def __init__(self, file: SimpleBuffer):
        super().__init__(file)
        # Remember where this transaction begins before the header read
        # advances the cursor.
        self._tx_offset = self._file.tell()
        super()._read_header()

    def get_raw_tx(self) -> bytes:
        """
        Return raw transaction bytes.

        :return: Raw transaction bytes (transaction header included).
        """
        assert self._file is not None
        full_length = TransactionHeader.get_size() + self._header.size
        return _peek(self._file, full_length, pos=self._tx_offset)

    def get_len(self) -> int:
        raw_bytes = self.get_raw_tx()
        return len(raw_bytes)

    def get_offsets(self) -> tuple[int, int]:
        """Return ``(start offset, total length)`` of this transaction."""
        full_length = TransactionHeader.get_size() + self._header.size
        return (self._tx_offset, full_length)

    def get_write_set_digest(self) -> bytes:
        return digest(self.get_raw_tx())

    def get_tx_digest(self) -> bytes:
        """
        Return the merkle-tree leaf digest for this transaction.

        Without claims, this is the write-set digest, or
        ``H(write_set || commit_evidence)`` when commit evidence is present.
        With claims, it is ``H(write_set || commit_evidence || claims)``.
        """
        public_domain = self.get_public_domain()
        claims = public_domain.get_claims_digest()
        commit_evidence = public_domain.get_commit_evidence_digest()
        write_set = self.get_write_set_digest()

        if claims is not None:
            assert (
                commit_evidence
            ), "Invalid transaction: commit_evidence_digest not set"
            return digest(write_set + commit_evidence + claims)

        if commit_evidence is None:
            return write_set
        return digest(write_set + commit_evidence)
class Snapshot(Entry):
    """
    Utility used to parse the content of a snapshot file.

    For committed, non-1.x snapshots, the receipt appended after the entry
    is verified on construction: it must be either a COSE_Sign1 receipt or
    a JSON receipt.
    """

    _filename: str

    def __init__(self, filename: str):
        super().__init__(SimpleBuffer.from_file(filename))
        self._filename = filename
        self._file_size = os.path.getsize(filename)

        if self._file_size == 0:
            raise InvalidSnapshotException(f"{filename} is currently empty")

        entry_start_pos = super()._read_header()

        # 1.x snapshots do not include evidence
        if self.is_committed() and not self.is_snapshot_file_1_x():
            # The receipt occupies everything after the snapshot entry, and
            # the digest covers everything before it.
            receipt_pos = entry_start_pos + self._header.size
            receipt_bytes = _peek_all(self._file, pos=receipt_pos)
            snapshot_digest = sha256(_peek(self._file, receipt_pos, pos=0)).digest()
            self._verify_snapshot_receipt(receipt_bytes, receipt_pos, snapshot_digest)

    def is_committed(self):
        # Only the file name carries the suffix. Ignore the directory path,
        # which could itself contain ".committed".
        return COMMITTED_FILE_SUFFIX in os.path.basename(self._filename)

    def is_snapshot_file_1_x(self):
        # Kept here for compatibility: 1.x committed snapshot names have
        # additional text after the committed suffix.
        if not self.is_committed():
            raise ValueError(f"Snapshot file {self._filename} is not yet committed")
        base_name = os.path.basename(self._filename)
        return len(base_name.split(COMMITTED_FILE_SUFFIX)[1]) != 0

    def get_len(self) -> int:
        return self._file_size

    def _verify_snapshot_receipt(
        self, receipt_bytes: bytes, receipt_pos: int, snapshot_digest: bytes
    ):
        """Dispatch on the receipt's first byte: COSE_Sign1 tag or JSON '{'."""
        if not receipt_bytes:
            raise InvalidSnapshotException("Empty snapshot receipt")

        first_byte = receipt_bytes[0]
        if first_byte == ENCODED_COSE_SIGN1_TAG:
            self._verify_cose_snapshot_receipt(receipt_bytes, snapshot_digest)
        elif first_byte == ord("{"):
            self._verify_json_snapshot_receipt(
                receipt_bytes, receipt_pos, snapshot_digest
            )
        else:
            raise InvalidSnapshotException(
                f"Invalid snapshot receipt: unrecognised format (first byte: 0x{first_byte:02X})"
            )

    def _service_public_key(self):
        """Return the public key of the service certificate stored in the snapshot."""
        service_info_table = (
            self.get_public_domain().get_tables().get(SERVICE_INFO_TABLE_NAME)
        )
        if service_info_table is None:
            raise InvalidSnapshotException(
                "Snapshot is missing service info table for COSE receipt verification"
            )
        service_info = service_info_table.get(signatures.WELL_KNOWN_SINGLETON_TABLE_KEY)
        if service_info is None:
            raise InvalidSnapshotException(
                "Snapshot is missing service info for COSE receipt verification"
            )
        service_info_json = json.loads(service_info)
        cert = load_pem_x509_certificate(
            service_info_json["cert"].encode("ascii"), default_backend()
        )
        return cert.public_key()

    def _verify_cose_snapshot_receipt(
        self, receipt_bytes: bytes, snapshot_digest: bytes
    ):
        ccf.cose.verify_receipt(
            receipt_bytes, self._service_public_key(), snapshot_digest
        )

    def _verify_json_snapshot_receipt(
        self, receipt_bytes: bytes, receipt_pos: int, snapshot_digest: bytes
    ):
        try:
            receipt = json.loads(receipt_bytes.decode("utf-8"))
        except json.decoder.JSONDecodeError as e:
            raise InvalidSnapshotException(
                f"Cannot read receipt from snapshot {os.path.basename(self._filename)}: Receipt starts at {receipt_pos} (file is {self._file_size} bytes), and contains {receipt_bytes!r}"
            ) from e

        # Receipts included in snapshots always contain leaf components,
        # including a claims digest and commit evidence, from 2.0.0-rc0 onwards.
        # This verification code deliberately does not support snapshots
        # produced by 2.0.0-dev* releases.
        assert "leaf_components" in receipt
        write_set_digest = bytes.fromhex(receipt["leaf_components"]["write_set_digest"])
        claims_digest = bytes.fromhex(receipt["leaf_components"]["claims_digest"])
        if snapshot_digest != claims_digest:
            raise InvalidSnapshotException(
                f"Snapshot digest ({snapshot_digest.hex()}) does not match receipt claim ({claims_digest.hex()})"
            )
        commit_evidence_digest = sha256(
            receipt["leaf_components"]["commit_evidence"].encode()
        ).digest()
        leaf = (
            sha256(write_set_digest + commit_evidence_digest + claims_digest)
            .digest()
            .hex()
        )
        root = ccf.receipt.root(leaf, receipt["proof"])
        node_cert = load_pem_x509_certificate(
            receipt["cert"].encode(), default_backend()
        )
        ccf.receipt.verify(root, receipt["signature"], node_cert)
class TransactionIterator:
    """
    Iterator yielding a :py:class:`ccf.ledger.Transaction` for each of the
    given byte positions in ``buffer``.

    :param list[int] positions: Byte offsets of the transactions in ``buffer``.
    :param SimpleBuffer buffer: Buffer holding the ledger chunk contents.
    """

    _positions: list[int]
    _buffer: SimpleBuffer
    _idx: int = -1

    def __init__(
        self,
        positions: list[int],
        buffer: SimpleBuffer,
    ):
        self._positions = positions
        self._buffer = buffer

    def __iter__(self):
        # The iterator protocol requires iterators to be iterable themselves,
        # so that list(), itertools and `for x in iter(chunk)` accept them.
        return self

    def __next__(self):
        self._idx += 1
        if self._idx < len(self._positions):
            pos = self._positions[self._idx]
            return Transaction(self._buffer.clone(at_loc=pos))
        raise StopIteration


def find_tx_positions(file: SimpleBuffer, file_size: int) -> list[int]:
    """
    Return the byte positions of all transactions in ``file``.

    Starts right after the ledger header and walks the transaction headers
    until ``file_size`` is reached. This is used when a chunk has no offset
    table.

    :param SimpleBuffer file: Buffer holding the ledger chunk contents.
    :param int file_size: Number of bytes of ``file`` holding transactions.
    :return: list[int]: Start offset of every transaction.
    """
    header_size = TransactionHeader.get_size()
    pos = LEDGER_HEADER_SIZE
    positions = []
    while pos < file_size:
        positions.append(pos)
        file.seek(pos)
        header = TransactionHeader(_byte_read_safe(file, header_size))
        # Skip over this transaction's header and its payload.
        pos += header.size + header_size
    return positions


def latest_snapshot(snapshots_dir):
    """
    Return the name of the snapshot in ``snapshots_dir`` with the highest
    public-domain seqno, or None if the directory is empty.

    Every entry of the directory is opened as a :py:class:`Snapshot`, so any
    file that fails to parse makes this function raise.

    :param str snapshots_dir: Directory containing snapshot files.
    """
    best_name, best_seqno = None, None
    for s in os.listdir(snapshots_dir):
        with Snapshot(os.path.join(snapshots_dir, s)) as snapshot:
            snapshot_seqno = snapshot.get_public_domain().get_seqno()
            if best_seqno is None or snapshot_seqno > best_seqno:
                best_name = s
                best_seqno = snapshot_seqno
    return best_name
class LedgerChunk:
    """
    Class used to parse and iterate over :py:class:`ccf.ledger.Transaction` in a CCF ledger chunk.

    :param str name: Name for a single ledger chunk.
    :param VerificationLevel verification_level: At ``VerificationLevel.OFFSETS``
        or higher, transaction offsets are checked with
        ``LedgerValidator.validate_offsets`` when the chunk is opened.
    :raises ValueError: If the chunk's offset table or transaction count is
        inconsistent with the file.
    """

    _filename: str
    _file: SimpleBuffer

    def __init__(
        self, name: str, verification_level: VerificationLevel = VerificationLevel.NONE
    ):
        self._filename = name
        self._file = SimpleBuffer.from_file(name)
        self._verification_level = verification_level

        # The ledger header holds the offset of the table of transaction
        # positions (0 when there is no such table).
        self._pos_offset = int.from_bytes(
            _byte_read_safe(self._file, LEDGER_HEADER_SIZE), byteorder="little"
        )
        # If the ledger chunk is not yet committed, the ledger header will be empty.
        # Default to reading the file size instead.
        full_file_size = os.path.getsize(name)
        if self._pos_offset > 0:
            if self._pos_offset > full_file_size:
                raise ValueError(
                    f"Invalid ledger chunk {name}: File header claims offset table is at {self._pos_offset}, yet file is only {full_file_size} bytes"
                )

            # Transactions occupy everything before the offset table.
            self._file_size = self._pos_offset
            positions_buffer = _peek_all(self._file, self._pos_offset)
            buf_len = len(positions_buffer)
            # Raise rather than assert: this validates file content, and
            # asserts are stripped under ``python -O``.
            if buf_len % 4 != 0:
                raise ValueError(
                    f"Invalid ledger chunk {name}: Expected positions to contain uint32s, but contains {buf_len} bytes"
                )
            self._positions = [
                int.from_bytes(positions_buffer[i : i + 4], byteorder="little")
                for i in range(0, buf_len, 4)
            ]
        else:
            self._file_size = full_file_size
            # No offset table: discover positions by walking transaction headers.
            self._positions = find_tx_positions(self._file, self._file_size)

        # Validate offsets if verification level is OFFSETS or higher
        if self._verification_level >= VerificationLevel.OFFSETS:
            LedgerValidator.validate_offsets(
                self._positions, self._file_size, self._file
            )

        self.start_seqno, self.end_seqno = range_from_filename(name)

        if self.end_seqno is not None:
            tx_count_from_filename = self.end_seqno - self.start_seqno + 1
            tx_count_from_positions = len(self._positions)
            if tx_count_from_filename != tx_count_from_positions:
                raise ValueError(
                    f"Invalid ledger chunk {name}: Expected to contain {tx_count_from_filename} transactions due to filename, but found {tx_count_from_positions} by reading file"
                )

    def __getitem__(self, key):
        """
        Return the transaction at index ``key``, or a list of transactions
        for a slice.

        :raises KeyError: If ``key`` is neither an int nor a slice.
        """
        if isinstance(key, int):
            return Transaction(self._file.clone(at_loc=self._positions[key]))
        elif isinstance(key, slice):
            return [
                Transaction(self._file.clone(at_loc=p)) for p in self._positions[key]
            ]
        else:
            raise KeyError(f"Unsupported type ({type(key)}) passed to LedgerChunk[]")

    def __iter__(self):
        return TransactionIterator(
            self._positions,
            self._file,
        )

    def __len__(self):
        return len(self._positions)

    def filename(self):
        return self._filename

    def is_committed(self):
        return is_ledger_chunk_committed(self._filename)

    def is_complete(self):
        """Return True if the chunk header points to an offset table."""
        return self._pos_offset > 0

    def get_seqnos(self):
        """Return ``(start_seqno, end_seqno)`` parsed from the filename."""
        return self.start_seqno, self.end_seqno
class ChunkIterator:
    """
    Iterator that opens each of ``filenames`` in turn as a
    :py:class:`LedgerChunk`, using ``verification_level``.

    :param list filenames: Ledger chunk paths, in iteration order.
    :param validator: Accepted for backward compatibility; it is not used.
    :param VerificationLevel verification_level: Passed to every LedgerChunk.
    """

    _filenames: list
    _fileindex: int = -1
    _current_chunk: LedgerChunk
    _verification_level: VerificationLevel

    def __init__(
        self,
        filenames: list,
        validator: LedgerValidator | None = None,
        verification_level: VerificationLevel = VerificationLevel.NONE,
    ):
        self._filenames = filenames
        self._verification_level = verification_level

    def __iter__(self):
        # The iterator protocol requires iterators to be iterable themselves.
        return self

    def __next__(self) -> LedgerChunk:
        self._fileindex += 1
        if self._fileindex < len(self._filenames):
            self._current_chunk = LedgerChunk(
                self._filenames[self._fileindex], self._verification_level
            )
            return self._current_chunk
        raise StopIteration
class Ledger:
    """
    Class used to iterate over all :py:class:`ccf.ledger.LedgerChunk` stored in a CCF ledger folder.

    :param list[str] paths: Ledger directories and/or individual ledger chunk files.
    :param bool committed_only: If True, only consider chunks whose name (after
        stripping a recovery suffix) ends with the committed suffix.
    :param bool read_recovery_files: If True, also consider chunks whose name
        ends with the recovery suffix.
    :param VerificationLevel verification_level: Verification level used for
        every :py:class:`LedgerChunk` opened by this ledger.
    :param bool contiguous_suffix: If True, keep only the newest run of
        contiguous chunks instead of raising when chunks do not line up.
    :raises ValueError: If a path is neither a directory nor a file, or (when
        ``contiguous_suffix`` is False) the chunks are not contiguous.
    """

    _filenames: list
    _verification_level: VerificationLevel

    def __init__(
        self,
        paths: list[str],
        committed_only: bool = True,
        read_recovery_files: bool = False,
        verification_level: VerificationLevel = VerificationLevel.NONE,
        contiguous_suffix: bool = False,
    ):
        self._filenames = []
        self._verification_level = verification_level

        ledger_files: list[str] = []

        def try_add_chunk(path):
            sanitised_path = path
            if path.endswith(RECOVERY_FILE_SUFFIX):
                sanitised_path = path[: -len(RECOVERY_FILE_SUFFIX)]
                if not read_recovery_files:
                    return

            if path.endswith(IGNORED_FILE_SUFFIX):
                return

            if committed_only and not sanitised_path.endswith(COMMITTED_FILE_SUFFIX):
                return

            # The same ledger file may appear multiple times in different directories
            # so ignore duplicates.
            # NOTE(review): this is a substring match of the basename against
            # already-added full paths, not an exact basename comparison.
            if os.path.isfile(path) and not any(
                os.path.basename(path) in f for f in ledger_files
            ):
                ledger_files.append(path)

        for p in paths:
            if os.path.isdir(p):
                for path in os.listdir(p):
                    try_add_chunk(os.path.join(p, path))
            elif os.path.isfile(p):
                try_add_chunk(p)
            else:
                raise ValueError(f"{p} is not a ledger directory or ledger chunk")

        # Sorts the list based off the first number after ledger_ so that
        # the ledger is verified in sequence
        filenames = sorted(
            ledger_files,
            key=lambda x: range_from_filename(x)[0],
        )

        # Walk from the newest chunk to the oldest, checking that each older
        # chunk ends exactly where the following one starts. On the first
        # mismatch, either raise or (contiguous_suffix) stop, keeping only
        # the newest contiguous run.
        suffix = []
        for idx in range(len(filenames) - 1, -1, -1):
            file_newer = filenames[idx]
            suffix.append(file_newer)
            if idx == 0:
                break
            file_older = filenames[idx - 1]

            range_newer = range_from_filename(file_newer)
            range_older = range_from_filename(file_older)

            # old_X ~> new_A_B => unknown where old_X ends
            if range_older[1] is None:
                if not contiguous_suffix:
                    raise ValueError(
                        f"Ledger cannot parse chunk {file_newer} following uncommitted chunk {file_older}"
                    )
                break

            # old_X_Y ~> new_A but Y != A => noncontiguous
            if range_older[1] + 1 != range_newer[0]:
                if not contiguous_suffix:
                    raise ValueError(
                        f"Ledger cannot parse non-contiguous chunks {file_older} and {file_newer}"
                    )
                break

        self._filenames = list(reversed(suffix))

    @property
    def last_committed_chunk_range(self) -> tuple[int, int | None]:
        """Return the seqno range parsed from the newest selected chunk's name."""
        last_chunk_name = self._filenames[-1]
        return range_from_filename(last_chunk_name)

    def __len__(self):
        return len(self._filenames)

    def __getitem__(self, key):
        if isinstance(key, int):
            return LedgerChunk(self._filenames[key], self._verification_level)
        elif isinstance(key, slice):
            files = self._filenames[key]
            return [LedgerChunk(file, self._verification_level) for file in files]
        else:
            raise KeyError(f"Unsupported type ({type(key)}) passed to Ledger[]")

    def __iter__(self):
        return ChunkIterator(
            self._filenames, verification_level=self._verification_level
        )

    def transactions(self):
        """Yield every transaction of every chunk, in order."""
        for chunk in self:
            yield from chunk

    def get_transaction(self, seqno: int) -> Transaction:
        """
        Return the :py:class:`ccf.ledger.Transaction` recorded in the ledger at the given sequence number.

        Note that the transaction returned may not yet be verified by a
        signature transaction nor committed by the service.

        :param int seqno: Sequence number of the transaction to fetch.

        :return: :py:class:`ccf.ledger.Transaction`
        :raises ValueError: If ``seqno`` is lower than 1.
        :raises UnknownTransaction: If no selected chunk holds ``seqno``.
        """
        if seqno < 1:
            raise ValueError(f"Ledger first seqno is 1, cannot get {seqno}")

        for filename in self._filenames:
            start, end = range_from_filename(filename)
            if seqno >= start and (end is None or seqno <= end):
                # Use the ledger's verification level, as __getitem__ and
                # __iter__ do.
                chunk = LedgerChunk(filename, self._verification_level)
                offset = seqno - start
                # A chunk without an end seqno has an open range, so seqno may
                # lie past the transactions it actually contains.
                if offset >= len(chunk):
                    continue
                return chunk[offset]

        raise UnknownTransaction(
            f"Transaction at seqno {seqno} does not exist in ledger"
        )

    def get_latest_public_state(self) -> tuple[dict, int]:
        """
        Return the current public state of the service.

        Note that the public state returned may not yet be verified by a
        signature transaction nor committed by the service.

        :return: tuple[dict, int]: Tuple containing a dictionary of public tables and their values and the seqno of the state read from the ledger.
        """
        public_tables: dict[str, dict] = {}
        latest_seqno = 0
        # If a transaction cannot be read (e.g. because it was only partially written to disk
        # before a crash), return public state so far. This is consistent with CCF's behaviour
        # which discards the incomplete transaction on recovery.
        try:
            for chunk in self:
                for tx in chunk:
                    public_domain = tx.get_public_domain()
                    latest_seqno = public_domain.get_seqno()
                    for table_name, records in public_domain.get_tables().items():
                        # Own a separate dict per table instead of aliasing the
                        # transaction's records.
                        table = public_tables.setdefault(table_name, {})
                        for key, value in records.items():
                            # A None value marks a deleted key, including the
                            # first time a table is seen.
                            if value is None:
                                table.pop(key, None)
                            else:
                                table[key] = value
        except Exception as e:
            print(f"Error reading ledger entry. Latest read seqno: {latest_seqno}")
            print(f"Error: {e}")
        return public_tables, latest_seqno
class CommitIdRangeException(Exception):
    """Signals that a ledger chunk is missing from the ledger directory."""


class UnknownTransaction(Exception):
    """Signals that no transaction exists in the ledger at the requested seqno."""


class InvalidSnapshotException(Exception):
    """Signals that a snapshot file is invalid and cannot be parsed."""