import socket import struct import random import decimal from binascii import hexlify, unhexlify import time from codecs import encode from io import BytesIO import copy from test_framework.schnorr import sign from test_framework import util from test_framework.script import find_and_delete, CScript, OP_CODESEPARATOR from test_framework.constants import ( SIGHASH_ALL_IN, SIGHASH_FIRSTN_IN, SIGHASH_THIS_IN, BTCBCH_SIGHASH_ANYONECANPAY, BTCBCH_SIGHASH_SINGLE, BTCBCH_SIGHASH_NONE, ) from ..base58 import decode_base58, encode_base58 from ..constants import COIN from ..serialize import ( ser_string, deser_uint256, ser_uint256, uint256_from_bigendian, uint256_from_str, deser_string, deser_vector, ser_vector, deser_uint256_vector, ser_uint256_vector, deser_varint, ser_varint, uint256_from_compact, from_hex, ) from ..constants import TEST_PEER_SUBVERSION, TEST_PEER_VERSION from ..util import sha256, hash256 # Serialization Types SER_DEFAULT = 0 SER_ID = 0 SER_IDEM = 1 INVALID_OPCODE = b"\xff" # Helper function def bitcoin_address2bin(btc_address): """convert a bitcoin address to binary data capable of being put in a CScript""" # chop the version and checksum out of the bytes of the address return decode_base58(btc_address)[1:-4] def encode_bitcoin_address(prefix, data): data2 = prefix + data cksm = hash256(data2)[:4] data3 = data2 + cksm b58 = encode_base58(data3) return b58 # Objects that map to bitcoind objects, which can be serialized/deserialized # because the nVersion field has not been passed before the VERSION message the protocol uses an old format for the CAddress (missing nTime) # This class handles that old format class CAddressInVersion: def __init__(self, ip="0.0.0.0", port=0): self.n_services = 1 self.pch_reserved = ( b"\x00" * 10 + b"\xff" * 2 ) # ip is 16 bytes on wire to handle v6 self.ip = ip self.port = port def deserialize(self, f): self.n_services = struct.unpack("H", f.read(2))[0] def serialize(self, _stype=SER_DEFAULT): r = b"" r += struct.pack("H", self.port) return r def __repr__(self): return f"CAddressInVersion(nServices={int(self.n_services)} ip={self.ip} port={int(self.port)})" # Handle new-style CAddress objects (with nTime) class CAddress: def __init__(self, ip="0.0.0.0", port=0): self.n_services = 1 self.n_time = int(time.time()) self.pch_reserved = ( b"\x00" * 10 + b"\xff" * 2 ) # ip is 16 bytes on wire to handle v6 self.ip = ip self.port = port def deserialize(self, f): self.n_time = struct.unpack("H", f.read(2))[0] def serialize(self, _stype=SER_DEFAULT): r = b"" r += struct.pack("H", self.port) return r def __repr__(self): return f"CAddress(nServices={int(self.n_services)} ip={self.ip} port={int(self.port)} time={int(self.n_time)})" class CInv: MSG_TX = 1 MSG_BLOCK = 2 MSG_FILTERED_BLOCK = 3 MSG_CMPCT_BLOCK = 4 MSG_XTHINBLOCK = 5 MSG_THINBLOCK = MSG_CMPCT_BLOCK typemap = { 0: "Error", 1: "TX", 2: "Block", 3: "FilteredBlock", 4: "CompactBlock", 5: "XThinBlock", } def __init__(self, t=0, h=0): assert isinstance(t, int) if isinstance(h, bytes): h = deser_uint256(h) assert isinstance(h, int) self.type = t self.hash = h def deserialize(self, f): self.type = struct.unpack("= len(self.vin): return (hash_one, f"in_idx {int(in_idx)} out of range ({len(self.vin)})") # create copy as it is going to be modified with find_and_delete(..) txtmp = CTransaction(self) for txin in txtmp.vin: txin.script_sig = b"" txtmp.vin[in_idx].script_sig = find_and_delete( script, CScript([OP_CODESEPARATOR]) ) if (hashtype & 0x1F) == BTCBCH_SIGHASH_NONE: txtmp.vout = [] for i, txin in enumerate(txtmp.vin): if i != in_idx: txin.n_sequence = 0 elif (hashtype & 0x1F) == BTCBCH_SIGHASH_SINGLE: out_idx = in_idx if out_idx >= len(txtmp.vout): return ( hash_one, f"out_idx {int(out_idx)} out of range ({len(txtmp.vout)})", ) tmp = txtmp.vout[out_idx] txtmp.vout = [] for i in range(out_idx): txtmp.vout.append(CTxOut()) txtmp.vout.append(tmp) for i, txin in enumerate(txtmp.vin): if i != in_idx: txin.n_sequence = 0 if hashtype & BTCBCH_SIGHASH_ANYONECANPAY: tmp = txtmp.vin[in_idx] txtmp.vin = [] txtmp.vin.append(tmp) s = txtmp.serialize() s += struct.pack(b" 1: newhashes = [] for i in range(0, len(hashes), 2): i2 = min(i + 1, len(hashes) - 1) newhashes.append(hash256(hashes[i] + hashes[i2])) hashes = newhashes if hashes: return uint256_from_str(hashes[0]) return 0 def calc_size(self): self.size = len(self.serialize()) - (len(self.nonce) + 1) def calc_mining_hash(self): raise NotImplementedError() def is_valid(self): mining_hash = self.calc_mining_hash() target = uint256_from_compact(self.n_bits) if mining_hash > target: return False for tx in self.vtx: if not tx.is_valid(): return False if self.calc_merkle_root() != self.hash_merkle_root: return False return True def solve(self): assert self.tx_count == len(self.vtx) assert self.size == len(self.serialize()) - (len(self.nonce) + 1) target = uint256_from_compact(self.n_bits) mining_commitment = uint256_from_str(self.calc_mining_commitment()) while True: r = b"" r += ser_uint256(mining_commitment) r += ser_string(self.nonce) mining_hash = hash256(r) sha256of_mh = sha256(mining_hash) # create a private key from the blockhash private_key = mining_hash # create a schorr sig by signing with the sha256(blockhash) and, # private key from above sig = sign(private_key, sha256of_mh) # get the sha256 of the schnorr sig schnorr_sha256 = uint256_from_str(sha256(sig)) if schnorr_sha256 < target: break # Roll 3 bytes TODO: verify that nonce is big enough if self.nonce[0] == 255: if len(self.nonce) == 1: self.nonce.append(0) if self.nonce[1] == 255: if len(self.nonce) == 2: self.nonce.append(0) self.nonce[2] += 1 self.nonce[1] += 1 self.nonce[0] += 1 self.hash_num = None # force recalculation of hash since block changed self.hash = None def __str__(self): return f"CBlock(hashPrevBlock={self.hash_prev_block:064x} hashMerkleRoot={self.hash_merkle_root:064x} nTime={time.ctime(self.n_time)} nBits={self.n_bits:08x} nonce={self.nonce.hex()} vtx_len={len(self.vtx)})" def __repr__(self): return f"CBlock(hashPrevBlock={self.hash_prev_block:064x} hashMerkleRoot={self.hash_merkle_root:064x} nTime={time.ctime(self.n_time)} nBits={self.n_bits:08x} nonce={self.nonce.hex()} vtx={repr(self.vtx)})" # Objects that correspond to messages on the wire # pylint: disable=too-many-instance-attributes class MsgVersion: command = b"version" def __init__(self): self.n_version = TEST_PEER_VERSION self.n_services = 1 self.n_time = int(time.time()) self.addr_to = CAddressInVersion() self.addr_from = CAddressInVersion() self.n_nonce = random.getrandbits(64) self.str_sub_ver = TEST_PEER_SUBVERSION self.n_starting_height = -1 def deserialize(self, f): self.n_version = struct.unpack("= 106: self.addr_from = CAddressInVersion() self.addr_from.deserialize(f) self.n_nonce = struct.unpack("= 209: self.n_starting_height = struct.unpack(" """ command = b"headers" def __init__(self, headers=None): self.headers = [] if headers is None else headers def deserialize(self, f): # In BCH/BTC these are serialized/deserialized as blocks with 0 transactions (regardless of the actual # tx) # In Nexa these are serialized/deserialized as block headers self.headers = deser_vector(f, CBlockHeader) def serialize(self, stype=SER_DEFAULT): for x in self.headers: assert isinstance(x, CBlockHeader) return ser_vector(self.headers, stype) def __repr__(self): return f"MsgHeaders(headers={repr(self.headers)})" class MsgReject: command = b"reject" REJECT_MALFORMED = 1 def __init__(self): self.message = b"" self.code = 0 self.reason = b"" self.data = 0 def deserialize(self, f): self.message = deser_string(f) self.code = struct.unpack("