"""WebSocket transport exchanging newline-delimited JSON messages with a client."""
import asyncio
import json
from typing import Any, Optional

try:
    import websockets

    # pylint: disable=no-member
    WebSocketClientProtocol = websockets.WebSocketClientProtocol
except ImportError:
    WEBSOCKETS_AVAILABLE = False

    # pylint: disable=too-few-public-methods
    class WebSocketClientProtocol:
        """Placeholder for when websockets module isn't available."""

    def _raise_import_error(*args, **kwargs):
        """Stand-in for ``send``/``close`` on the placeholder; always raises."""
        raise ImportError("websockets module is not installed.")

    WebSocketClientProtocol.send = _raise_import_error
    WebSocketClientProtocol.close = _raise_import_error
else:
    WEBSOCKETS_AVAILABLE = True


# pylint: disable=no-member,protected-access
class WSProtocol:
    """WebSocket connection that relays JSON messages to and from a client.

    Outgoing payloads are JSON-encoded and terminated with a newline.
    Incoming frames are split on newlines, each non-empty piece is decoded
    as JSON and handed to ``client._got_response``.  The client is told about
    the end of the connection through ``client.connection_lost(exc)``, which
    is invoked at most once per instance.
    """

    closed: bool = True
    client: Any = None  # circular reference, can't define type
    ws: Optional[WebSocketClientProtocol] = None
    url: str
    loop: asyncio.AbstractEventLoop
    connection_lost_sent: bool
    messange_handler: Optional[asyncio.Task] = None
    # Strong references to fire-and-forget tasks; created lazily by _spawn().
    _background_tasks: Optional[set] = None

    def __init__(self, url: str, client, loop: asyncio.AbstractEventLoop):
        """Store the connection parameters; no I/O happens here.

        Args:
            url: WebSocket URL passed to ``websockets.connect``.
            client: Object receiving ``_got_response`` / ``connection_lost``.
            loop: Event loop on which all tasks of this protocol are scheduled.

        Raises:
            ImportError: If the ``websockets`` module is not installed.
        """
        if not WEBSOCKETS_AVAILABLE:
            raise ImportError("websockets module is not installed.")

        self.client = client
        self.url = url
        self.loop = loop
        self.connection_lost_sent = False

    def _spawn(self, coro) -> asyncio.Task:
        """Schedule *coro* on ``self.loop`` and keep it alive until it is done.

        The event loop only holds weak references to tasks, so a task nobody
        else references may be garbage-collected before it finishes.
        """
        task = self.loop.create_task(coro)
        if self._background_tasks is None:
            self._background_tasks = set()
        self._background_tasks.add(task)
        task.add_done_callback(self._background_tasks.discard)
        return task

    def _notify_connection_lost(self, exc: Optional[BaseException]) -> None:
        """Report the end of the connection to the client, at most once."""
        if self.connection_lost_sent:
            return
        self.connection_lost_sent = True
        if self.client:
            self.client.connection_lost(exc)

    async def connect(self):
        """Open the WebSocket and start the background receive task.

        Raises:
            Exception: Whatever ``websockets.connect`` raised; the protocol is
                left in the closed state.
        """
        print(f"Connecting to {self.url}")
        try:
            self.ws = await websockets.connect(self.url)
            self.closed = False
            self.messange_handler = self.loop.create_task(self.handle_messages())
        except Exception as e:
            print(f"WebSocket Error: {e}")
            self.closed = True
            raise

    async def handle_messages(self):
        """Receive frames until the connection ends and dispatch them.

        Any exception (including undecodable JSON) marks the connection as
        closed and is reported to the client.  A normal end of iteration is
        reported as ``connection_lost(None)``.  Cancellation (as done by
        ``close``) propagates without notifying, since ``close`` does that.
        """
        try:
            async for msg in self.ws:
                print("RECEIVED", msg)
                for message in filter(None, msg.split("\n")):
                    try:
                        decoded_msg = json.loads(message)
                    except ValueError as exc:
                        print("Bad JSON received from server:", message, exc)
                        raise
                    self.client._got_response(decoded_msg)
        except Exception as e:
            print(f"WebSocket Error: {e}")
            self.closed = True
            self._notify_connection_lost(e)
        else:
            # The peer closed the connection cleanly: the iteration simply
            # stops, so without this branch nobody would ever be told.
            self.closed = True
            self._notify_connection_lost(None)

    def send_data(self, data):
        """JSON-encode *data* and send it asynchronously, newline-terminated.

        Returns immediately; a failure while sending closes the connection.

        Raises:
            TypeError: If *data* is not JSON serializable (from ``json.dumps``).
        """
        msg = json.dumps(data)
        print("SENDING", msg)

        async def send():
            try:
                await self.ws.send(msg + "\n")
            except Exception as e:
                print(f"websocket: closing due to failure to send {msg}: {e}")
                self.close()

        self._spawn(send())

    def close(self):
        """Close the socket, stop the receive task and notify the client.

        Safe to call repeatedly and from code that is not running inside the
        event loop: the socket close is scheduled on ``self.loop`` rather than
        on the currently running loop.
        """
        if not self.closed:
            self._spawn(self.ws.close())
            self.closed = True

        if self.messange_handler:
            self.messange_handler.cancel()
            self.messange_handler = None

        self._notify_connection_lost(None)