from binascii import hexlify, unhexlify import copy from io import BytesIO import struct from codecs import encode import time from enum import IntEnum from typing import Optional, Tuple from ..serialize import ( deser_compact_size, deser_uint256, SER_DEFAULT, ser_compact_size, ser_uint256, uint256_from_bigendian, uint256_from_str, deser_string, ser_string, deser_vector, ser_vector, uint256_from_compact, ) from ..util import hash256, sha256, uint256_to_rpc_hex from ..constants import ( COIN, BTCBCH_SIGHASH_ALL, BTCBCH_SIGHASH_FORKID, BTCBCH_SIGHASH_SINGLE, BTCBCH_SIGHASH_NONE, BTCBCH_SIGHASH_ANYONECANPAY, ) class COutPoint: def __init__(self, txhash: str | bytes = 0, n=0): if isinstance(txhash, str): txhash = uint256_from_bigendian(hash) if isinstance(txhash, bytes): txhash = uint256_from_str(hash) self.hash = txhash self.n = n def deserialize(self, f): self.hash = deser_uint256(f) self.n = struct.unpack(" str: return ser_uint256(self.id)[::-1].hex() @id_hex.setter def id_hex(self, hex: str): b = bytes.fromhex(hex) assert len(b) == 32 self.id = uint256_from_str(b[::-1]) def deserialize(self, f): self.id = deser_uint256(f) self.bitfield = struct.unpack(" bytes: r = bytearray() r += ser_uint256(self.id) r += struct.pack("B", self.bitfield) if self.has_commitment_length(): r += ser_string(self.commitment) if self.has_amount(): r += ser_compact_size(self.amount) return bytes(r) def get_capability(self) -> int: return self.bitfield & 0x0F def has_commitment_length(self) -> bool: return bool(self.bitfield & TokenStructure.HAS_COMMITMENT_LENGTH) def has_amount(self) -> bool: return bool(self.bitfield & TokenStructure.HAS_AMOUNT) def has_nft(self) -> bool: return bool(self.bitfield & TokenStructure.HAS_NFT) def is_minting_nft(self) -> bool: return self.has_nft() and self.get_capability() == TokenCapability.MINTING def is_mutable_nft(self) -> bool: return self.has_nft() and self.get_capability() == TokenCapability.MUTABLE def is_immutable_nft(self) -> bool: return self.has_nft() and self.get_capability() == TokenCapability.NO_CAPABILITY def is_valid_bitfield(self) -> bool: s = self.bitfield & 0xF0 if s >= 0x80 or s == 0x00: return False if self.bitfield & 0x0F > 2: return False if not self.has_nft() and not self.has_amount(): return False if not self.has_nft() and (self.bitfield & 0x0F) != 0: return False if not self.has_nft() and self.has_commitment_length(): return False return True def __repr__(self) -> str: return ( f"TokenOutputData(id={self.id_hex} bitfield={self.bitfield:02x} amount={self.amount} " f"commitment={self.commitment[:40].hex()})" ) class Token: """Emulate the C++ 'token' namespace""" PREFIX_BYTE = bytes([0xEF]) Structure = TokenStructure Capability = TokenCapability OutputData = TokenOutputData @classmethod def wrap_spk( cls, token_data: Optional[TokenOutputData], script_pub_key: bytes ) -> bytes: if not token_data: return script_pub_key buf = bytearray() buf += cls.PREFIX_BYTE buf += token_data.serialize() buf += script_pub_key return bytes(buf) @classmethod def unwrap_spk(cls, wrapped_spk: bytes) -> Tuple[Optional[TokenOutputData], bytes]: if not wrapped_spk or wrapped_spk[0] != cls.PREFIX_BYTE[0]: return None, wrapped_spk token_data = TokenOutputData() f = BytesIO(wrapped_spk) pfx = f.read(1) # consume prefix byte assert pfx == cls.PREFIX_BYTE token_data.deserialize( f ) # unserialize token_data from buffer after prefix_byte if ( not token_data.is_valid_bitfield() or (token_data.has_amount() and not token_data.amount) or (token_data.has_commitment_length() and not token_data.commitment) ): # Bad bitfield or 0 serialized amount or empty serialized commitment is a deserialization error, # just return the entire buffer as spk return None, wrapped_spk spk = wrapped_spk[f.tell() :] # leftover bytes go to real spk return token_data, spk # Parsed ok class CTxOut: __slots__ = ("n_value", "script_pubkey", "token_data") def __init__(self, n_value=0, script_pubkey=b"", token_data=None): self.n_value = n_value self.script_pubkey = script_pubkey self.token_data = token_data def deserialize(self, f): self.n_value = struct.unpack(" 21000000 * COIN: return False return True def get_rpc_hex_id(self): """Returns the Id in the same format it would be returned via a RPC call""" return uint256_to_rpc_hex(self.get_hash()) def __repr__(self): return f"CTransaction(n_version={int(self.n_version)} vin={repr(self.vin)} vout={repr(self.vout)} nLockTime={int(self.n_lock_time)})" # pylint: disable=too-many-arguments def signature_hash( self, in_number, script_code, n_value, hashcode=BTCBCH_SIGHASH_ALL | BTCBCH_SIGHASH_FORKID, single=False, debug=False, ): """Calculate hash digest for given input, using SIGHASH_FORKID (Bitcoin Cash signing). Returns it in binary, little-endian. txin is the corresponding input CTransaction. Supplying it is necessary to include the scriptPubKey in the hashed output. If single is True, just a single invocation of SHA256 is done, instead of the usual, expected double hashing. This is to aid applications such as CHECKDATASIG(VERIFY). """ hashdata = struct.pack("= len(script_code) > 0 hashdata += struct.pack(" 1: newhashes = [] for i in range(0, len(hashes), 2): i2 = min(i + 1, len(hashes) - 1) newhashes.append(hash256(hashes[i] + hashes[i2])) hashes = newhashes if hashes: return uint256_from_str(hashes[0]) return 0 def is_valid(self): self.calc_sha256() target = uint256_from_compact(self.n_bits) if self.sha256 > target: return False for tx in self.vtx: if not tx.is_valid(): return False if self.calc_merkle_root() != self.hash_merkle_root: return False return True def solve(self): self.rehash() target = uint256_from_compact(self.n_bits) while self.sha256 > target: self.n_nonce += 1 self.rehash() def __str__(self): return f"CBlock(n_version={int(self.n_version)} hash_prev_block={self.hash_prev_block:064x} hash_merkle_root={self.hash_merkle_root:064x} n_time={time.ctime(self.n_time)} n_bits={self.n_bits:08x} n_nonce={self.n_nonce:08x} vtx_len={len(self.vtx)})" def __repr__(self): return f"CBlock(n_version={int(self.n_version)} hash_prev_block={self.hash_prev_block:064x} hash_merkle_root={self.hash_merkle_root:064x} n_time={time.ctime(self.n_time)} n_bits={self.n_bits:08x} n_nonce={self.n_nonce:08x} vtx={repr(self.vtx)})" class MsgTx: command = b"tx" def __init__(self, tx=CTransaction()): self.tx = tx def deserialize(self, f): self.tx.deserialize(f) def serialize(self): return self.tx.serialize() def __repr__(self): return f"msg_tx(tx={repr(self.tx)})" class MsgBlock: command = b"block" def __init__(self, block=None): if block is None: self.block = CBlock() else: self.block = block def deserialize(self, f): self.block.deserialize(f) def serialize(self): return self.block.serialize() def __str__(self): return f"msg_block(block={str(self.block)})" def __repr__(self): return f"msg_block(block={repr(self.block)})" class MsgHeaders: """ headers message has """ command = b"headers" def __init__(self, headers=None): self.headers = [] if headers is None else headers def deserialize(self, f): # comment in bitcoind indicates these should be deserialized as blocks blocks = deser_vector(f, CBlock) for x in blocks: self.headers.append(CBlockHeader(x)) def serialize(self, stype=SER_DEFAULT): blocks = [CBlock(x) for x in self.headers] return ser_vector(blocks, stype) def __repr__(self): return f"msg_headers(headers={repr(self.headers)})"