#!/usr/bin/env python3
# Copyright (c) 2020 The Bitcoin Unlimited developers
"""Functional test for the DoS limits configured on the electrum server.

The node is started with three limits passed through ``-electrum.rawarg``:

* ``--rpc-max-connections``
* ``--scripthash-subscription-limit``
* ``--scripthash-alias-bytes-limit``

Each limit is exercised by going up to the limit, checking that the next
request is rejected, freeing one slot and checking that a new request is
accepted again.
"""
import asyncio

from test_framework.environment import testing_websocket
from test_framework.util import assert_raises_async, wait_for
from test_framework.electrumutil import (
    ElectrumTestFramework,
)
from test_framework.connectrum.exc import ElectrumErrorResponse
from test_framework.electrumconnection import ElectrumConnection

MAX_RPC_CONNECTIONS = 5
MAX_SCRIPTHASH_SUBSCRIPTIONS = 5
SCRIPTHASH_ALIAS_BYTES_LIMIT = 56 * 2  # two nexa addresses

# Error code expected in the response when a subscription limit is hit.
_LIMIT_ERROR_CODE = "-32600"

# Number of times the "connection is accepted again" check is attempted.
_MAX_RECONNECT_ATTEMPTS = 5


async def get_scripthash(cli, address):
    """Return the result of ``blockchain.address.get_scripthash`` for *address*."""
    return await cli.call("blockchain.address.get_scripthash", address)


async def _assert_limit_reached(cli, method, arg, expected_message):
    """Assert that ``cli.call(method, arg)`` is rejected with a limit error.

    The request is issued twice, exactly as the checks did before they were
    factored out: once through ``assert_raises_async`` (exception type) and
    once directly so the error payload can be inspected.

    Raises:
        AssertionError: if the call succeeds, or if the error text lacks the
            expected error code or *expected_message*.
    """
    await assert_raises_async(ElectrumErrorResponse, cli.call, method, arg)
    try:
        await cli.call(method, arg)
    except ElectrumErrorResponse as e:
        error = str(e)
        assert _LIMIT_ERROR_CODE in error, f"got error: {error}"
        assert expected_message in error, f"got error: {error}"
    else:
        # Without this branch a call that unexpectedly succeeds would skip
        # the payload assertions above and pass silently.
        raise AssertionError(f"{method} unexpectedly succeeded for {arg}")


class ElectrumDoSLimitTest(ElectrumTestFramework):
    """Checks the connection, subscription and alias limits of the server."""

    def __init__(self) -> None:
        super().__init__()
        self.setup_clean_chain = True
        self.num_nodes = 1

        max_args = [
            f"-electrum.rawarg=--scripthash-subscription-limit={MAX_SCRIPTHASH_SUBSCRIPTIONS}",
            f"-electrum.rawarg=--scripthash-alias-bytes-limit={SCRIPTHASH_ALIAS_BYTES_LIMIT}",
            f"-electrum.rawarg=--rpc-max-connections={MAX_RPC_CONNECTIONS}",
        ]
        self.extra_args = [max_args]

    async def run_test(self) -> None:
        """Generate one block, then run the three limit checks in sequence."""
        n = self.nodes[0]
        n.generate(1)

        await self.test_connection_limit()
        await self.test_subscribe_limit(n)
        await self.test_scripthash_alias_limit(n)

    async def test_subscribe_limit(self, n) -> None:
        """Check that scripthash subscriptions are capped per connection.

        Args:
            n: node used to obtain fresh addresses via ``getnewaddress()``.
        """
        cli = ElectrumConnection()
        await cli.connect()
        self.info("Testing scripthash subscription limit.")

        # Subscribe up to limit
        scripthashes = []
        for _ in range(MAX_SCRIPTHASH_SUBSCRIPTIONS):
            s = await get_scripthash(cli, n.getnewaddress())
            await cli.subscribe("blockchain.scripthash.subscribe", s)
            scripthashes.append(s)

        # Next subscription should fail
        s = await get_scripthash(cli, n.getnewaddress())
        await _assert_limit_reached(
            cli,
            "blockchain.scripthash.subscribe",
            s,
            "subscriptions limit reached",
        )

        # Subscribing to an existing subscription should not affect the limit.
        await cli.subscribe("blockchain.scripthash.subscribe", scripthashes[0])

        # Unsubscribing should allow for a new subscription
        ok = await cli.call("blockchain.scripthash.unsubscribe", scripthashes[0])
        assert ok
        await cli.subscribe("blockchain.scripthash.subscribe", s)

        # ... and also enforce the limit again
        await assert_raises_async(
            ElectrumErrorResponse,
            cli.call,
            "blockchain.scripthash.subscribe",
            await get_scripthash(cli, n.getnewaddress()),
        )

        cli.disconnect()
        await wait_for(10, lambda: not cli.is_connected())

    async def test_scripthash_alias_limit(self, n) -> None:
        """Check that address (alias) subscriptions are capped by total size.

        Args:
            n: node used to obtain fresh addresses via ``getnewaddress()``.
        """
        cli = ElectrumConnection()
        await cli.connect()

        addresses = [
            "bitcoincash:ppwk8u8cg8cthr3jg0czzays6hsnysykes9amw07kv",
            "bitcoincash:qrsrvtc95gg8rrag7dge3jlnfs4j9pe0ugrmeml950",
        ]

        # Alias limit allows to subscribe to two addresses.
        for a in addresses:
            await cli.subscribe("blockchain.address.subscribe", a)

        # Third address should fail
        third = n.getnewaddress()
        await _assert_limit_reached(
            cli,
            "blockchain.address.subscribe",
            third,
            "alias subscriptions limit reached",
        )

        # Unsubscribing should allow for a new subscription
        ok = await cli.call("blockchain.address.unsubscribe", addresses[0])
        assert ok
        await cli.subscribe("blockchain.address.subscribe", third)

        # ... and also enforce the limit again
        await assert_raises_async(
            ElectrumErrorResponse,
            cli.call,
            "blockchain.address.subscribe",
            n.getnewaddress(),
        )

        cli.disconnect()
        await wait_for(10, lambda: not cli.is_connected())

    async def test_connection_limit(self) -> None:
        """Check that only ``MAX_RPC_CONNECTIONS`` connections are served."""
        connections = []
        for _ in range(MAX_RPC_CONNECTIONS):
            c = ElectrumConnection()
            await c.connect()
            connections.append(c)

        # Exceed limit, we should get disconnected.
        if testing_websocket():
            try:
                extra_connection = ElectrumConnection(warn_on_connection_loss=False)
                await extra_connection.connect()
            except Exception as e:
                assert "Failed to connect" in str(e), f"got error: {e}"
        else:
            extra_connection = ElectrumConnection(warn_on_connection_loss=False)
            await extra_connection.connect()
            try:
                await asyncio.wait_for(extra_connection.call("server.ping"), timeout=10)
            except asyncio.TimeoutError:
                # We expect this to timeout
                pass
            else:
                raise AssertionError(
                    "server.ping was answered on a connection above the limit"
                )
        await wait_for(10, lambda: not extra_connection.is_connected())

        # Drop one connection
        connections[0].disconnect()
        await wait_for(10, lambda: not connections[0].is_connected())

        # New connection should be accepted now.
        # In gitlab CI the connection sporadically fails for unknown reason.
        # Allow a few attempts.
        for attempt in range(1, _MAX_RECONNECT_ATTEMPTS + 1):
            # Reset per attempt so the cleanup below never touches an unbound
            # name or the connection object of a previous attempt.
            extra_connection2 = None
            try:
                extra_connection2 = ElectrumConnection()
                await extra_connection2.connect()
                await asyncio.wait_for(
                    extra_connection2.call("server.ping"), timeout=10
                )
                break
            except Exception as e:
                print(f"Attempt {attempt} to connect failed: {e}")
                if attempt >= _MAX_RECONNECT_ATTEMPTS:
                    raise
                await asyncio.sleep(1.0)
            finally:
                # Runs on success (break), on retry and on the final failure,
                # so the slot is always released. Cleanup is best effort.
                if extra_connection2 is not None:
                    try:
                        extra_connection2.disconnect()
                        await wait_for(
                            10,
                            lambda c=extra_connection2: not c.is_connected(),
                        )
                    except Exception as e:
                        print(f"Failed to disconnect 'extra_connection2: {e}")

        to_disconnect = connections[1:]
        for c in to_disconnect:
            c.disconnect()
        await asyncio.gather(
            *(wait_for(10, lambda d=d: not d.is_connected()) for d in to_disconnect)
        )


if __name__ == "__main__":
    asyncio.run(ElectrumDoSLimitTest().main())