#
# Client connect to an Electrum server.
#

# Runtime check for optional modules
from collections import defaultdict
import socket
from threading import Lock
import traceback
import asyncio
import logging
from typing import Optional, Union, Tuple, List, Dict
from asyncio import AbstractEventLoop

from .wsprotocol import WSProtocol
from .protocol import StratumProtocol
from .exc import ElectrumErrorResponse

logger = logging.getLogger(__name__)


# pylint: disable=too-many-instance-attributes
class StratumClient:
    """
    JSON-RPC client for a single Electrum (Stratum) server connection.

    Requests are issued with RPC() / subscribe(); the protocol object hands
    decoded server messages back through _got_response(), which resolves the
    matching future or feeds the subscription queues.
    """

    # log a warning when the protocol reports that the connection dropped
    warn_on_connection_loss: bool
    # active transport protocol, or None while disconnected
    protocol: Optional[Union[StratumProtocol, WSProtocol]]
    # guards next_id / inflight / subscriptions
    lock: Lock
    # last request id handed out (incremented before each use)
    next_id: int
    # request id -> (request message as sent, future resolved by the response)
    inflight: Dict[int, Tuple[dict, asyncio.Future]]
    # subscription method name -> queues receiving its notifications
    subscriptions: Dict[str, List[asyncio.Queue]]
    # background keep-alive task, or None when not running
    ka_task: Optional[asyncio.Task]
    loop: AbstractEventLoop

    def __init__(
        self, loop: Optional[AbstractEventLoop] = None, warn_on_connection_loss=True
    ):
        """
        Create a disconnected client.

        loop: event loop to run on; defaults to asyncio.get_event_loop().
        warn_on_connection_loss: log a warning when the connection drops.
        """
        self.warn_on_connection_loss = warn_on_connection_loss
        self.protocol = None
        self.lock = Lock()
        self.next_id = 1
        self.inflight = defaultdict(tuple)
        self.subscriptions = defaultdict(list)
        self.ka_task = None
        self.loop = loop or asyncio.get_event_loop()

    def connection_lost(self, _protocol):
        """
        Callback for the protocol object: the connection has gone away.

        Forgets the protocol, optionally logs a warning, and stops the
        keep-alive task.
        """
        self.protocol = None
        if self.warn_on_connection_loss:
            logger.warning(
                "Electrum server connection lost. Traceback: %s",
                traceback.format_exc(),
            )

        # cleanup keep alive task
        if self.ka_task:
            self.ka_task.cancel()
            self.ka_task = None

    def close(self):
        """Close the connection (if any) and stop the keep-alive task."""
        if self.protocol:
            self.protocol.close()
            self.protocol = None
        if self.ka_task:
            self.ka_task.cancel()
            self.ka_task = None

    async def connect(self, server_info, proto_code=None):
        """
        Start connection process.

        Destination must be specified in a ServerInfo() record (first arg).
        When proto_code is not given, the first entry of
        server_info.protocols is used. Protocol code "g" selects the
        websocket transport; anything else is connected over TCP.
        """
        assert not self.protocol, "already connected"

        if not proto_code:
            proto_code, *_ = server_info.protocols

        logger.debug("Connecting to: %r", server_info)

        hostname, port, use_ssl = server_info.get_port(proto_code)

        if proto_code == "g":  # websocket
            protocol = WSProtocol(f"ws://{hostname}:{port}", self, self.loop)
            await protocol.connect()
            self.protocol = protocol
        else:  # tcp
            # manually create socket for 'SO_REUSEADDR' to mitigate
            # '[Errno 99] Cannot assign requested address' in CI
            sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
            try:
                sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
                sock.setblocking(False)
                await self.loop.sock_connect(sock, (hostname, port))
            except BaseException:
                # nothing owns the socket yet; don't leak the descriptor
                # when the connect fails or is cancelled
                sock.close()
                raise

            # With sock= there is no host for asyncio to derive the TLS
            # server name from, so it must be passed explicitly when SSL is
            # on (and must stay None when SSL is off).
            _transport, protocol = await self.loop.create_connection(
                StratumProtocol,
                sock=sock,
                ssl=use_ssl,
                server_hostname=hostname if use_ssl else None,
            )
            self.protocol = protocol
            # the protocol factory takes no arguments, so attach ourselves
            # afterwards (WSProtocol receives the client in its constructor)
            protocol.client = self

        self.ka_task = self.loop.create_task(self._keepalive())

        logger.debug("Connected to: %r", server_info)

    async def _keepalive(self):
        """
        Keep our connection to the server alive forever, with some
        pointless traffic: a "server.ping" every 10 seconds for as long as
        a protocol is attached.
        """
        while self.protocol:
            await self.RPC("server.ping")
            await asyncio.sleep(10)

    def _send_request(self, method, params=None, is_subscribe=False):
        """Locked wrapper around _send_request_inner(); same arguments/result."""
        with self.lock:
            return self._send_request_inner(method, params, is_subscribe)

    def _send_request_inner(self, method, params=None, is_subscribe=False):
        """
        Send a new request to the server. Serializes the JSON and tracks
        id numbers and optional callbacks.

        Returns the future for the response, or a (future, queue) tuple
        when is_subscribe is true. Caller must hold self.lock.
        """
        # check the connection before touching any bookkeeping, so a failed
        # send does not leave a stale future / queue registered
        assert self.protocol

        if params is None:
            params = []

        # pick a new ID
        self.next_id += 1
        req_id = self.next_id

        # serialize as JSON
        msg = {"id": req_id, "method": method, "params": params}

        # subscriptions are a Q, normal requests are a future
        wait_q = None
        if is_subscribe:
            wait_q = asyncio.Queue()
            self.subscriptions[method].append(wait_q)

        fut = self.loop.create_future()
        self.inflight[req_id] = (msg, fut)

        # send request immediately, response is a future
        self.protocol.send_data(msg)

        return (fut, wait_q) if is_subscribe else fut

    def _got_response(self, msg):
        """Locked wrapper around _got_response_inner()."""
        with self.lock:
            return self._got_response_inner(msg)

    def _got_response_inner(self, msg):
        """
        Decode and dispatch responses from the server.

        Has already been unframed and deserialized into an object.
        Messages without an id are subscription notifications and are put
        on every queue registered for their method; messages with an id
        resolve (or fail) the matching in-flight future.
        """
        # logger.debug("MSG: %r" % msg)

        resp_id = msg.get("id", None)

        if resp_id is None:
            # subscription traffic comes with method set, but no req id.
            method = msg.get("method", None)
            if not method:
                logger.error(
                    "Incoming server message had no ID nor method in it %s", msg
                )
                return

            # not obvious, but result is on params, not result, for
            # subscriptions
            result = msg.get("params", None)

            logger.debug("Traffic on subscription: %s", method)

            # .get() with a default: a notification for a method nobody
            # subscribed to is ignored (and does not create an empty entry)
            for queue in self.subscriptions.get(method, ()):
                # queues are unbounded, so this never raises QueueFull
                queue.put_nowait(result)

            return

        assert "method" not in msg
        result = msg.get("result")

        # fetch and forget about the request; an explicit default is needed
        # because defaultdict's factory does not apply to pop()
        inf = self.inflight.pop(resp_id, None)
        if not inf:
            logger.error("Incoming server message had unknown ID in it: %s", resp_id)
            return

        # it's a future which is done now
        req, retval = inf

        # done() also covers cancelled futures
        if retval.done():
            logger.info("Future already done for resp_id %s: %s", resp_id, msg)
            return

        if "error" in msg:
            err = msg["error"]

            logger.info("Error response: '%s'", err)
            retval.set_exception(ElectrumErrorResponse(err, req))
        else:
            retval.set_result(result)

    # pylint: disable=invalid-name
    def RPC(self, method: str, *params):
        """
        Perform a remote command.

        Expects a method name, which looks like:

            blockchain.address.get_balance

        .. and sometimes takes arguments, all of which are positional.

        Returns a future which you should await for the result from the
        server. Failures are returned as exceptions.
        """
        assert "." in method
        # assert not method.endswith('subscribe')

        return self._send_request(method, params)

    def subscribe(self, method: str, *params):
        """
        Perform a remote command which will stream events/data to us.

        Expects a method name, which looks like:

            server.peers.subscribe

        .. and sometimes takes arguments, all of which are positional.

        Returns a tuple: (Future, asyncio.Queue).
        The future will have the result of the initial call, and the queue
        will receive additional responses as they happen.
        """
        assert "." in method
        assert method.endswith("subscribe")
        return self._send_request(method, params, is_subscribe=True)