# Copyright (c) Microsoft Corporation. All rights reserved.# Licensed under the Apache 2.0 License.importioimportstructimportosfromenumimportEnumfromtypingimportBinaryIO,NamedTuple,Optional,Tuple,Dict,Listimportjsonimportbase64fromdataclassesimportdataclassfromcryptography.x509importload_pem_x509_certificatefromcryptography.hazmat.backendsimportdefault_backendfromcryptography.hazmat.primitivesimporthashesfromcryptography.exceptionsimportInvalidSignaturefromcryptography.hazmat.primitives.asymmetricimportutils,ecfromccf.merkletreeimportMerkleTreefromccf.tx_idimportTxIDfromccf.coseimportvalidate_cose_sign1importccf.receiptfromhashlibimportsha256GCM_SIZE_TAG=16GCM_SIZE_IV=12LEDGER_DOMAIN_SIZE=8LEDGER_HEADER_SIZE=8# Public table names as defined in CCFSIGNATURE_TX_TABLE_NAME="public:ccf.internal.signatures"COSE_SIGNATURE_TX_TABLE_NAME="public:ccf.internal.cose_signatures"NODES_TABLE_NAME="public:ccf.gov.nodes.info"ENDORSED_NODE_CERTIFICATES_TABLE_NAME="public:ccf.gov.nodes.endorsed_certificates"SERVICE_INFO_TABLE_NAME="public:ccf.gov.service.info"COMMITTED_FILE_SUFFIX=".committed"RECOVERY_FILE_SUFFIX=".recovery"IGNORED_FILE_SUFFIX=".ignored"# Key used by CCF to record single-key 
# Key used by CCF to record single-key tables
WELL_KNOWN_SINGLETON_TABLE_KEY = bytes(bytearray(8))

# Number of bytes in a SHA-256 digest
SHA256_DIGEST_SIZE = sha256().digest_size


class NodeStatus(Enum):
    """Trust status of a node, as recorded in the CCF nodes table."""

    PENDING = "Pending"
    TRUSTED = "Trusted"
    RETIRED = "Retired"


class EntryType(Enum):
    """Type tag stored in the first byte of a ledger entry's public domain."""

    WRITE_SET = 0
    SNAPSHOT = 1
    WRITE_SET_WITH_CLAIMS = 2
    WRITE_SET_WITH_COMMIT_EVIDENCE = 3
    WRITE_SET_WITH_COMMIT_EVIDENCE_AND_CLAIMS = 4

    def has_claims(self):
        """Return True if entries of this type carry a claims digest."""
        return self in (
            EntryType.WRITE_SET_WITH_CLAIMS,
            EntryType.WRITE_SET_WITH_COMMIT_EVIDENCE_AND_CLAIMS,
        )

    def has_commit_evidence(self):
        """Return True if entries of this type carry a commit evidence digest."""
        return self in (
            EntryType.WRITE_SET_WITH_COMMIT_EVIDENCE,
            EntryType.WRITE_SET_WITH_COMMIT_EVIDENCE_AND_CLAIMS,
        )

    def is_deprecated(self):
        """Return True for entry types considered deprecated by this parser."""
        return self in (
            EntryType.WRITE_SET,
            EntryType.WRITE_SET_WITH_CLAIMS,
            EntryType.WRITE_SET_WITH_COMMIT_EVIDENCE,
        )


def to_uint_64(buffer):
    """Decode an 8-byte native-endian unsigned integer."""
    return struct.unpack("@Q", buffer)[0]


def is_ledger_chunk_committed(file_name):
    """Return True if the chunk file name carries the committed suffix."""
    return file_name.endswith(COMMITTED_FILE_SUFFIX)


def digest(data):
    """Return the SHA-256 digest of data."""
    return sha256(data).digest()


def unpack(stream, fmt):
    """Read and decode a single struct-formatted value from stream.

    :raises EOFError: if the stream is exhausted.
    """
    size = struct.calcsize(fmt)
    buf = stream.read(size)
    if not buf:
        raise EOFError  # Reached end of stream
    return struct.unpack(fmt, buf)[0]


def unpack_array(buf, fmt):
    """Decode buf as a flat list of identically struct-formatted values.

    Simplified from a manual while/next loop to a comprehension over
    struct.iter_unpack.
    """
    return [values[0] for values in struct.iter_unpack(fmt, buf)]


def range_from_filename(filename: str) -> Tuple[int, Optional[int]]:
    """Return the (start, end) seqno range encoded in a ledger chunk file name.

    The end seqno is None for chunks whose name only encodes a start seqno
    (i.e. chunks that are not yet committed).

    :raises ValueError: if the file name does not encode a seqno range.
    """
    elements = (
        os.path.basename(filename)
        .replace(COMMITTED_FILE_SUFFIX, "")
        .replace(RECOVERY_FILE_SUFFIX, "")
        .replace("ledger_", "")
        .split("-")
    )
    if len(elements) == 2:
        return (int(elements[0]), int(elements[1]))
    elif len(elements) == 1:
        return (int(elements[0]), None)
    else:
        # Include the offending file name in the error (the message was
        # previously a placeholder-less f-string)
        raise ValueError(f"Could not read seqno range from ledger file {filename}")


def snapshot_index_from_filename(filename: str) -> Tuple[int, int]:
    """Return the two integers encoded in a snapshot file name.

    :raises ValueError: if the file name does not encode a snapshot index.
    """
    elements = (
        os.path.basename(filename)
        .replace(COMMITTED_FILE_SUFFIX, "")
        .replace("snapshot_", "")
        .split("_")
    )
    if len(elements) == 2:
        return (int(elements[0]), int(elements[1]))
    else:
        # Include the offending file name in the error (the message was
        # previously a placeholder-less f-string)
        raise ValueError(f"Could not read snapshot index from file name {filename}")


class GcmHeader:
    """AES-GCM header (tag then IV) prefixed to each ledger entry.

    The entry's seqno and view are extracted from the IV bytes.
    """

    view: int
    seqno: int

    def __init__(self, buffer):
        if len(buffer) < GcmHeader.size():
            raise ValueError("Corrupt GCM header")
        # _gcm_tag = buffer[:GCM_SIZE_TAG]  # Unused
        _gcm_iv = buffer[GCM_SIZE_TAG : GCM_SIZE_TAG + GCM_SIZE_IV]
        # First 8 IV bytes hold the seqno, last 4 hold the view with the
        # top bit masked off
        self.seqno = struct.unpack("@Q", _gcm_iv[:8])[0]
        self.view = struct.unpack("@I", _gcm_iv[8:])[0] & 0x7FFFFFFF

    @staticmethod
    def size():
        """Total header size in bytes (tag + IV)."""
        return GCM_SIZE_TAG + GCM_SIZE_IV
class PublicDomain:
    """
    All public tables within a :py:class:`ccf.ledger.Transaction`.

    Parses the serialised public domain of a ledger entry: a 1-byte entry
    type, fixed-size header fields, then a sequence of table write sets
    (or table snapshots, for snapshot entries).
    """

    _buffer: bytes
    # Current read position within _buffer
    _cursor: int
    _entry_type: EntryType
    _claims_digest: bytes
    _version: int
    _max_conflict_version: int
    _tables: dict

    def __init__(self, buffer: bytes):
        # First byte of the buffer is the entry type tag
        self._entry_type = EntryType(buffer[0])
        # Already read a 1-byte entry-type, so start from 1 not 0
        self._cursor = 1
        self._buffer = buffer

        self._version = self._read_int64()
        # Optional digests are only serialised for the corresponding entry types
        if self._entry_type.has_claims():
            self._claims_digest = self._read_buffer(SHA256_DIGEST_SIZE)
        if self._entry_type.has_commit_evidence():
            self._commit_evidence_digest = self._read_buffer(SHA256_DIGEST_SIZE)
        self._max_conflict_version = self._read_int64()

        if self._entry_type == EntryType.SNAPSHOT:
            self._read_snapshot_header()

        self._tables = {}
        self._read()

    def _read_buffer(self, size):
        # Return the next `size` bytes and advance the cursor past them
        prev_cursor = self._cursor
        self._cursor += size
        return self._buffer[prev_cursor : self._cursor]

    def _read8(self):
        return self._read_buffer(8)

    def _read_int64(self):
        # Little-endian signed 64-bit integer
        return struct.unpack("<q", self._read8())[0]

    def _read_uint64(self):
        # Little-endian unsigned 64-bit integer
        return struct.unpack("<Q", self._read8())[0]

    def is_deprecated(self):
        """Return True if this entry was written in a deprecated format."""
        return self._entry_type.is_deprecated()

    def get_version_size(self):
        # Size in bytes of the version prefix of a versioned value
        return 8

    def _read_versioned_value(self, size):
        # A versioned value is an 8-byte version followed by the raw value bytes
        if size < self.get_version_size():
            raise ValueError(f"Invalid versioned value of size {size}")
        return (self._read_uint64(), self._read_buffer(size - self.get_version_size()))

    def _read_next_entry(self):
        # Length-prefixed blob: 8-byte size, then payload
        size = self._read_uint64()
        return self._read_buffer(size)

    def _read_string(self):
        return self._read_next_entry().decode()

    def _read_snapshot_header(self):
        # read hash of entry at snapshot
        hash_size = self._read_uint64()
        buffer = self._read_buffer(hash_size)
        self._hash_at_snapshot = buffer.hex()
        # read view history
        view_history_size = self._read_uint64()
        self._view_history = unpack_array(self._read_buffer(view_history_size), "<Q")

    def _read_snapshot_entry_padding(self, size):
        # Snapshot entries are padded to 8 bytes; skip the padding
        padding = -size % 8
        self._cursor += padding

    def _read_snapshot_key(self):
        size = self._read_uint64()
        key = self._read_buffer(size)
        self._read_snapshot_entry_padding(size)
        return key

    def _read_snapshot_versioned_value(self):
        size = self._read_uint64()
        ver, value = self._read_versioned_value(size)
        if ver < 0:
            # A negative version marks a tombstone (deleted key)
            assert (
                len(value) == 0
            ), f"Expected empty value for tombstone deletion at {ver}"
            value = None
        self._read_snapshot_entry_padding(size)
        return value

    def _read(self):
        # Parse table entries until the buffer is exhausted
        buffer_size = len(self._buffer)
        while self._cursor < buffer_size:
            map_name = self._read_string()
            records = {}
            self._tables[map_name] = records

            if self._entry_type == EntryType.SNAPSHOT:
                # map snapshot version
                self._read8()
                # size of map entry
                map_size = self._read_uint64()
                start_map_pos = self._cursor
                while self._cursor - start_map_pos < map_size:
                    k = self._read_snapshot_key()
                    val = self._read_snapshot_versioned_value()
                    records[k] = val
            else:
                # read_version
                self._read8()

                # read_count
                # Note: Read keys are not currently included in ledger transactions
                read_count = self._read_uint64()
                assert read_count == 0, f"Unexpected read count: {read_count}"

                write_count = self._read_uint64()
                if write_count:
                    for _ in range(write_count):
                        k = self._read_next_entry()
                        val = self._read_next_entry()
                        records[k] = val

                remove_count = self._read_uint64()
                if remove_count:
                    for _ in range(remove_count):
                        k = self._read_next_entry()
                        # Removed keys are recorded with a None value
                        records[k] = None

    def get_tables(self) -> dict:
        """
        Return a dictionary of all public tables (with their content) in a
        :py:class:`ccf.ledger.Transaction`.

        :return: Dictionary of public tables with their content.
        """
        return self._tables

    def get_seqno(self) -> int:
        """
        Return the sequence number at which the transaction was recorded in the ledger.
        """
        return self._version

    def get_claims_digest(self) -> Optional[bytes]:
        """
        Return the claims digest when there is one
        """
        return self._claims_digest if self._entry_type.has_claims() else None

    def get_commit_evidence_digest(self) -> Optional[bytes]:
        """
        Return the commit evidence digest when there is one
        """
        return (
            self._commit_evidence_digest
            if self._entry_type.has_commit_evidence()
            else None
        )
def _byte_read_safe(file, num_of_bytes):
    """Read exactly num_of_bytes from file, raising ValueError on a short read."""
    offset = file.tell()
    ret = file.read(num_of_bytes)
    if len(ret) != num_of_bytes:
        # io.BytesIO objects (used by Entry) have no .name attribute, so fall
        # back to a placeholder rather than raising AttributeError while
        # trying to report the original error
        name = getattr(file, "name", "<buffer>")
        raise ValueError(
            f"Failed to read precise number of bytes in {name} at offset {offset}: {len(ret)}/{num_of_bytes}"
        )
    return ret


def _peek(file, num_bytes, pos=None):
    """Read num_bytes from file (optionally seeking to pos first), restoring
    the original read position afterwards."""
    save_pos = file.tell()
    if pos is not None:
        file.seek(pos)
    buffer = _byte_read_safe(file, num_bytes)
    file.seek(save_pos)
    return buffer


def _peek_all(file, pos=None):
    """Read the remainder of file (optionally seeking to pos first), restoring
    the original read position afterwards."""
    save_pos = file.tell()
    if pos is not None:
        file.seek(pos)
    buffer = file.read()
    file.seek(save_pos)
    return buffer


class TxBundleInfo(NamedTuple):
    """Bundle for transaction information required for validation"""

    # Merkle tree maintained by the validator
    merkle_tree: MerkleTree
    # Root recorded in the ledger's signature transaction
    existing_root: bytes
    # Certificate of the signing node
    node_cert: bytes
    # Signature over the recorded root
    signature: bytes
    # Map of node id to (status, seqno, retired_committed)
    node_activity: dict
    # Id of the node which signed the root
    signing_node: str


class LedgerValidator:
    """
    Ledger Validator contains the logic to verify that the ledger hasn't been tampered with.
    It has the ability to take transactions and it maintains a MerkleTree data structure similar to CCF.

    Ledger is valid and hasn't been tampered with if following conditions are met:
        1) The merkle proof is signed by a Trusted node in the given network
        2) The merkle root and signature are verified with the node cert
        3) The merkle proof is correct for each set of transactions
    """

    accept_deprecated_entry_types: bool
    node_certificates: Dict[str, str]
    node_activity_status: Dict[str, Tuple[str, int, bool]]
    signature_count: int

    def __init__(self, accept_deprecated_entry_types: bool = True):
        self.accept_deprecated_entry_types = accept_deprecated_entry_types
        # These were previously class-level mutable defaults, silently shared
        # by every LedgerValidator instance; they are now per-instance state
        self.node_certificates = {}
        self.node_activity_status = {}
        self.signature_count = 0
        self.chosen_hash = ec.ECDSA(utils.Prehashed(hashes.SHA256()))

        # Start with empty bytes array. CCF MerkleTree uses an empty array as the first leaf of its merkle tree.
        # Don't hash empty bytes array.
        self.merkle = MerkleTree()
        empty_bytes_array = bytearray(SHA256_DIGEST_SIZE)
        self.merkle.add_leaf(empty_bytes_array, do_hash=False)

        self.last_verified_seqno = 0
        self.last_verified_view = 0

        self.service_status = None
        self.service_cert = None

    def last_verified_txid(self) -> TxID:
        """Return the TxID of the last signature transaction successfully verified."""
        return TxID(self.last_verified_view, self.last_verified_seqno)

    def add_transaction(self, transaction):
        """
        To validate the ledger, ledger transactions need to be added via this method.
        Depending on the tables that were part of the transaction, it does different things.
        When transaction contains signature table, it starts the verification process and
        verifies that the root of merkle tree was signed by a node which was part of the network.
        It also matches the root of the merkle tree that this class maintains with the one
        extracted from the ledger. Further, it validates all service status transitions.
        If any of the above checks fail, this method throws.
        """
        transaction_public_domain = transaction.get_public_domain()
        if not self.accept_deprecated_entry_types:
            assert not transaction_public_domain.is_deprecated()
        tables = transaction_public_domain.get_tables()

        # Add contributing nodes certs and update nodes network trust status for verification
        node_certs = {}
        if NODES_TABLE_NAME in tables:
            node_table = tables[NODES_TABLE_NAME]
            for node_id, node_info in node_table.items():
                node_id = node_id.decode()
                if node_info is None:
                    # Node has been removed from the store
                    self.node_activity_status.pop(node_id)
                    continue
                node_info = json.loads(node_info)
                # Add the self-signed node certificate (only available in 1.x,
                # refer to node endorsed certificates table otherwise)
                if "cert" in node_info:
                    node_certs[node_id] = node_info["cert"].encode()
                    self.node_certificates[node_id] = node_certs[node_id]
                # Update node trust status
                # Also record the seqno at which the node status changed to
                # track when a primary node should stop issuing signatures
                self.node_activity_status[node_id] = (
                    node_info["status"],
                    transaction_public_domain.get_seqno(),
                    node_info.get("retired_committed", False),
                )

        if ENDORSED_NODE_CERTIFICATES_TABLE_NAME in tables:
            node_endorsed_certificates_tables = tables[
                ENDORSED_NODE_CERTIFICATES_TABLE_NAME
            ]
            for (
                node_id,
                endorsed_node_cert,
            ) in node_endorsed_certificates_tables.items():
                node_id = node_id.decode()
                assert (
                    node_id not in node_certs
                ), f"Only one of node self-signed certificate and endorsed certificate should be recorded for node {node_id}"
                if endorsed_node_cert is None:
                    # Node has been removed from the store
                    self.node_certificates.pop(node_id)
                else:
                    self.node_certificates[node_id] = endorsed_node_cert

        # This is a merkle root/signature tx if the table exists
        if SIGNATURE_TX_TABLE_NAME in tables:
            self.signature_count += 1
            signature_table = tables[SIGNATURE_TX_TABLE_NAME]

            for _, signature in signature_table.items():
                signature = json.loads(signature)
                current_seqno = signature["seqno"]
                current_view = signature["view"]
                signing_node = signature["node"]

                # Get binary representations for the cert, existing root, and signature
                cert = self.node_certificates[signing_node]
                existing_root = bytes.fromhex(signature["root"])
                sig = base64.b64decode(signature["sig"])

                tx_info = TxBundleInfo(
                    self.merkle,
                    existing_root,
                    cert,
                    sig,
                    self.node_activity_status,
                    signing_node,
                )

                # validations for 1, 2 and 3
                # throws if ledger validation failed.
                self._verify_tx_set(tx_info)

                self.last_verified_seqno = current_seqno
                self.last_verified_view = current_view

        # Check service status transitions
        if SERVICE_INFO_TABLE_NAME in tables:
            service_table = tables[SERVICE_INFO_TABLE_NAME]
            updated_service = service_table.get(WELL_KNOWN_SINGLETON_TABLE_KEY)
            updated_service_json = json.loads(updated_service)
            updated_status = updated_service_json["status"]
            if updated_status == "Opening":
                # DR can happen at any point, so a transition to "Opening" is always valid
                pass
            elif self.service_status == updated_status:
                pass
            elif self.service_status == "Opening":
                assert updated_status in [
                    "Open",
                    "WaitingForRecoveryShares",
                ], updated_status
            elif self.service_status == "Recovering":
                assert updated_status in ["WaitingForRecoveryShares"], updated_status
            elif self.service_status == "WaitingForRecoveryShares":
                assert updated_status in ["Open"], updated_status
            elif self.service_status == "Open":
                assert updated_status in ["Recovering"], updated_status
            else:
                assert self.service_status is None, self.service_status
            self.service_status = updated_status
            self.service_cert = updated_service_json["cert"]

        if COSE_SIGNATURE_TX_TABLE_NAME in tables:
            cose_signature_table = tables[COSE_SIGNATURE_TX_TABLE_NAME]
            cose_signature = cose_signature_table.get(WELL_KNOWN_SINGLETON_TABLE_KEY)
            signature = json.loads(cose_signature)
            cose_sign1 = base64.b64decode(signature)
            self._verify_root_cose_signature(self.merkle.get_merkle_root(), cose_sign1)

        # Checks complete, add this transaction to tree
        self.merkle.add_leaf(transaction.get_tx_digest(), False)

    def _verify_tx_set(self, tx_info: TxBundleInfo):
        """
        Verify items 1, 2, and 3 for all the transactions up until a signature.
        """
        # 1) The merkle root is signed by a Trusted node in the given network, else throws
        self._verify_node_status(tx_info)
        # 2) The merkle root and signature are verified with the node cert, else throws
        self._verify_root_signature(tx_info)
        # 3) The merkle root is correct for the set of transactions
        # and matches with the one extracted from the ledger, else throws
        self._verify_merkle_root(tx_info.merkle_tree, tx_info.existing_root)

    @staticmethod
    def _verify_node_status(tx_info: TxBundleInfo):
        """Verify item 1, The merkle root is signed by a valid node in the given network"""
        if tx_info.signing_node not in tx_info.node_activity:
            raise UntrustedNodeException(
                f"The signing node {tx_info.signing_node} is not part of the network"
            )
        node_info = tx_info.node_activity[tx_info.signing_node]
        node_status = NodeStatus(node_info[0])
        # Note: Even nodes that are Retired, and for which retired_committed is True
        # may be issuing signatures, to ensure the liveness of a reconfiguring
        # network. They will stop doing so once the transaction that sets
        # retired_committed is itself committed, but that is unfortunately not
        # observable from the ledger alone.
        if node_status == NodeStatus.PENDING:
            raise UntrustedNodeException(
                f"The signing node {tx_info.signing_node} has unexpected status {node_status.value}"
            )

    def _verify_root_signature(self, tx_info: TxBundleInfo):
        """Verify item 2, that the Merkle root signature validates against the node certificate"""
        try:
            cert = load_pem_x509_certificate(tx_info.node_cert, default_backend())
            pub_key = cert.public_key()
            assert isinstance(pub_key, ec.EllipticCurvePublicKey)
            pub_key.verify(
                tx_info.signature, tx_info.existing_root, self.chosen_hash
            )  # type: ignore[override]
        # This exception is thrown from x509, catch for logging and raise our own
        except InvalidSignature as exc:
            # Chain from the caught exception instance (this previously chained
            # from the InvalidSignature class, discarding the actual instance)
            raise InvalidRootSignatureException(
                "Signature verification failed:"
                + f"\nCertificate: {tx_info.node_cert.decode()}"
                + f"\nSignature: {base64.b64encode(tx_info.signature).decode()}"
                + f"\nRoot: {tx_info.existing_root.hex()}"
            ) from exc

    def _verify_root_cose_signature(self, root, cose_sign1):
        """Verify the COSE Sign1 signature over the current Merkle root against
        the service certificate."""
        try:
            cert = load_pem_x509_certificate(
                self.service_cert.encode("ascii"), default_backend()
            )
            validate_cose_sign1(
                cose_sign1=cose_sign1, pubkey=cert.public_key(), payload=root
            )
        except Exception as exc:
            raise InvalidRootCoseSignatureException(
                "Signature verification failed:"
                + f"\nCertificate: {self.service_cert}"
                + f"\nRoot: {root}"
            ) from exc

    def _verify_merkle_root(self, merkletree: MerkleTree, existing_root: bytes):
        """Verify item 3, by comparing the roots from the merkle tree that's
        maintained by this class and from the one extracted from the ledger"""
        root = merkletree.get_merkle_root()
        if root != existing_root:
            raise InvalidRootException(
                f"\nComputed root: {root.hex()}\nExisting root from ledger: {existing_root.hex()}"
            )


@dataclass
class TransactionHeader:
    """Fixed-size header prefixed to each serialised ledger transaction.

    Note: the explicit __init__ below takes precedence over the
    dataclass-generated one; the decorator is kept for field introspection.
    """

    VERSION_LENGTH = 1
    FLAGS_LENGTH = 1
    SIZE_LENGTH = 6

    # 1-byte entry version
    version: int
    # 1-byte flags
    flags: int
    # 6-byte transaction size
    size: int

    def __init__(self, buffer):
        if len(buffer) != TransactionHeader.get_size():
            raise ValueError("Incomplete transaction header")
        self.version = int.from_bytes(
            buffer[: TransactionHeader.VERSION_LENGTH], byteorder="little"
        )
        self.flags = int.from_bytes(
            buffer[
                TransactionHeader.VERSION_LENGTH : TransactionHeader.VERSION_LENGTH
                + TransactionHeader.FLAGS_LENGTH
            ],
            byteorder="little",
        )
        self.size = int.from_bytes(
            buffer[-TransactionHeader.SIZE_LENGTH :], byteorder="little"
        )

    @staticmethod
    def get_size():
        """Total serialised header size in bytes."""
        return (
            TransactionHeader.VERSION_LENGTH
            + TransactionHeader.FLAGS_LENGTH
            + TransactionHeader.SIZE_LENGTH
        )


class Entry:
    """Abstract base for ledger transactions and snapshots.

    Loads the whole file into memory and exposes header/public-domain access;
    not directly instantiable.
    """

    _file: BinaryIO
    _header: TransactionHeader
    _public_domain_size: int = 0
    _public_domain: Optional[PublicDomain] = None
    _file_size: int = 0
    gcm_header: Optional[GcmHeader] = None

    def __init__(self, filename: str):
        if type(self) is Entry:
            raise TypeError("Entry is not instantiable")
        with open(filename, mode="rb") as f:
            self._buffer = f.read()
        self._file = io.BytesIO(self._buffer)

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        self.close()

    def close(self):
        self._file.close()

    def _read_header(self):
        """Parse the transaction header, GCM header and public domain size.

        Returns the position of the first byte after the transaction header.
        """
        # read the transaction header
        buffer = _byte_read_safe(self._file, TransactionHeader.get_size())
        self._header = TransactionHeader(buffer)
        entry_start_pos = self._file.tell()

        # read the AES GCM header
        buffer = _byte_read_safe(self._file, GcmHeader.size())
        self.gcm_header = GcmHeader(buffer)

        # read the size of the public domain
        buffer = _byte_read_safe(self._file, LEDGER_DOMAIN_SIZE)
        self._public_domain_size = to_uint_64(buffer)

        return entry_start_pos

    def get_txid(self) -> str:
        """Return this entry's transaction id as a "view.seqno" string."""
        assert self.gcm_header is not None
        return f"{self.gcm_header.view}.{self.gcm_header.seqno}"

    def get_public_domain(self) -> PublicDomain:
        """
        Retrieve the public (i.e. non-encrypted) domain for that entry.

        Note: Even if the entry is private-only, an empty
        :py:class:`ccf.ledger.PublicDomain` object is returned.

        :return: :py:class:`ccf.ledger.PublicDomain`
        """
        if self._public_domain is None:
            current_pos = self._file.tell()
            buffer = self._buffer[current_pos : current_pos + self._public_domain_size]
            self._file.seek(self._public_domain_size, 1)
            self._public_domain = PublicDomain(buffer)
        return self._public_domain

    def get_private_domain_size(self) -> int:
        """
        Retrieve the size of the private (i.e. encrypted) domain for that transaction.
        """
        return self._header.size - (
            GcmHeader.size() + LEDGER_DOMAIN_SIZE + self._public_domain_size
        )

    def get_transaction_header(self) -> TransactionHeader:
        """Return the parsed fixed-size transaction header."""
        return self._header
class Transaction(Entry):
    """
    A transaction represents one entry in the CCF ledger.
    """

    # Offset of the next entry to read; starts just past the chunk header
    _next_offset: int = LEDGER_HEADER_SIZE
    # Offset of the current entry within the chunk file
    _tx_offset: int = 0
    _ledger_validator: Optional[LedgerValidator] = None

    def __init__(
        self, filename: str, ledger_validator: Optional[LedgerValidator] = None
    ):
        super().__init__(filename)
        self._ledger_validator = ledger_validator

        # Chunk header: offset at which the committed portion of the chunk ends
        self._pos_offset = int.from_bytes(
            _byte_read_safe(self._file, LEDGER_HEADER_SIZE), byteorder="little"
        )
        # If the ledger chunk is not yet committed, the ledger header will be empty.
        # Default to reading the file size instead.
        self._file_size = (
            self._pos_offset if self._pos_offset > 0 else os.path.getsize(filename)
        )

    def _read_header(self):
        # Remember where this entry starts, then advance the next-entry offset
        # past the header and payload just parsed
        self._tx_offset = self._file.tell()
        super()._read_header()
        self._next_offset += self._header.size
        self._next_offset += TransactionHeader.get_size()

    def get_raw_tx(self) -> bytes:
        """
        Return raw transaction bytes.

        :return: Raw transaction bytes.
        """
        assert self._file is not None
        return _peek(
            self._file,
            TransactionHeader.get_size() + self._header.size,
            pos=self._tx_offset,
        )

    def get_len(self) -> int:
        """Return the total serialised size of this transaction (header included)."""
        return len(self.get_raw_tx())

    def get_offsets(self) -> Tuple[int, int]:
        """Return the (start, next-entry) byte offsets of this transaction."""
        return (self._tx_offset, self._next_offset)

    def get_write_set_digest(self) -> bytes:
        """Return the SHA-256 digest of the raw transaction bytes."""
        return digest(self.get_raw_tx())

    def get_tx_digest(self) -> bytes:
        """Return the Merkle leaf digest for this transaction.

        Combines the write set digest with the commit evidence and claims
        digests when the entry type provides them.
        """
        claims_digest = self.get_public_domain().get_claims_digest()
        commit_evidence_digest = self.get_public_domain().get_commit_evidence_digest()
        write_set_digest = self.get_write_set_digest()
        if claims_digest is None:
            if commit_evidence_digest is None:
                return write_set_digest
            else:
                return digest(write_set_digest + commit_evidence_digest)
        else:
            # Claims are only ever recorded alongside commit evidence
            assert (
                commit_evidence_digest
            ), "Invalid transaction: commit_evidence_digest not set"
            return digest(write_set_digest + commit_evidence_digest + claims_digest)

    def _complete_read(self):
        # Jump to the next entry and drop the cached public domain
        self._file.seek(self._next_offset, 0)
        self._public_domain = None

    def __iter__(self):
        return self

    def __next__(self):
        if self._next_offset == self._file_size:
            super().close()
            raise StopIteration()
        self._complete_read()
        self._read_header()

        # Adds every transaction to the ledger validator
        # LedgerValidator does verification for every added transaction
        # and throws when it finds any anomaly.
        if self._ledger_validator is not None:
            self._ledger_validator.add_transaction(self)

        return self
class Snapshot(Entry):
    """
    Utility used to parse the content of a snapshot file.

    For committed post-1.x snapshots, the trailing receipt is parsed and
    verified against the embedded node certificate on construction.
    """

    _filename: str

    def __init__(self, filename: str):
        super().__init__(filename)
        self._filename = filename
        self._file_size = os.path.getsize(filename)
        entry_start_pos = super()._read_header()

        # 1.x snapshots do not include evidence
        if self.is_committed() and not self.is_snapshot_file_1_x():
            # The receipt is appended after the snapshot payload
            receipt_pos = entry_start_pos + self._header.size
            receipt_bytes = _peek_all(self._file, pos=receipt_pos)

            receipt = json.loads(receipt_bytes.decode("utf-8"))
            # Receipts included in snapshots always contain leaf components,
            # including a claims digest and commit evidence, from 2.0.0-rc0 onwards.
            # This verification code deliberately does not support snapshots
            # produced by 2.0.0-dev* releases.
            assert "leaf_components" in receipt
            write_set_digest = bytes.fromhex(
                receipt["leaf_components"]["write_set_digest"]
            )
            claims_digest = bytes.fromhex(receipt["leaf_components"]["claims_digest"])
            commit_evidence_digest = sha256(
                receipt["leaf_components"]["commit_evidence"].encode()
            ).digest()
            # Recompute the Merkle leaf from its components, then walk the
            # proof up to the root and verify the receipt signature over it
            leaf = (
                sha256(write_set_digest + commit_evidence_digest + claims_digest)
                .digest()
                .hex()
            )
            root = ccf.receipt.root(leaf, receipt["proof"])
            node_cert = load_pem_x509_certificate(
                receipt["cert"].encode(), default_backend()
            )
            ccf.receipt.verify(root, receipt["signature"], node_cert)

    def is_committed(self):
        """Return True if the snapshot file name carries the committed suffix."""
        return COMMITTED_FILE_SUFFIX in self._filename

    def is_snapshot_file_1_x(self):
        """Return True if this is a 1.x snapshot (trailing bytes after the
        committed suffix in the file name)."""
        # Kept here for compatibility
        if not self.is_committed():
            raise ValueError(f"Snapshot file {self._filename} is not yet committed")
        return len(self._filename.split(COMMITTED_FILE_SUFFIX)[1]) != 0

    def get_len(self) -> int:
        """Return the snapshot file size in bytes."""
        return self._file_size
class LedgerChunk:
    """
    Parses and iterates over the :py:class:`ccf.ledger.Transaction` entries
    contained in a single CCF ledger chunk file.

    :param str name: Name for a single ledger chunk.
    :param LedgerValidator ledger_validator: :py:class:`LedgerValidator` instance used to verify ledger integrity.
    """

    _current_tx: Transaction
    _filename: str
    _ledger_validator: Optional[LedgerValidator] = None

    def __init__(self, name: str, ledger_validator: Optional[LedgerValidator] = None):
        # All per-transaction parsing is delegated to a single Transaction
        # instance, advanced via the iterator protocol.
        tx = Transaction(name, ledger_validator)
        self._ledger_validator = ledger_validator
        self._current_tx = tx
        self._filename = name
        # Final offset recorded in the chunk header (0 while uncommitted).
        self._pos_offset = tx._pos_offset
        seqnos = range_from_filename(name)
        self.start_seqno = seqnos[0]
        self.end_seqno = seqnos[1]

    def __iter__(self):
        return self

    def __next__(self) -> Transaction:
        # Advance the underlying transaction cursor by one entry.
        return next(self._current_tx)

    def filename(self):
        """Return the path of this chunk file."""
        return self._filename

    def is_committed(self):
        """Return True if this chunk file carries the committed suffix."""
        return is_ledger_chunk_committed(self._filename)

    def is_complete(self):
        """Return True if the chunk header records a positive final offset."""
        return self._pos_offset > 0

    def get_seqnos(self):
        """Return the (start, end) seqno range parsed from the file name."""
        return self.start_seqno, self.end_seqno
class Ledger:
    """
    Class used to iterate over all :py:class:`ccf.ledger.LedgerChunk` stored in a CCF ledger folder.

    :param str name: Ledger directory for a single CCF node.
    """

    _filenames: list
    _validator: Optional[LedgerValidator]

    def __init__(
        self,
        paths: List[str],
        committed_only: bool = True,
        read_recovery_files: bool = False,
        validator: Optional[LedgerValidator] = None,
    ):
        self._filenames = []

        ledger_files: List[str] = []

        def try_add_chunk(path):
            # Filter out recovery/ignored/uncommitted files depending on the
            # constructor flags, then record the chunk if not seen before
            sanitised_path = path
            if path.endswith(RECOVERY_FILE_SUFFIX):
                sanitised_path = path[: -len(RECOVERY_FILE_SUFFIX)]
                if not read_recovery_files:
                    return
            if path.endswith(IGNORED_FILE_SUFFIX):
                return
            if committed_only and not sanitised_path.endswith(COMMITTED_FILE_SUFFIX):
                return
            # The same ledger file may appear multiple times in different directories
            # so ignore duplicates
            if os.path.isfile(path) and not any(
                os.path.basename(path) in f for f in ledger_files
            ):
                ledger_files.append(path)

        for p in paths:
            if os.path.isdir(p):
                for path in os.listdir(p):
                    chunk = os.path.join(p, path)
                    try_add_chunk(chunk)
            elif os.path.isfile(p):
                try_add_chunk(p)
            else:
                raise ValueError(f"{p} is not a ledger directory or ledger chunk")

        # Sorts the list based off the first number after ledger_ so that
        # the ledger is verified in sequence
        self._filenames = sorted(
            ledger_files,
            key=lambda x: range_from_filename(x)[0],
        )

        # If we do not have a single contiguous range, report an error
        for file_a, file_b in zip(self._filenames[:-1], self._filenames[1:]):
            range_a = range_from_filename(file_a)
            range_b = range_from_filename(file_b)
            if range_a[1] is None and range_b[1] is not None:
                raise ValueError(
                    f"Ledger cannot parse committed chunk {file_b} following uncommitted chunk {file_a}"
                )
            # Contiguity is only enforced when validating
            if validator and range_a[1] is not None and range_a[1] + 1 != range_b[0]:
                raise ValueError(
                    f"Ledger cannot parse non-contiguous chunks {file_a} and {file_b}"
                )

        self._validator = validator

    @property
    def last_committed_chunk_range(self) -> Tuple[int, Optional[int]]:
        """Return the seqno range of the last chunk (end is None if uncommitted)."""
        last_chunk_name = self._filenames[-1]
        return range_from_filename(last_chunk_name)

    def __len__(self):
        return len(self._filenames)

    def __iter__(self):
        # NOTE(review): LedgerIterator is not defined in this file — presumably
        # provided elsewhere in the package; verify before relying on iteration
        return LedgerIterator(self._filenames, self._validator)

    def transactions(self):
        """Yield every transaction of every chunk, in ledger order."""
        for chunk in self:
            for transaction in chunk:
                yield transaction

    def get_transaction(self, seqno: int) -> Transaction:
        """
        Return the :py:class:`ccf.ledger.Transaction` recorded in the ledger at the given sequence number.

        Note that the transaction returned may not yet be verified by a
        signature transaction nor committed by the service.

        :param int seqno: Sequence number of the transaction to fetch.

        :return: :py:class:`ccf.ledger.Transaction`
        """
        if seqno < 1:
            raise ValueError("Ledger first seqno is 1")

        transaction = None
        for chunk in self:
            _, chunk_end = chunk.get_seqnos()
            for tx in chunk:
                # Skip chunks that end before the target seqno
                if chunk_end and chunk_end < seqno:
                    continue
                public_transaction = tx.get_public_domain()
                if public_transaction.get_seqno() == seqno:
                    return tx

        # NOTE(review): `transaction` is never reassigned above (matches always
        # return early), so this is effectively an unconditional raise
        if transaction is None:
            raise UnknownTransaction(
                f"Transaction at seqno {seqno} does not exist in ledger"
            )
        return transaction

    def get_latest_public_state(self) -> Tuple[dict, int]:
        """
        Return the current public state of the service.

        Note that the public state returned may not yet be verified by a
        signature transaction nor committed by the service.

        :return: Tuple[Dict, int]: Tuple containing a dictionary of public
            tables and their values and the seqno of the state read from the ledger.
        """
        public_tables: Dict[str, Dict] = {}
        latest_seqno = 0
        # If a transaction cannot be read (e.g. because it was only partially written to disk
        # before a crash), return public state so far. This is consistent with CCF's behaviour
        # which discards the incomplete transaction on recovery.
        try:
            for chunk in self:
                for tx in chunk:
                    public_domain = tx.get_public_domain()
                    latest_seqno = public_domain.get_seqno()
                    for table_name, records in public_domain.get_tables().items():
                        if table_name in public_tables:
                            public_tables[table_name].update(records)
                            # Remove deleted keys
                            public_tables[table_name] = {
                                k: v
                                for k, v in public_tables[table_name].items()
                                if v is not None
                            }
                        else:
                            public_tables[table_name] = records
        except Exception:
            print(f"Error reading ledger entry. Latest read seqno: {latest_seqno}")
        return public_tables, latest_seqno

    def validator(self):
        """Return the :py:class:`LedgerValidator` attached to this ledger, if any."""
        return self._validator
class InvalidRootException(Exception):
    """Raised when the locally computed Merkle root does not match the root
    recorded in the ledger's signature table."""


class InvalidRootSignatureException(Exception):
    """Raised when the signature over the Merkle root does not validate against
    the signature recorded in the ledger's signature table."""


class InvalidRootCoseSignatureException(Exception):
    """Raised when the COSE signature over the Merkle root fails COSE
    verification."""


class CommitIdRangeException(Exception):
    """Raised when a ledger chunk is missing from the ledger directory."""


class UntrustedNodeException(Exception):
    """Raised when the node signing the ledger was not part of the network."""


class UnknownTransaction(Exception):
    """Raised when the transaction at the requested seqno does not exist in the
    ledger."""