#!/usr/bin/env python3 # Copyright (c) 2015-2016 The Bitcoin Core developers # Copyright (c) 2017-2022 The Zcash developers # Distributed under the MIT software license, see the accompanying # file COPYING or https://www.opensource.org/licenses/mit-license.php . # # script.py # # This file is modified from python-bitcoinlib. # """Scripts Functionality to build scripts, as well as SignatureHash(). """ import sys bchr = chr bord = ord if sys.version > '3': long = int bchr = lambda x: bytes([x]) bord = lambda x: x from hashlib import blake2b from binascii import hexlify import struct from test_framework.bignum import bn2vch from test_framework.mininode import (CTransaction, CTxOut, hash256, ser_string, ser_uint256) MAX_SCRIPT_SIZE = 10000 MAX_SCRIPT_ELEMENT_SIZE = 520 MAX_SCRIPT_OPCODES = 201 OPCODE_NAMES = {} _opcode_instances = [] class CScriptOp(int): """A single script opcode""" __slots__ = [] @staticmethod def encode_op_pushdata(d): """Encode a PUSHDATA op, returning bytes""" if len(d) < 0x4c: return b'' + struct.pack('B', len(d)) + d # OP_PUSHDATA elif len(d) <= 0xff: return b'\x4c' + struct.pack('B', len(d)) + d # OP_PUSHDATA1 elif len(d) <= 0xffff: return b'\x4d' + struct.pack(b'>= 8 if r[-1] & 0x80: r.append(0x80 if neg else 0) elif neg: r[-1] |= 0x80 return struct.pack("B", len(r)) + r class CScript(bytes): """Serialized script A bytes subclass, so you can use this directly whenever bytes are accepted. Note that this means that indexing does *not* work - you'll get an index by byte rather than opcode. This format was chosen for efficiency so that the general case would not require creating a lot of little CScriptOP objects. iter(script) however does iterate by opcode. """ @classmethod def __coerce_instance(cls, other): # Coerce other into bytes if isinstance(other, CScriptOp): other = bytes([other]) elif isinstance(other, CScriptNum): if (other.value == 0): other = bytes([CScriptOp(OP_0)]) else: other = CScriptNum.encode(other) elif isinstance(other, int): if 0 <= other <= 16: other = bytes([CScriptOp.encode_op_n(other)]) elif other == -1: other = bytes([OP_1NEGATE]) else: other = CScriptOp.encode_op_pushdata(bn2vch(other)) elif isinstance(other, (bytes, bytearray)): other = bytes(CScriptOp.encode_op_pushdata(other)) return other def __add__(self, other): # Do the coercion outside of the try block so that errors in it are # noticed. other = self.__coerce_instance(other) try: # bytes.__add__ always returns bytes instances unfortunately return CScript(super(CScript, self).__add__(other)) except TypeError: raise TypeError('Can not add a %r instance to a CScript' % other.__class__) def join(self, iterable): # join makes no sense for a CScript() raise NotImplementedError def __new__(cls, value=b''): if isinstance(value, bytes) or isinstance(value, bytearray): return super(CScript, cls).__new__(cls, value) else: def coerce_iterable(iterable): for instance in iterable: yield cls.__coerce_instance(instance) # Annoyingly on both python2 and python3 bytes.join() always # returns a bytes instance even when subclassed. return super(CScript, cls).__new__(cls, b''.join(coerce_iterable(value))) def raw_iter(self): """Raw iteration Yields tuples of (opcode, data, sop_idx) so that the different possible PUSHDATA encodings can be accurately distinguished, as well as determining the exact opcode byte indexes. (sop_idx) """ i = 0 while i < len(self): sop_idx = i opcode = bord(self[i]) i += 1 if opcode > OP_PUSHDATA4: yield (opcode, None, sop_idx) else: datasize = None pushdata_type = None if opcode < OP_PUSHDATA1: pushdata_type = 'PUSHDATA(%d)' % opcode datasize = opcode elif opcode == OP_PUSHDATA1: pushdata_type = 'PUSHDATA1' if i >= len(self): raise CScriptInvalidError('PUSHDATA1: missing data length') datasize = bord(self[i]) i += 1 elif opcode == OP_PUSHDATA2: pushdata_type = 'PUSHDATA2' if i + 1 >= len(self): raise CScriptInvalidError('PUSHDATA2: missing data length') datasize = bord(self[i]) + (bord(self[i+1]) << 8) i += 2 elif opcode == OP_PUSHDATA4: pushdata_type = 'PUSHDATA4' if i + 3 >= len(self): raise CScriptInvalidError('PUSHDATA4: missing data length') datasize = bord(self[i]) + (bord(self[i+1]) << 8) + (bord(self[i+2]) << 16) + (bord(self[i+3]) << 24) i += 4 else: assert False # shouldn't happen data = bytes(self[i:i+datasize]) # Check for truncation if len(data) < datasize: raise CScriptTruncatedPushDataError('%s: truncated data' % pushdata_type, data) i += datasize yield (opcode, data, sop_idx) def __iter__(self): """'Cooked' iteration Returns either a CScriptOP instance, an integer, or bytes, as appropriate. See raw_iter() if you need to distinguish the different possible PUSHDATA encodings. """ for (opcode, data, sop_idx) in self.raw_iter(): if data is not None: yield data else: opcode = CScriptOp(opcode) if opcode.is_small_int(): yield opcode.decode_op_n() else: yield CScriptOp(opcode) def __repr__(self): # For Python3 compatibility add b before strings so testcases don't # need to change def _repr(o): if isinstance(o, bytes): return b"x('%s')" % hexlify(o).decode('ascii') else: return repr(o) ops = [] i = iter(self) while True: op = None try: op = _repr(next(i)) except CScriptTruncatedPushDataError as err: op = '%s...' % (_repr(err.data), err) break except CScriptInvalidError as err: op = '' % err break except StopIteration: break finally: if op is not None: ops.append(op) return "CScript([%s])" % ', '.join(ops) def GetSigOpCount(self, fAccurate): """Get the SigOp count. fAccurate - Accurately count CHECKMULTISIG, see BIP16 for details. Note that this is consensus-critical. """ n = 0 lastOpcode = OP_INVALIDOPCODE for (opcode, data, sop_idx) in self.raw_iter(): if opcode in (OP_CHECKSIG, OP_CHECKSIGVERIFY): n += 1 elif opcode in (OP_CHECKMULTISIG, OP_CHECKMULTISIGVERIFY): if fAccurate and (OP_1 <= lastOpcode <= OP_16): n += opcode.decode_op_n() else: n += 20 lastOpcode = opcode return n SIGHASH_ALL = 1 SIGHASH_NONE = 2 SIGHASH_SINGLE = 3 SIGHASH_ANYONECANPAY = 0x80 def getHashPrevouts(tx, person=b'ZcashPrevoutHash'): digest = blake2b(digest_size=32, person=person) for x in tx.vin: digest.update(x.prevout.serialize()) return digest.digest() def getHashSequence(tx, person=b'ZcashSequencHash'): digest = blake2b(digest_size=32, person=person) for x in tx.vin: digest.update(struct.pack('= len(txTo.vin): raise ValueError("inIdx %d out of range (%d)" % (inIdx, len(txTo.vin))) if consensusBranchId != 0: # ZIP 243 hashPrevouts = b'\x00'*32 hashSequence = b'\x00'*32 hashOutputs = b'\x00'*32 hashJoinSplits = b'\x00'*32 hashShieldedSpends = b'\x00'*32 hashShieldedOutputs = b'\x00'*32 if not (hashtype & SIGHASH_ANYONECANPAY): hashPrevouts = getHashPrevouts(txTo) if (not (hashtype & SIGHASH_ANYONECANPAY)) and \ (hashtype & 0x1f) != SIGHASH_SINGLE and \ (hashtype & 0x1f) != SIGHASH_NONE: hashSequence = getHashSequence(txTo) if (hashtype & 0x1f) != SIGHASH_SINGLE and \ (hashtype & 0x1f) != SIGHASH_NONE: hashOutputs = getHashOutputs(txTo) elif (hashtype & 0x1f) == SIGHASH_SINGLE and \ 0 <= inIdx and inIdx < len(txTo.vout): digest = blake2b(digest_size=32, person=b'ZcashOutputsHash') digest.update(txTo.vout[inIdx].serialize()) hashOutputs = digest.digest() if len(txTo.vJoinSplit) > 0: hashJoinSplits = getHashJoinSplits(txTo) if len(txTo.shieldedSpends) > 0: hashShieldedSpends = getHashShieldedSpends(txTo) if len(txTo.shieldedOutputs) > 0: hashShieldedOutputs = getHashShieldedOutputs(txTo) digest = blake2b( digest_size=32, person=b'ZcashSigHash' + struct.pack('= len(txtmp.vout): raise ValueError("outIdx %d out of range (%d)" % (outIdx, len(txtmp.vout))) tmp = txtmp.vout[outIdx] txtmp.vout = [] for i in range(outIdx): txtmp.vout.append(CTxOut()) txtmp.vout.append(tmp) for i in range(len(txtmp.vin)): if i != inIdx: txtmp.vin[i].nSequence = 0 if hashtype & SIGHASH_ANYONECANPAY: tmp = txtmp.vin[inIdx] txtmp.vin = [] txtmp.vin.append(tmp) s = txtmp.serialize() s += struct.pack(b"