#!/usr/bin/env python3 import sys from argparse import ( ArgumentDefaultsHelpFormatter, ArgumentParser, ArgumentTypeError, Namespace, _SubParsersAction, ) from functools import wraps from pathlib import Path from typing import Any, Optional, Tuple import grpc import netaddr from google.protobuf.json_format import MessageToJson from netaddr import EUI, AddrFormatError, IPNetwork from core.api.grpc.client import CoreGrpcClient from core.api.grpc.core_pb2 import ( Geo, Interface, LinkOptions, Node, NodeType, Position, SessionState, ) NODE_TYPES = [k for k, v in NodeType.Enum.items() if v != NodeType.PEER_TO_PEER] def coreclient(func): @wraps(func) def wrapper(*args, **kwargs): core = CoreGrpcClient() try: with core.context_connect(): return func(core, *args, **kwargs) except grpc.RpcError as e: print(f"grpc error: {e.details()}") return wrapper def mac_type(value: str) -> str: try: mac = EUI(value, dialect=netaddr.mac_unix_expanded) return str(mac) except AddrFormatError: raise ArgumentTypeError(f"invalid mac address: {value}") def ip4_type(value: str) -> IPNetwork: try: ip = IPNetwork(value) if not netaddr.valid_ipv4(str(ip.ip)): raise ArgumentTypeError(f"invalid ip4 address: {value}") return ip except AddrFormatError: raise ArgumentTypeError(f"invalid ip4 address: {value}") def ip6_type(value: str) -> IPNetwork: try: ip = IPNetwork(value) if not netaddr.valid_ipv6(str(ip.ip)): raise ArgumentTypeError(f"invalid ip6 address: {value}") return ip except AddrFormatError: raise ArgumentTypeError(f"invalid ip6 address: {value}") def position_type(value: str) -> Tuple[float, float]: error = "invalid position, must be in the format: float,float" try: values = [float(x) for x in value.split(",")] except ValueError: raise ArgumentTypeError(error) if len(values) != 2: raise ArgumentTypeError(error) x, y = values return x, y def geo_type(value: str) -> Tuple[float, float, float]: error = "invalid geo, must be in the format: float,float,float" try: values = [float(x) for x in value.split(",")] except ValueError: raise ArgumentTypeError(error) if len(values) != 3: raise ArgumentTypeError(error) lon, lat, alt = values return lon, lat, alt def file_type(value: str) -> str: path = Path(value) if not path.is_file(): raise ArgumentTypeError(f"invalid file: {value}") return str(path.absolute()) def get_current_session(core: CoreGrpcClient, session_id: Optional[int]) -> int: if session_id: return session_id response = core.get_sessions() if not response.sessions: print("no current session to interact with") sys.exit(1) return response.sessions[0].id def create_iface(iface_id: int, mac: str, ip4_net: IPNetwork, ip6_net: IPNetwork) -> Interface: ip4 = str(ip4_net.ip) if ip4_net else None ip4_mask = ip4_net.prefixlen if ip4_net else None ip6 = str(ip6_net.ip) if ip6_net else None ip6_mask = ip6_net.prefixlen if ip6_net else None return Interface( id=iface_id, mac=mac, ip4=ip4, ip4_mask=ip4_mask, ip6=ip6, ip6_mask=ip6_mask, ) def print_iface_header() -> None: print("ID | MAC Address | IP4 Address | IP6 Address") def print_iface(iface: Interface) -> None: iface_ip4 = f"{iface.ip4}/{iface.ip4_mask}" if iface.ip4 else "" iface_ip6 = f"{iface.ip6}/{iface.ip6_mask}" if iface.ip6 else "" print(f"{iface.id:<3} | {iface.mac:<17} | {iface_ip4:<18} | {iface_ip6}") def print_json(message: Any) -> None: json = MessageToJson(message, preserving_proto_field_name=True) print(json) @coreclient def get_wlan_config(core: CoreGrpcClient, args: Namespace) -> None: session_id = get_current_session(core, args.session) response = core.get_wlan_config(session_id, args.node) if args.json: print_json(response) else: size = 0 for option in response.config.values(): size = max(size, len(option.name)) print(f"{'Name':<{size}.{size}} | Value") for option in response.config.values(): print(f"{option.name:<{size}.{size}} | {option.value}") @coreclient def set_wlan_config(core: CoreGrpcClient, args: Namespace) -> None: session_id = get_current_session(core, args.session) config = {} if args.bandwidth: config["bandwidth"] = str(args.bandwidth) if args.delay: config["delay"] = str(args.delay) if args.loss: config["error"] = str(args.loss) if args.jitter: config["jitter"] = str(args.jitter) if args.range: config["range"] = str(args.range) response = core.set_wlan_config(session_id, args.node, config) if args.json: print_json(response) else: print(f"set wlan config: {response.result}") @coreclient def open_xml(core: CoreGrpcClient, args: Namespace) -> None: response = core.open_xml(args.file, args.start) if args.json: print_json(response) else: print(f"opened xml: {response.result}") @coreclient def query_sessions(core: CoreGrpcClient, args: Namespace) -> None: response = core.get_sessions() if args.json: print_json(response) else: print("Session ID | Session State | Nodes") for s in response.sessions: state = SessionState.Enum.Name(s.state) print(f"{s.id:<10} | {state:<13} | {s.nodes}") @coreclient def query_session(core: CoreGrpcClient, args: Namespace) -> None: response = core.get_session(args.id) if args.json: print_json(response) else: print("Nodes") print("Node ID | Node Name | Node Type") names = {} for node in response.session.nodes: names[node.id] = node.name node_type = NodeType.Enum.Name(node.type) print(f"{node.id:<7} | {node.name:<9} | {node_type}") print("\nLinks") for link in response.session.links: n1 = names[link.node1_id] n2 = names[link.node2_id] print(f"Node | ", end="") print_iface_header() print(f"{n1:<6} | ", end="") if link.HasField("iface1"): print_iface(link.iface1) else: print() print(f"{n2:<6} | ", end="") if link.HasField("iface2"): print_iface(link.iface2) else: print() print() @coreclient def query_node(core: CoreGrpcClient, args: Namespace) -> None: names = {} response = core.get_session(args.id) for node in response.session.nodes: names[node.id] = node.name response = core.get_node(args.id, args.node) if args.json: print_json(response) else: node = response.node node_type = NodeType.Enum.Name(node.type) print("ID | Name | Type") print(f"{node.id:<4} | {node.name:<7} | {node_type}") print("Interfaces") print("Connected To | ", end="") print_iface_header() for iface in response.ifaces: if iface.net_id == node.id: if iface.node_id: name = names[iface.node_id] else: name = names[iface.net2_id] else: name = names.get(iface.net_id, "") print(f"{name:<12} | ", end="") print_iface(iface) @coreclient def delete_session(core: CoreGrpcClient, args: Namespace) -> None: response = core.delete_session(args.id) if args.json: print_json(response) else: print(f"delete session({args.id}): {response.result}") @coreclient def add_node(core: CoreGrpcClient, args: Namespace) -> None: session_id = get_current_session(core, args.session) node_type = NodeType.Enum.Value(args.type) pos = None if args.pos: x, y = args.pos pos = Position(x=x, y=y) geo = None if args.geo: lon, lat, alt = args.geo geo = Geo(lon=lon, lat=lat, alt=alt) node = Node( id=args.id, name=args.name, type=node_type, model=args.model, emane=args.emane, icon=args.icon, image=args.image, position=pos, geo=geo, ) response = core.add_node(session_id, node) if args.json: print_json(response) else: print(f"created node: {response.node_id}") @coreclient def edit_node(core: CoreGrpcClient, args: Namespace) -> None: session_id = get_current_session(core, args.session) pos = None if args.pos: x, y = args.pos pos = Position(x=x, y=y) geo = None if args.geo: lon, lat, alt = args.geo geo = Geo(lon=lon, lat=lat, alt=alt) response = core.edit_node(session_id, args.id, pos, args.icon, geo) if args.json: print_json(response) else: print(f"edit node: {response.result}") @coreclient def delete_node(core: CoreGrpcClient, args: Namespace) -> None: session_id = get_current_session(core, args.session) response = core.delete_node(session_id, args.id) if args.json: print_json(response) else: print(f"deleted node: {response.result}") @coreclient def add_link(core: CoreGrpcClient, args: Namespace) -> None: session_id = get_current_session(core, args.session) iface1 = None if args.iface1_id is not None: iface1 = create_iface(args.iface1_id, args.iface1_mac, args.iface1_ip4, args.iface1_ip6) iface2 = None if args.iface2_id is not None: iface2 = create_iface(args.iface2_id, args.iface2_mac, args.iface2_ip4, args.iface2_ip6) options = LinkOptions( bandwidth=args.bandwidth, loss=args.loss, jitter=args.jitter, delay=args.delay, dup=args.duplicate, unidirectional=args.uni, ) response = core.add_link(session_id, args.node1, args.node2, iface1, iface2, options) if args.json: print_json(response) else: print(f"add link: {response.result}") @coreclient def edit_link(core: CoreGrpcClient, args: Namespace) -> None: session_id = get_current_session(core, args.session) options = LinkOptions( bandwidth=args.bandwidth, loss=args.loss, jitter=args.jitter, delay=args.delay, dup=args.duplicate, unidirectional=args.uni, ) response = core.edit_link( session_id, args.node1, args.node2, options, args.iface1, args.iface2 ) if args.json: print_json(response) else: print(f"edit link: {response.result}") @coreclient def delete_link(core: CoreGrpcClient, args: Namespace) -> None: session_id = get_current_session(core, args.session) response = core.delete_link(session_id, args.node1, args.node2, args.iface1, args.iface2) if args.json: print_json(response) else: print(f"delete link: {response.result}") def setup_sessions_parser(parent: _SubParsersAction) -> None: parser = parent.add_parser("session", help="session interactions") parser.formatter_class = ArgumentDefaultsHelpFormatter parser.add_argument("-i", "--id", type=int, help="session id to use", required=True) subparsers = parser.add_subparsers(help="session commands") subparsers.required = True subparsers.dest = "command" delete_parser = subparsers.add_parser("delete", help="delete a session") delete_parser.formatter_class = ArgumentDefaultsHelpFormatter delete_parser.set_defaults(func=delete_session) def setup_node_parser(parent: _SubParsersAction) -> None: parser = parent.add_parser("node", help="node interactions") parser.formatter_class = ArgumentDefaultsHelpFormatter parser.add_argument("-s", "--session", type=int, help="session to interact with") subparsers = parser.add_subparsers(help="node commands") subparsers.required = True subparsers.dest = "command" add_parser = subparsers.add_parser("add", help="add a node") add_parser.formatter_class = ArgumentDefaultsHelpFormatter add_parser.add_argument("-i", "--id", type=int, help="id to use, optional") add_parser.add_argument("-n", "--name", help="name to use, optional") add_parser.add_argument( "-t", "--type", choices=NODE_TYPES, default="DEFAULT", help="type of node" ) add_parser.add_argument("-m", "--model", help="used to determine services, optional") group = add_parser.add_mutually_exclusive_group(required=True) group.add_argument("-p", "--pos", type=position_type, help="x,y position") group.add_argument("-g", "--geo", type=geo_type, help="lon,lat,alt position") add_parser.add_argument("-ic", "--icon", help="icon to use, optional") add_parser.add_argument("-im", "--image", help="container image, optional") add_parser.add_argument("-e", "--emane", help="emane model, only required for emane nodes") add_parser.set_defaults(func=add_node) edit_parser = subparsers.add_parser("edit", help="edit a node") edit_parser.formatter_class = ArgumentDefaultsHelpFormatter edit_parser.add_argument("-i", "--id", type=int, help="id to use, optional") group = edit_parser.add_mutually_exclusive_group(required=True) group.add_argument("-p", "--pos", type=position_type, help="x,y position") group.add_argument("-g", "--geo", type=geo_type, help="lon,lat,alt position") edit_parser.add_argument("-ic", "--icon", help="icon to use, optional") edit_parser.set_defaults(func=edit_node) delete_parser = subparsers.add_parser("delete", help="delete a node") delete_parser.formatter_class = ArgumentDefaultsHelpFormatter delete_parser.add_argument("-i", "--id", type=int, help="node id", required=True) delete_parser.set_defaults(func=delete_node) def setup_link_parser(parent: _SubParsersAction) -> None: parser = parent.add_parser("link", help="link interactions") parser.formatter_class = ArgumentDefaultsHelpFormatter parser.add_argument("-s", "--session", type=int, help="session to interact with") subparsers = parser.add_subparsers(help="link commands") subparsers.required = True subparsers.dest = "command" add_parser = subparsers.add_parser("add", help="add a node") add_parser.formatter_class = ArgumentDefaultsHelpFormatter add_parser.add_argument("-n1", "--node1", type=int, help="node1 id", required=True) add_parser.add_argument("-n2", "--node2", type=int, help="node2 id", required=True) add_parser.add_argument("-i1-i", "--iface1-id", type=int, help="node1 interface id") add_parser.add_argument("-i1-m", "--iface1-mac", type=mac_type, help="node1 interface mac") add_parser.add_argument("-i1-4", "--iface1-ip4", type=ip4_type, help="node1 interface ip4") add_parser.add_argument("-i1-6", "--iface1-ip6", type=ip6_type, help="node1 interface ip6") add_parser.add_argument("-i2-i", "--iface2-id", type=int, help="node2 interface id") add_parser.add_argument("-i2-m", "--iface2-mac", type=mac_type, help="node2 interface mac") add_parser.add_argument("-i2-4", "--iface2-ip4", type=ip4_type, help="node2 interface ip4") add_parser.add_argument("-i2-6", "--iface2-ip6", type=ip6_type, help="node2 interface ip6") add_parser.add_argument("-b", "--bandwidth", type=int, help="bandwidth (bps)") add_parser.add_argument("-l", "--loss", type=float, help="loss (%%)") add_parser.add_argument("-j", "--jitter", type=int, help="jitter (us)") add_parser.add_argument("-de", "--delay", type=int, help="delay (us)") add_parser.add_argument("-du", "--duplicate", type=int, help="duplicate (%%)") add_parser.add_argument("-u", "--uni", action="store_true", help="is link unidirectional?") add_parser.set_defaults(func=add_link) edit_parser = subparsers.add_parser("edit", help="edit a link") edit_parser.formatter_class = ArgumentDefaultsHelpFormatter edit_parser.add_argument("-n1", "--node1", type=int, help="node1 id", required=True) edit_parser.add_argument("-n2", "--node2", type=int, help="node2 id", required=True) edit_parser.add_argument("-i1", "--iface1", type=int, help="node1 interface id") edit_parser.add_argument("-i2", "--iface2", type=int, help="node2 interface id") edit_parser.add_argument("-b", "--bandwidth", type=int, help="bandwidth (bps)") edit_parser.add_argument("-l", "--loss", type=float, help="loss (%%)") edit_parser.add_argument("-j", "--jitter", type=int, help="jitter (us)") edit_parser.add_argument("-de", "--delay", type=int, help="delay (us)") edit_parser.add_argument("-du", "--duplicate", type=int, help="duplicate (%%)") edit_parser.add_argument( "-u", "--uni", action="store_true", help="is link unidirectional?" ) edit_parser.set_defaults(func=edit_link) delete_parser = subparsers.add_parser("delete", help="delete a link") delete_parser.formatter_class = ArgumentDefaultsHelpFormatter delete_parser.add_argument("-n1", "--node1", type=int, help="node1 id", required=True) delete_parser.add_argument("-n2", "--node2", type=int, help="node1 id", required=True) delete_parser.add_argument("-i1", "--iface1", type=int, help="node1 interface id") delete_parser.add_argument("-i2", "--iface2", type=int, help="node2 interface id") delete_parser.set_defaults(func=delete_link) def setup_query_parser(parent: _SubParsersAction) -> None: parser = parent.add_parser("query", help="query interactions") subparsers = parser.add_subparsers(help="query commands") subparsers.required = True subparsers.dest = "command" sessions_parser = subparsers.add_parser("sessions", help="query current sessions") sessions_parser.formatter_class = ArgumentDefaultsHelpFormatter sessions_parser.set_defaults(func=query_sessions) session_parser = subparsers.add_parser("session", help="query session") session_parser.formatter_class = ArgumentDefaultsHelpFormatter session_parser.add_argument("-i", "--id", type=int, help="session to query", required=True) session_parser.set_defaults(func=query_session) node_parser = subparsers.add_parser("node", help="query node") node_parser.formatter_class = ArgumentDefaultsHelpFormatter node_parser.add_argument("-i", "--id", type=int, help="session to query", required=True) node_parser.add_argument("-n", "--node", type=int, help="node to query", required=True) node_parser.set_defaults(func=query_node) def setup_xml_parser(parent: _SubParsersAction) -> None: parser = parent.add_parser("xml", help="open session xml") parser.formatter_class = ArgumentDefaultsHelpFormatter parser.add_argument("-f", "--file", type=file_type, help="xml file to open", required=True) parser.add_argument("-s", "--start", action="store_true", help="start the session?") parser.set_defaults(func=open_xml) def setup_wlan_parser(parent: _SubParsersAction) -> None: parser = parent.add_parser("wlan", help="wlan specific interactions") parser.formatter_class = ArgumentDefaultsHelpFormatter parser.add_argument("-s", "--session", type=int, help="session to interact with") subparsers = parser.add_subparsers(help="link commands") subparsers.required = True subparsers.dest = "command" get_parser = subparsers.add_parser("get", help="get wlan configuration") get_parser.formatter_class = ArgumentDefaultsHelpFormatter get_parser.add_argument("-n", "--node", type=int, help="wlan node", required=True) get_parser.set_defaults(func=get_wlan_config) set_parser = subparsers.add_parser("set", help="set wlan configuration") set_parser.formatter_class = ArgumentDefaultsHelpFormatter set_parser.add_argument("-n", "--node", type=int, help="wlan node", required=True) set_parser.add_argument("-b", "--bandwidth", type=int, help="bandwidth (bps)") set_parser.add_argument("-d", "--delay", type=int, help="delay (us)") set_parser.add_argument("-l", "--loss", type=float, help="loss (%%)") set_parser.add_argument("-j", "--jitter", type=int, help="jitter (us)") set_parser.add_argument("-r", "--range", type=int, help="range (pixels)") set_parser.set_defaults(func=set_wlan_config) def main() -> None: parser = ArgumentParser(formatter_class=ArgumentDefaultsHelpFormatter) parser.add_argument( "-js", "--json", action="store_true", help="print responses to terminal as json" ) subparsers = parser.add_subparsers(help="supported commands") subparsers.required = True subparsers.dest = "command" setup_sessions_parser(subparsers) setup_node_parser(subparsers) setup_link_parser(subparsers) setup_query_parser(subparsers) setup_xml_parser(subparsers) setup_wlan_parser(subparsers) args = parser.parse_args() args.func(args) if __name__ == "__main__": main()