import asyncio import time from test_framework.portseed import electrum_ws_port from test_framework.portseed import electrum_rpc_port from test_framework.connectrum.client import StratumClient from test_framework.connectrum.svr_info import ServerInfo from test_framework.environment import testing_websocket class TestClient(StratumClient): is_connected = False def connection_lost(self, protocol): self.is_connected = False super().connection_lost(protocol) class ElectrumConnection: protocol = "tcp" def __init__(self, loop=None, *, protocol=None, warn_on_connection_loss=False): self.cli = TestClient(loop, warn_on_connection_loss) if protocol is not None: self.protocol = protocol else: if testing_websocket(): self.protocol = "ws" async def connect(self, node_index=0): connect_timeout = 120 start = time.time() while True: try: if self.protocol == "tcp": await self.cli.connect( ServerInfo( None, ip_addr="127.0.0.1", ports=await electrum_rpc_port(n=node_index), ) ) elif self.protocol == "ws": await self.cli.connect( ServerInfo( None, hostname="localhost", ip_addr="127.0.0.1", ports=f"g{await electrum_ws_port(n=node_index)}", ), proto_code="g", ) else: raise Exception(f"Unsupported protocol {self.protocol}") self.cli.is_connected = True break except Exception as e: if time.time() >= (start + connect_timeout): raise Exception( f"Failed to connect to electrum server within {connect_timeout} seconds. Error '{e}'" ) from e print(".", end="", flush=True) await asyncio.sleep(1) def disconnect(self): self.cli.close() async def call(self, method, *args): if not self.cli.is_connected: raise Exception("not connected") ok = await self.cli.RPC(method, *args) return ok async def subscribe(self, method, *args): if not self.cli.is_connected: raise Exception("not connected") future, queue = self.cli.subscribe(method, *args) result = await future return result, queue def is_connected(self): return self.cli.is_connected