#!/usr/bin/env python3

# Copyright 2022 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""Interactive console for a Pica server.

Commands typed at the prompt are either sent as UCI packets over a TCP
connection or translated into requests against the Pica HTTP interface.
"""

import argparse
import asyncio
import inspect
import json
import random
import readline
import socket
import struct
import sys
import time
from concurrent.futures import ThreadPoolExecutor

import requests

from pica import Host
from pica.packets import uci

MAX_DATA_PACKET_PAYLOAD_SIZE = 1024


def encode_short_mac_address(mac_address: str) -> bytes:
    """Encode a decimal string as a 2-byte little-endian value."""
    return int(mac_address).to_bytes(2, byteorder="little")


def encode_mac_address(mac_address: str) -> bytes:
    """Encode a decimal string as an 8-byte little-endian value."""
    return int(mac_address).to_bytes(8, byteorder="little")


def parse_mac_address(mac_address: str) -> bytes:
    """Convert a colon-separated list of hex bytes (e.g. "00:01") to bytes."""
    return bytes(int(part, 16) for part in mac_address.split(":"))


def _position_payload(x: str, y: str, z: str, yaw: str, pitch: str, roll: str) -> str:
    """Build the JSON body shared by the HTTP commands that carry a position."""
    return json.dumps(
        {
            "x": int(x),
            "y": int(y),
            "z": int(z),
            "yaw": int(yaw),
            "pitch": int(pitch),
            "roll": int(roll),
        }
    )


class Device:
    """Console commands bound to one UCI connection and one HTTP endpoint.

    Every command receives its arguments as strings typed by the user and
    accepts extra keyword arguments, which are ignored.
    """

    # NOTE: the docstring of each command method is printed verbatim by the
    # console's usage listing, so those docstrings are kept to a single line.

    def __init__(self, reader, writer, http_address):
        self.host = Host(reader, writer, bytes([0, 1]))
        self.http_address = http_address

    def pica_get_state(self, **kargs):
        """List the UCI devices and anchors currently existing within the Pica virtual environment"""
        r = requests.get(f"{self.http_address}/get-state")
        print(f"{r.status_code}:\n{json.dumps(r.json(), indent=2)}")

    def pica_init_uci_device(
        self,
        mac_address: str = "00:00",
        x: str = "0",
        y: str = "0",
        z: str = "0",
        yaw: str = "0",
        pitch: str = "0",
        roll: str = "0",
        **kargs,
    ):
        """Init Pica device"""
        r = requests.post(
            f"{self.http_address}/init-uci-device/{mac_address}",
            data=_position_payload(x, y, z, yaw, pitch, roll),
        )
        print(f"{r.status_code}: {r.text}")

    def pica_create_anchor(
        self,
        mac_address: str = "00:00",
        x: str = "0",
        y: str = "0",
        z: str = "0",
        yaw: str = "0",
        pitch: str = "0",
        roll: str = "0",
        **kargs,
    ):
        """Create a Pica anchor"""
        r = requests.post(
            f"{self.http_address}/create-anchor/{mac_address}",
            data=_position_payload(x, y, z, yaw, pitch, roll),
        )
        print(f"{r.status_code}: {r.text}")

    def pica_destroy_anchor(self, mac_address: str = "00:00", **kargs):
        """Destroy a Pica anchor"""
        r = requests.post(f"{self.http_address}/destroy-anchor/{mac_address}")
        print(f"{r.status_code}: {r.text}")

    def pica_set_position(
        self,
        mac_address: str = "00:00",
        x: str = "0",
        y: str = "0",
        z: str = "0",
        yaw: str = "0",
        pitch: str = "0",
        roll: str = "0",
        **kargs,
    ):
        """Set Pica UCI device or anchor position"""
        r = requests.post(
            f"{self.http_address}/set-position/{mac_address}",
            data=_position_payload(x, y, z, yaw, pitch, roll),
        )
        print(f"{r.status_code}: {r.text}")

    def device_reset(self, **kargs):
        """Reset the UWBS."""
        self.host.send_control(
            uci.DeviceResetCmd(reset_config=uci.ResetConfig.UWBS_RESET)
        )

    def get_device_info(self, **kargs):
        """Retrieve the device information like (UCI version and other vendor specific info)."""
        self.host.send_control(uci.GetDeviceInfoCmd())

    def get_caps_info(self, **kargs):
        """Get the capability of the UWBS."""
        self.host.send_control(uci.GetCapsInfoCmd())

    def set_config(self, low_power_mode: str = "0", **kargs):
        """Set the configuration parameters on the UWBS."""
        self.host.send_control(
            uci.SetConfigCmd(
                tlvs=[
                    uci.DeviceConfigTlv(
                        cfg_id=uci.DeviceConfigId.LOW_POWER_MODE,
                        v=bytes([int(low_power_mode)]),
                    ),
                ]
            )
        )

    def get_config(self, **kargs):
        """Retrieve the current configuration parameter(s) of the UWBS."""
        self.host.send_control(
            uci.GetConfigCmd(
                cfg_id=[
                    uci.DeviceConfigId.LOW_POWER_MODE,
                    uci.DeviceConfigId.DEVICE_STATE,
                ]
            )
        )

    def session_init(self, session_id: str = "0", **kargs):
        """Initialize the session"""
        self.host.send_control(
            uci.SessionInitCmd(
                session_id=int(session_id),
                session_type=uci.SessionType.FIRA_RANGING_AND_IN_BAND_DATA_SESSION,
            )
        )

    def session_deinit(self, session_id: str = "0", **kargs):
        """Deinitialize the session"""
        self.host.send_control(uci.SessionDeinitCmd(session_token=int(session_id)))

    def session_set_app_config(
        self,
        session_id: str = "0",
        ranging_interval: str = "200",
        dst_mac_addresses: str = "",
        **kargs,
    ):
        """set APP Configuration Parameters for the requested UWB session."""
        # dst_mac_addresses is a comma-separated list; empty entries are skipped.
        addresses = [parse_mac_address(a) for a in dst_mac_addresses.split(",") if a]

        # Any address longer than two bytes switches every address to the
        # 8-byte encoding (mode 0x2); otherwise the 2-byte encoding (mode 0x0).
        if any(len(a) > 2 for a in addresses):
            mac_address_mode = 0x2
            mac_address_len = 8
        else:
            mac_address_mode = 0x0
            mac_address_len = 2

        # Right-pad each address with zero bytes up to the selected length.
        encoded_dst_mac_addresses = b"".join(
            a.ljust(mac_address_len, b"\0") for a in addresses
        )

        self.host.send_control(
            uci.SessionSetAppConfigCmd(
                session_token=int(session_id),
                tlvs=[
                    uci.AppConfigTlv(
                        cfg_id=uci.AppConfigTlvType.MAC_ADDRESS_MODE,
                        v=bytes([mac_address_mode]),
                    ),
                    uci.AppConfigTlv(
                        cfg_id=uci.AppConfigTlvType.RANGING_DURATION,
                        v=int(ranging_interval).to_bytes(4, byteorder="little"),
                    ),
                    uci.AppConfigTlv(
                        cfg_id=uci.AppConfigTlvType.NO_OF_CONTROLEE,
                        v=bytes([len(addresses)]),
                    ),
                    uci.AppConfigTlv(
                        cfg_id=uci.AppConfigTlvType.DST_MAC_ADDRESS,
                        v=encoded_dst_mac_addresses,
                    ),
                ],
            )
        )

    def session_get_app_config(self, session_id: str = "0", **kargs):
        """retrieve the current APP Configuration Parameters of the requested UWB session."""
        self.host.send_control(
            uci.SessionGetAppConfigCmd(session_token=int(session_id), app_cfg=[0x9])
        )

    def session_get_count(self, **kargs):
        """Retrieve number of UWB sessions in the UWBS."""
        self.host.send_control(uci.SessionGetCountCmd())

    def session_get_state(self, session_id: str = "0", **kargs):
        """Query the current state of the UWB session."""
        self.host.send_control(uci.SessionGetStateCmd(session_token=int(session_id)))

    def session_update_controller_multicast_list(
        self,
        session_id: str = "0",
        action: str = "add",
        mac_address: str = "0",
        subsession_id: str = "0",
        **kargs,
    ):
        """Update the controller multicast list."""
        if action == "add":
            encoded_action = uci.UpdateMulticastListAction.ADD_CONTROLEE
        elif action == "remove":
            encoded_action = uci.UpdateMulticastListAction.REMOVE_CONTROLEE
        else:
            print(f"Unexpected action: '{action}', expected add or remove")
            return

        # Unlike the other commands, mac_address is a decimal integer here.
        self.host.send_control(
            uci.SessionUpdateControllerMulticastListCmd(
                session_token=int(session_id),
                action=encoded_action,
                payload=uci.SessionUpdateControllerMulticastListCmdPayload(
                    controlees=[
                        uci.Controlee(
                            short_address=encode_short_mac_address(mac_address),
                            subsession_id=int(subsession_id),
                        )
                    ],
                ).serialize(),
            )
        )

    def range_start(self, session_id: str = "0", **kargs):
        """start a UWB session."""
        self.host.send_control(uci.SessionStartCmd(session_id=int(session_id)))

    def range_stop(self, session_id: str = "0", **kargs):
        """Stop a UWB session."""
        self.host.send_control(uci.SessionStopCmd(session_id=int(session_id)))

    def get_ranging_count(self, session_id: str = "0", **kargs):
        """Get the number of times ranging has been attempted during the ranging session.."""
        self.host.send_control(
            uci.SessionGetRangingCountCmd(session_id=int(session_id))
        )

    def data_transfer(
        self,
        dst_mac_address,
        file_name,
        session_id: str = "0",
        **kargs,
    ):
        """Initiates data transfer by sending (possibly segmented) UCI data packet(s)."""
        # Does not have flow control, i.e. waiting for data credit
        # notifications in between sending packets.
        # Any failure (unreadable file, malformed address, ...) is printed
        # rather than raised.
        try:
            with open(file_name, "rb") as f:
                content = f.read()

            session_handle = int(session_id)
            # byteorder is spelled out because int.from_bytes() only gained a
            # default ("big") in Python 3.11.
            destination = int.from_bytes(
                parse_mac_address(dst_mac_address), byteorder="big"
            )

            # Split into payload-sized sections; an empty file still yields
            # one (empty) packet.
            sections = [
                content[i : i + MAX_DATA_PACKET_PAYLOAD_SIZE]
                for i in range(0, len(content), MAX_DATA_PACKET_PAYLOAD_SIZE)
            ] or [content]

            seq_num = 0
            last_index = len(sections) - 1
            for index, section in enumerate(sections):
                fields = dict(
                    session_handle=session_handle,
                    destination_address=destination,
                    data_sequence_number=seq_num,
                    application_data=section,
                )
                # Every section but the last is flagged as incomplete; the
                # last one keeps the packet's default boundary flag.
                if index != last_index:
                    fields["pbf"] = uci.PacketBoundaryFlag.NOT_COMPLETE
                self.host.send_data(uci.DataMessageSnd(**fields))

                seq_num += 1
                if seq_num >= 65535:
                    seq_num = 0
        except Exception as e:
            print(e)

    async def read_responses_and_notifications(self):
        """Print every received UCI control packet, forever.

        Each packet is shown as a hex dump followed by its decoded form; the
        prompt and the partially typed command are redrawn afterwards.
        """

        def chunks(data, size):
            for start in range(0, len(data), size):
                yield data[start : start + size]

        while True:
            packet = await self.host._recv_control()

            # Format and print raw response data, 16 bytes per row.
            txt = "\n ".join(
                " ".join(f"{b:02x}" for b in shard) for shard in chunks(packet, 16)
            )

            # Remember what the user was typing so it can be redrawn.
            command_buffer = readline.get_line_buffer()

            print("\r", end="")
            print(f"Received UCI packet [{len(packet)}]:")
            print(f" {txt}")

            try:
                uci_packet = uci.ControlPacket.parse_all(packet)
                uci_packet.show()
            except Exception as exn:
                # The raw dump above is still valid; report why decoding
                # failed instead of hiding it.
                print(f"Failed to parse UCI packet: {exn}")

            print(f"--> {command_buffer}", end="", flush=True)


async def ainput(prompt: str = ""):
    """Read one line with input() in a worker thread; return it right-stripped."""
    with ThreadPoolExecutor(1, "ainput") as executor:
        line = await asyncio.get_running_loop().run_in_executor(
            executor, input, prompt
        )
    return line.rstrip()


async def get_stream_reader(pipe) -> asyncio.StreamReader:
    """Return a StreamReader attached to the read end of *pipe*."""
    loop = asyncio.get_running_loop()
    reader = asyncio.StreamReader(loop=loop)
    protocol = asyncio.StreamReaderProtocol(reader)
    await loop.connect_read_pipe(lambda: protocol, pipe)
    return reader


async def command_line(device: Device):
    """Run the interactive prompt until the user quits.

    A line is a command name followed by space-separated arguments.
    Arguments containing "=" are passed as keyword arguments (split on the
    first "="), the others positionally. "quit", "q" or end-of-input leave
    the loop; a failing command is reported and the prompt continues.
    """
    commands = {
        "pica_get_state": device.pica_get_state,
        "pica_init_uci_device": device.pica_init_uci_device,
        "pica_create_anchor": device.pica_create_anchor,
        "pica_destroy_anchor": device.pica_destroy_anchor,
        "pica_set_position": device.pica_set_position,
        "device_reset": device.device_reset,
        "get_device_info": device.get_device_info,
        "get_config": device.get_config,
        "set_config": device.set_config,
        "get_caps_info": device.get_caps_info,
        "session_init": device.session_init,
        "session_deinit": device.session_deinit,
        "session_set_app_config": device.session_set_app_config,
        "session_get_app_config": device.session_get_app_config,
        "session_get_count": device.session_get_count,
        "session_get_state": device.session_get_state,
        "session_update_controller_multicast_list": device.session_update_controller_multicast_list,
        "range_start": device.range_start,
        "range_stop": device.range_stop,
        "data_transfer": device.data_transfer,
        "get_ranging_count": device.get_ranging_count,
    }

    def usage():
        for cmd, func in commands.items():
            print(f" {cmd.ljust(32)}{func.__doc__}")

    def complete(text, state):
        line = readline.get_line_buffer()
        tokens = line.split()
        # A trailing space (or an empty line) means a new token is starting.
        if not tokens or line[-1] == " ":
            tokens.append("")

        # Writing a command name, complete to ' '
        if len(tokens) == 1:
            results = [cmd + " " for cmd in commands if cmd.startswith(text)]
        # Writing a keyword argument, no completion
        elif "=" in tokens[-1]:
            results = []
        # Writing a keyword name, but unknown command, no completion
        elif tokens[0] not in commands:
            results = []
        # Writing a keyword name, complete to '='
        else:
            sig = inspect.signature(commands[tokens[0]])
            nameable = (
                inspect.Parameter.POSITIONAL_OR_KEYWORD,
                inspect.Parameter.KEYWORD_ONLY,
            )
            results = [
                name + "="
                for name, p in sig.parameters.items()
                if p.kind in nameable and name.startswith(tokens[-1])
            ]

        # readline keeps calling with increasing state until None is returned.
        results.append(None)
        return results[state]

    # Configure readline
    readline.parse_and_bind("tab: complete")
    readline.set_completer(complete)

    while True:
        try:
            line = await ainput("--> ")
        except EOFError:
            # End of input (e.g. Ctrl-D) is treated like "quit".
            print()
            break

        cmd, *params = line.split(" ")
        args = []
        kargs = {}
        for param in params:
            if not param:
                continue
            # Split on the first "=" only so values may themselves contain "=".
            key, sep, value = param.partition("=")
            if sep:
                kargs[key] = value
            else:
                args.append(param)

        if cmd in ("quit", "q"):
            break
        if cmd not in commands:
            print(f"Undefined command {cmd}")
            usage()
            continue

        try:
            commands[cmd](*args, **kargs)
        except Exception as exn:
            # A mistyped argument or an unreachable server must not end the
            # whole session: report and keep prompting.
            print(f"Command {cmd} failed: {exn}")


async def run(address: str, uci_port: int, http_port: int):
    """Connect to the Pica server and run the console until the user quits.

    Exits the process with status 1 when the UCI connection cannot be opened.
    """
    try:
        # Connect to Pica
        reader, writer = await asyncio.open_connection(address, uci_port)
    except OSError:
        print(
            f"Failed to connect to Pica server at address {address}:{uci_port}\n"
            + "Make sure the server is running"
        )
        sys.exit(1)

    # Start input and receive loops
    device = Device(reader, writer, f"http://{address}:{http_port}")
    # Keep a reference to the task: the event loop itself only holds a weak one.
    receive_task = asyncio.create_task(device.read_responses_and_notifications())
    try:
        await command_line(device)
    finally:
        receive_task.cancel()
        writer.close()


def main():
    """Start a Pica interactive console."""
    # main.__doc__ is used for the help text; a bare __doc__ here would refer
    # to the module docstring instead.
    parser = argparse.ArgumentParser(description=main.__doc__)
    parser.add_argument(
        "--address",
        type=str,
        default="127.0.0.1",
        help="Select the pica server address",
    )
    parser.add_argument(
        "--uci-port", type=int, default=7000, help="Select the pica TCP UCI port"
    )
    parser.add_argument(
        "--http-port", type=int, default=3000, help="Select the pica HTTP port"
    )
    asyncio.run(run(**vars(parser.parse_args())))


if __name__ == "__main__":
    main()