#!/usr/bin/env python3
# SPDX-Copyright: Copyright (c) 2019 Daniel Edgecumbe (esotericnonsense)
# SPDX-License-Identifier: AGPL-3.0-only
#
# This file is part of botfair. botfair is free software: you can
# redistribute it and/or modify it under the terms of the GNU Affero General
# Public License as published by the Free Software Foundation, either version
# 3 of the License, or (at your option) any later version.
#
# botfair is distributed in the hope that it will be useful, but WITHOUT
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
# FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero General Public License
# for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with botfair. If not, see <https://www.gnu.org/licenses/>.
from typing import Optional, List, Tuple
from xml.etree.ElementTree import Element, parse
from enum import Enum, auto
from dataclasses import dataclass
from dataclasses_json import DataClassJsonMixin
import string
@dataclass
class Value(DataClassJsonMixin):
    """One entry of a ``validValues`` list as per the BF API."""

    # Taken from the element's ``name`` attribute.
    name: str
    # Numeric ``id`` attribute; parse_value reads it for exception types
    # only and stores None for simple types.
    _id: Optional[int]
    # Whitespace-normalised description text, or None when it is empty.
    description: Optional[str]
@dataclass
class Param(DataClassJsonMixin):
    """A parameter as per the BF API"""

    # Taken from the element's ``name`` attribute.
    name: str
    # The ``type`` attribute after normalisation by parse_type.
    _type: str
    # Whitespace-normalised description text, or None when absent/empty.
    description: Optional[str]
    # True only when the ``mandatory`` attribute is "true"; a missing
    # attribute is treated as False by parse_parameter.
    mandatory: bool
    # Allowed values; parse_parameter only fills this in for parameters of
    # exception types, otherwise it stays None.
    values: Optional[List[Value]]
@dataclass
class SimpleResponse(DataClassJsonMixin):
    """The ``simpleResponse`` element of an operation."""

    # The ``type`` attribute after normalisation by parse_type.
    _type: str
    # Whitespace-normalised description text, or None when empty.
    description: Optional[str]
@dataclass
class ExceptionResponse(DataClassJsonMixin):
    """One ``exception`` entry listed under an operation's ``exceptions``."""

    # The ``type`` attribute after normalisation by parse_type.
    _type: str
    # Whitespace-normalised description text, or None when empty.
    description: Optional[str]
@dataclass
class Operation(DataClassJsonMixin):
    """An operation as per the BF API"""

    # Taken from the element's ``name`` attribute; also used as the name of
    # the generated Rust method.
    name: str
    # Taken verbatim from the element's ``since`` attribute.
    since: str
    # Whitespace-normalised description text.
    description: Optional[str]
    # Request parameters, in document order.
    params: List[Param]
    # The operation's declared response.
    simple_response: SimpleResponse
    # Exceptions the operation is declared to raise, in document order.
    exceptions: List[ExceptionResponse]
@dataclass
class ExceptionType(DataClassJsonMixin):
    """An exception type as per the BF API"""

    # Taken from the element's ``name`` attribute.
    name: str
    # Whitespace-normalised description text, or None when absent/empty.
    description: Optional[str]
    # Taken from the ``prefix`` attribute; generate_rust_exceptions combines
    # it with each value id to build the serde rename "<prefix>-<id:04d>".
    prefix: str
    # Parameters of the exception, in document order.
    params: List[Param]
@dataclass
class SimpleType(DataClassJsonMixin):
    """A simple type as per the BF API"""

    # Taken from the element's ``name`` attribute.
    name: str
    # The raw ``type`` attribute (parse_simpleType does not run it through
    # parse_type).
    _type: str
    # Whitespace-normalised description text, or None when absent/empty.
    description: Optional[str]
    # Entries of the ``validValues`` child, or None when there is none; the
    # generator emits an enum when set and a type alias otherwise.
    values: Optional[List[Value]]
@dataclass
class DataType(DataClassJsonMixin):
    """A data type as per the BF API"""

    # Taken from the element's ``name`` attribute; also the name of the
    # generated Rust struct.
    name: str
    # Whitespace-normalised description text, or None when absent/empty.
    description: Optional[str]
    # Fields of the data type, in document order.
    params: List[Param]
@dataclass
class APING(DataClassJsonMixin):
    """Parsed Betfair APING"""

    # Top-level description text, or None when absent/empty.
    description: Optional[str]
    # Every ``operation`` child of the root, in document order.
    operations: List[Operation]
    # Every ``dataType`` child of the root, in document order.
    data_types: List[DataType]
    # Every ``exceptionType`` child of the root, in document order.
    exception_types: List[ExceptionType]
    # Every ``simpleType`` child of the root, in document order.
    simple_types: List[SimpleType]
class ParentType(Enum):
    """Kind of element that owns the parameter or value being parsed.

    Passed as ``parent`` to parse_parameter, parse_validValues and
    parse_value so they can apply the rules specific to that owner.
    """

    OPERATION = auto()
    DATA_TYPE = auto()
    EXCEPTION_TYPE = auto()
    SIMPLE_TYPE = auto()
def parse_type(_type: str) -> str:
    """Normalise an APING type string.

    All spaces are removed first.  ``list(X)``, ``set(X)`` and ``map(K,V)``
    are then rewritten to ``List[X]``, ``Set[X]`` and ``Map[K, V]``; any
    other string is returned with only the spaces removed.

    Raises AssertionError when an inner type contains anything other than
    ASCII letters and digits (which also rules out nested containers).
    """
    alphanumeric: str = string.ascii_letters + string.digits
    compact = _type.replace(" ", "")  # sanity

    def arguments() -> str:
        # Drop the first ")" and keep everything after the first "(".
        return compact.replace(")", "", 1).split("(", maxsplit=1)[1]

    for opener, wrapper in (("list(", "List"), ("set(", "Set")):
        if compact.startswith(opener):
            inner = arguments()
            assert all(ch in alphanumeric for ch in inner)
            return f"{wrapper}[{inner}]"

    if compact.startswith("map("):
        key_type, value_type = arguments().split(",", maxsplit=1)
        assert all(ch in alphanumeric for ch in key_type), key_type
        assert all(ch in alphanumeric for ch in value_type), value_type
        return f"Map[{key_type}, {value_type}]"

    return compact
def strip_string(s: Optional[str]) -> Optional[str]:
    """Flatten a multi-line string onto a single line.

    Every line is stripped of surrounding whitespace, the lines are joined
    with single spaces and the joined result is stripped once more.
    ``None`` is passed through unchanged.
    """
    if s is not None:
        return " ".join(map(str.strip, s.split("\n"))).strip()
    return None
def parse_description(el: Element) -> Optional[str]:
    """Return the normalised text of a ``description`` element.

    The element must have no attributes and no children.  Missing or
    whitespace-only text yields ``None``.
    """
    assert el.tag == "description"
    assert not el.attrib
    assert len(el) == 0
    # strip_string maps None to None and blank text to "", both of which
    # are reported as "no description".
    return strip_string(el.text) or None
def parse_exceptions(el: Element) -> List[ExceptionResponse]:
    """Parse an ``exceptions`` element into its ``exception`` entries.

    Each ``exception`` child must carry only a ``type`` attribute, have no
    text of its own and contain exactly one child (its description).

    Raises NotImplementedError for any other child tag.
    """
    assert el.tag == "exceptions"
    assert not el.attrib
    assert not strip_string(el.text)

    responses: List[ExceptionResponse] = []
    for node in el:
        if node.tag != "exception":
            raise NotImplementedError(node.tag)

        raw_type = node.attrib.pop("type")
        assert not node.attrib
        assert not strip_string(node.text)
        assert len(node) == 1
        responses.append(
            ExceptionResponse(
                _type=parse_type(raw_type),
                description=parse_description(node[0]),
            )
        )
    return responses
def parse_value(el: Element, parent: ParentType) -> Value:
    """Parse a single ``value`` element.

    Values of exception types must carry a numeric ``id`` attribute; values
    of simple types have no id.  Any other ``parent`` raises
    NotImplementedError.  The element must contain exactly one child, its
    description.
    """
    name = el.attrib.pop("name")

    _id: Optional[int]
    if parent == ParentType.SIMPLE_TYPE:
        _id = None
    elif parent == ParentType.EXCEPTION_TYPE:
        raw_id: str = el.attrib.pop("id")
        assert raw_id.isdigit()
        _id = int(raw_id)
    else:
        raise NotImplementedError(f"value: {parent}")

    # Everything we understand has been popped; leftovers are unexpected.
    assert not el.attrib, el.attrib
    assert not strip_string(el.text)
    assert len(el) == 1
    return Value(name=name, _id=_id, description=parse_description(el[0]))
def parse_validValues(el: Element, parent: ParentType) -> List[Value]:
    """Parse every ``value`` child of a ``validValues`` element.

    ``parent`` is forwarded to parse_value.  Raises NotImplementedError for
    any child that is not a ``value``.
    """
    assert not el.attrib
    assert not strip_string(el.text)

    parsed: List[Value] = []
    for node in el:
        if node.tag != "value":
            raise NotImplementedError(node.tag)
        parsed.append(parse_value(node, parent=parent))
    return parsed
def parse_parameter(el: Element, parent: ParentType) -> Param:
    """Parse a ``parameter`` element.

    ``mandatory`` must be "true" or "false" and defaults to false when the
    attribute is absent.  A ``validValues`` child is only accepted when
    ``parent`` is an exception type; any other unexpected child raises
    NotImplementedError.
    """
    # An absent attribute behaves exactly like mandatory="false".
    mandatory_str = el.attrib.pop("mandatory", "false")
    assert mandatory_str in ("true", "false")
    mandatory: bool = mandatory_str == "true"

    name = el.attrib.pop("name")
    _type = parse_type(el.attrib.pop("type"))
    assert not el.attrib
    assert not strip_string(el.text)

    description: Optional[str] = None
    values: Optional[List[Value]] = None
    for node in el:
        if node.tag == "description":
            assert description is None
            description = parse_description(node)
        elif parent == ParentType.EXCEPTION_TYPE and node.tag == "validValues":
            values = parse_validValues(node, parent=ParentType.EXCEPTION_TYPE)
        else:
            raise NotImplementedError(node.tag)

    return Param(
        name=name,
        _type=_type,
        description=description,
        mandatory=mandatory,
        values=values,
    )
def parse_request(el: Element) -> List[Param]:
    """Parse a ``request`` element into its operation parameters.

    Raises NotImplementedError for any child that is not a ``parameter``.
    """
    assert el.tag == "request"
    assert not el.attrib
    assert not strip_string(el.text)

    parsed: List[Param] = []
    for node in el:
        if node.tag != "parameter":
            raise NotImplementedError(node.tag)
        parsed.append(parse_parameter(node, parent=ParentType.OPERATION))
    return parsed
def parse_parameters(
    el: Element
) -> Tuple[List[Param], SimpleResponse, List[ExceptionResponse]]:
    """Parse the ``parameters`` element of an operation.

    Returns a ``(params, simple_response, exceptions)`` tuple built from
    the ``request``, ``simpleResponse`` and ``exceptions`` children.

    Raises:
        NotImplementedError: for any other child tag.
        ValueError: when one of the three required children is missing
            (previously this surfaced as an UnboundLocalError on return).
    """
    assert el.tag == "parameters"
    assert not el.attrib
    assert not strip_string(el.text)

    # None marks "not seen yet" so a missing child can be reported clearly.
    params: Optional[List[Param]] = None
    simple_response: Optional[SimpleResponse] = None
    exceptions: Optional[List[ExceptionResponse]] = None

    for child in el:
        if child.tag == "request":
            params = parse_request(child)
            continue
        if child.tag == "simpleResponse":
            _type = child.attrib.pop("type")
            assert not child.attrib
            assert not strip_string(child.text)
            # The only child allowed is the description.
            assert len(child) == 1
            description = parse_description(child[0])
            simple_response = SimpleResponse(
                _type=parse_type(_type), description=description
            )
            continue
        if child.tag == "exceptions":
            exceptions = parse_exceptions(child)
            continue
        raise NotImplementedError(child.tag)

    if params is None:
        raise ValueError("<parameters> has no <request> child")
    if simple_response is None:
        raise ValueError("<parameters> has no <simpleResponse> child")
    if exceptions is None:
        raise ValueError("<parameters> has no <exceptions> child")
    return (params, simple_response, exceptions)
def parse_operation(el: Element) -> Operation:
    """Parse an ``operation`` element.

    The element must carry exactly the ``name`` and ``since`` attributes.
    Its ``description`` child is optional; a missing or blank description
    is stored as ``None``, matching every other parser in this module.

    Raises:
        NotImplementedError: for any child other than ``description`` or
            ``parameters``.
        ValueError: when the ``parameters`` child is missing.
    """
    assert el.tag == "operation"
    name = el.attrib.pop("name")
    since = el.attrib.pop("since")
    assert not el.attrib
    assert not strip_string(el.text)

    # Operation.description is Optional, so start from "no description"
    # instead of leaving the name unbound when the child is absent.
    description: Optional[str] = None
    parsed_parameters = None
    for child in el:
        if child.tag == "description":
            # parse_description performs the attribute/children checks and
            # maps blank text to None, so no empty "/// " line is generated.
            description = parse_description(child)
            continue
        if child.tag == "parameters":
            parsed_parameters = parse_parameters(child)
            continue
        raise NotImplementedError(child.tag)

    if parsed_parameters is None:
        raise ValueError(f"operation {name!r} has no <parameters> child")
    params, simple_response, exceptions = parsed_parameters

    return Operation(
        name=name,
        since=since,
        description=description,
        params=params,
        simple_response=simple_response,
        exceptions=exceptions,
    )
def parse_dataType(el: Element) -> DataType:
    """Parse a ``dataType`` element.

    Accepts at most one ``description`` child and any number of
    ``parameter`` children; anything else raises NotImplementedError.
    """
    assert el.tag == "dataType"
    name = el.attrib.pop("name")
    assert not el.attrib
    assert not strip_string(el.text)

    description: Optional[str] = None
    fields: List[Param] = []
    for node in el:
        if node.tag == "description":
            assert description is None
            description = parse_description(node)
        elif node.tag == "parameter":
            fields.append(parse_parameter(node, parent=ParentType.DATA_TYPE))
        else:
            raise NotImplementedError(node.tag)

    return DataType(name=name, description=description, params=fields)
def parse_exceptionType(el: Element) -> ExceptionType:
    """Parse an ``exceptionType`` element.

    The element must carry exactly the ``name`` and ``prefix`` attributes.
    Accepts at most one ``description`` child and any number of
    ``parameter`` children; anything else raises NotImplementedError.
    """
    assert el.tag == "exceptionType"
    name = el.attrib.pop("name")
    prefix = el.attrib.pop("prefix")
    assert not el.attrib
    assert not strip_string(el.text)

    description: Optional[str] = None
    fields: List[Param] = []
    for node in el:
        if node.tag == "description":
            assert description is None
            description = parse_description(node)
        elif node.tag == "parameter":
            fields.append(
                parse_parameter(node, parent=ParentType.EXCEPTION_TYPE)
            )
        else:
            raise NotImplementedError(node.tag)

    return ExceptionType(
        name=name, description=description, prefix=prefix, params=fields
    )
def parse_simpleType(el: Element) -> SimpleType:
    """Parse a ``simpleType`` element.

    The element must carry exactly the ``name`` and ``type`` attributes;
    the type is stored as written.  Accepts at most one ``description``
    and at most one ``validValues`` child; anything else raises
    NotImplementedError.
    """
    assert el.tag == "simpleType"
    name = el.attrib.pop("name")
    _type = el.attrib.pop("type")
    assert not el.attrib
    assert not strip_string(el.text)

    description: Optional[str] = None
    values: Optional[List[Value]] = None
    for node in el:
        if node.tag == "description":
            assert description is None
            description = parse_description(node)
        elif node.tag == "validValues":
            assert values is None
            values = parse_validValues(node, parent=ParentType.SIMPLE_TYPE)
        else:
            raise NotImplementedError(node.tag)

    return SimpleType(
        name=name, _type=_type, description=description, values=values
    )
def parse_aping(el: Element) -> APING:
    """Parse the root element of the APING document.

    Children are sorted by tag into operations, data types, exception
    types and simple types, preserving document order within each group.
    At most one ``description`` child is accepted; an unknown tag raises
    NotImplementedError.
    """
    # We ignore the attributes here deliberately.
    assert not strip_string(el.text)

    description: Optional[str] = None
    operations: List[Operation] = []
    data_types: List[DataType] = []
    exception_types: List[ExceptionType] = []
    simple_types: List[SimpleType] = []

    # tag -> (parser for that element, list collecting its results)
    handlers = {
        "operation": (parse_operation, operations),
        "dataType": (parse_dataType, data_types),
        "exceptionType": (parse_exceptionType, exception_types),
        "simpleType": (parse_simpleType, simple_types),
    }

    for node in el:
        if node.tag == "description":
            assert description is None
            description = parse_description(node)
        elif node.tag in handlers:
            parser, bucket = handlers[node.tag]
            bucket.append(parser(node))
        else:
            raise NotImplementedError(node.tag)

    return APING(
        description=description,
        operations=operations,
        data_types=data_types,
        exception_types=exception_types,
        simple_types=simple_types,
    )
def aping_name_to_rust_name(name: str) -> str:
    """Map an APING field name to the identifier used in generated Rust.

    Names in the table below are emitted in raw-identifier form
    (``r#name``); every other name is returned unchanged.
    """
    # These are treated as keywords in Rust. We need to figure out how
    # to handle these later.
    raw_identifiers = {"type": "r#type", "id": "r#id", "async": "r#async"}
    return raw_identifiers.get(name, name)
def python_type_to_rust_type(_type: str, mandatory: bool = True) -> str:
    """Convert a normalised APING type (see parse_type) to a Rust type.

    Types listed in ``direct_conversions`` are replaced wholesale; any
    other type only has its container syntax rewritten
    (``List[``/``Set[`` -> ``Vec<``, ``Map[`` -> ``HashMap<``, ``]`` ->
    ``>``).  When ``mandatory`` is ``False`` the result is wrapped in
    ``Option<...>``.
    """
    # This is a bit hacky, particularly for the compound types,
    # but the API is simple enough that this works anyway.
    # We assume sets are actually vectors because otherwise the API
    # is insane.
    direct_conversions = {
        "double": "f64",
        "string": "String",  # possibly a &str
        # DateTime is generic over its time zone; the generated modules
        # import chrono::Utc for exactly this purpose.
        "dateTime": "DateTime<Utc>",  # possibly an unaware DT
        # These are pretty hacky, we should parse properly.  They must
        # spell out the element types: the generic fallback below would
        # leave the lowercase "string" untranslated, and a bare
        # Vec/HashMap is not a valid Rust type.
        "Set[string]": "Vec<String>",
        "Map[string, string]": "HashMap<String, String>",
        "Map[string, Matches]": "HashMap<String, Matches>",
    }
    try:
        _type = direct_conversions[_type]
    except KeyError:
        _type = _type.replace("List[", "Vec<")
        _type = _type.replace("Map[", "HashMap<")
        _type = _type.replace("Set[", "Vec<")
        _type = _type.replace("]", ">")
    if mandatory is False:
        return f"Option<{_type}>"
    return _type
def generate_rust_simple_types(simple_types: List[SimpleType]) -> List[str]:
    """
    Return API bindings for the simpleTypes.

    A simple type without valid values becomes a ``pub type`` alias; one
    with valid values becomes a ``pub enum`` with one variant per value.
    Descriptions are emitted as ``///`` doc comments.
    """
    rendered: List[str] = []
    for simple_type in simple_types:
        if simple_type.description is not None:
            rendered.append(f"/// {simple_type.description}")

        if simple_type.values is None:
            alias_target: str = python_type_to_rust_type(simple_type._type)
            rendered.append(f"pub type {simple_type.name} = {alias_target};")
            continue

        # All of the enums are stringly typed, this is a sanity check
        assert simple_type._type == "string"
        variant_lines: List[str] = []
        for value in simple_type.values:
            if value.description is not None:
                variant_lines.append(f"/// {value.description}")
            variant_lines.append(f"{value.name},")
        enum_body = "\n".join(variant_lines)
        rendered.append(
            f"""#[derive(Debug, Deserialize, Serialize)]
pub enum {simple_type.name} {{
{enum_body}
}}"""
        )
    return rendered
def generate_rust_data_types(data_types: List[DataType]) -> List[str]:
    """
    Return API bindings for the dataTypes.

    Each data type becomes a ``pub struct`` with one public field per
    parameter; ``Option`` fields are skipped on serialisation when unset.
    """

    def render_field(field_name: str, field_type: str) -> str:
        # TODO: this is super ugly. seriously?
        if field_type.startswith("Option<"):
            return f"""#[serde(skip_serializing_if = "Option::is_none")]
pub {field_name}: {field_type}"""
        return f"pub {field_name}: {field_type}"

    rendered: List[str] = []
    for data_type in data_types:
        # TODO: document the descriptions along with the param
        struct_fields: str = ", \n".join(
            render_field(
                aping_name_to_rust_name(param.name),
                python_type_to_rust_type(param._type, param.mandatory),
            )
            for param in data_type.params
        )
        if data_type.description is not None:
            rendered.append(f"/// {data_type.description}")
        rendered.append(
            f"""#[derive(Debug, Deserialize, Serialize)]
pub struct {data_type.name} {{ {struct_fields} }}"""
        )
    return rendered
@dataclass
class RustOperations:
    """Rust code derived from the BF operations"""

    # One request struct definition per operation that takes at least one
    # parameter; written to generated_requests.rs by main().
    request_structs: List[str]
    # Doc-comment lines and method definitions, in operation order; main()
    # writes them inside ``impl crate::client::BFClient``.
    functions: List[str]
def generate_rust_operations(operations: List[Operation]) -> RustOperations:
    """
    Return API bindings for the operations.

    Every operation yields one method taking ``&self`` plus its
    parameters.  Operations with parameters additionally yield a
    ``<name>Request`` struct which the method fills in and sends;
    parameterless operations send ``()``.
    """

    def render_field(field_name: str, field_type: str) -> str:
        # TODO: this is super ugly. seriously?
        if field_type.startswith("Option<"):
            return f"""#[serde(skip_serializing_if = "Option::is_none")]
pub {field_name}: {field_type}"""
        return f"pub {field_name}: {field_type}"

    structs: List[str] = []
    methods: List[str] = []
    for operation in operations:
        fields: List[Tuple[str, str]] = [
            (
                aping_name_to_rust_name(param.name),
                python_type_to_rust_type(param._type, param.mandatory),
            )
            for param in operation.params
        ]
        argument_list: str = ", ".join(
            ["&self", *(f"{fname}: {ftype}" for fname, ftype in fields)]
        )
        resp_type: str = python_type_to_rust_type(
            operation.simple_response._type
        )

        if not operation.params:
            # TODO this smells, repetition
            function_interior = f"""
let rpc_request: RpcRequest<()> = RpcRequest::new(
\"SportsAPING/v1.0/{operation.name}\".to_owned(),
()
);
self.req(rpc_request)
"""
        else:
            struct_name: str = f"{operation.name}Request"
            # TODO these should probably not be public, just for now
            # so that we can test outside of jsonrpc
            struct_fields: str = ", \n".join(
                render_field(fname, ftype) for fname, ftype in fields
            )
            structs.append(
                f"""#[derive(Serialize)]
pub struct {struct_name} {{ {struct_fields} }}"""
            )
            # Field-init shorthand: the argument names double as the
            # struct field names.
            initialisers: str = ", ".join(fname for fname, _ in fields)
            function_interior = f"""
let req: {struct_name} = {struct_name} {{ {initialisers} }};
let rpc_request: RpcRequest<{struct_name}> = RpcRequest::new(
\"SportsAPING/v1.0/{operation.name}\".to_owned(),
req
);
self.req(rpc_request)
"""

        function_signature = f"""fn {operation.name}({argument_list}) ->
Result<{resp_type}>"""
        if operation.description is not None:
            methods.append(f"/// {operation.description}")
        methods.append(
            f"""#[allow(dead_code)]
pub {function_signature} {{ {function_interior} }}"""
        )
    return RustOperations(request_structs=structs, functions=methods)
def generate_rust_exceptions(
    exception_types: List[ExceptionType]
) -> List[str]:
    """
    Return API bindings for the exceptionTypes.

    Only the ``errorCode`` parameter of each exception type is rendered:
    as a type alias when it has no valid values, otherwise as an enum
    whose variants are renamed to ``<prefix>-<zero-padded id>`` for serde.
    """
    rendered: List[str] = []
    for exception_type in exception_types:
        # TODO: emit exception_type.description as a doc comment
        for param in exception_type.params:
            if param.name != "errorCode":
                # TODO do not ignore
                continue

            if param.values is None:
                alias_target: str = python_type_to_rust_type(param._type)
                rendered.append(f"pub type {param.name} = {alias_target};")
                continue

            # All of the enums are stringly typed, this is a sanity check
            assert param._type == "string"
            variants: List[str] = []
            for value in param.values:
                variant_lines: List[str] = (
                    []
                    if value.description is None
                    else [f"/// {value.description}"]
                )
                variant_lines.append(
                    f"""#[serde(rename = "{exception_type.prefix}-{(value._id):04d}")]
{value.name}"""
                )
                variants.append("\n".join(variant_lines))
            enum_body = ",\n".join(variants)
            rendered.append(
                f"""#[derive(Debug, Deserialize)]
pub enum {param.name} {{
{enum_body}
}}"""
            )
    return rendered
def _write_rust_file(path: str, *sections: List[str]) -> None:
    """Write every line of each section to ``path``, newline-terminated.

    The file is always encoded as UTF-8: Rust source files must be UTF-8
    and the API descriptions copied into doc comments are not guaranteed
    to be ASCII, so the platform default encoding must not be used.
    """
    with open(path, "w", encoding="utf-8") as f:
        for section in sections:
            for line in section:
                f.write(line + "\n")


def main() -> None:
    """Parse ``SportsAPING.patched.xml`` and regenerate the Rust sources.

    Reads the XML from the current directory and overwrites
    ``generated_types.rs``, ``generated_methods.rs``,
    ``generated_requests.rs`` and ``generated_exceptions.rs`` in
    ``../src``.
    """
    tree = parse("SportsAPING.patched.xml")
    aping: APING = parse_aping(tree.getroot())
    # print(aping.to_json())

    header = [
        "//! # automatically generated",
        "//! This module has been automatically generated by botfair",
        "//! from the Betfair APING documentation at",
        "//! https://docs.developer.betfair.com",
        "//!",
        "//! Any documentation here has been generated directly from the API",
        "//! docs.",
        "",
    ]

    # Generate everything up front so no output file is touched if any
    # generator fails.
    rust_operations: RustOperations = generate_rust_operations(
        aping.operations
    )
    rust_simple_types: List[str] = generate_rust_simple_types(
        aping.simple_types
    )
    rust_data_types: List[str] = generate_rust_data_types(aping.data_types)
    rust_exceptions: List[str] = generate_rust_exceptions(
        aping.exception_types
    )

    _write_rust_file(
        "../src/generated_types.rs",
        header,
        [
            "#![allow(non_camel_case_types)]",
            "#![allow(non_snake_case)]",
            "use chrono::{DateTime, Utc};",
            "use std::collections::HashMap;",
            "use serde::{Deserialize, Serialize};",
        ],
        rust_simple_types,
        rust_data_types,
    )

    _write_rust_file(
        "../src/generated_methods.rs",
        header,
        [
            "#![allow(non_snake_case)]",
            "use chrono::{DateTime, Utc};",
            "use crate::result::Result;",
            "use crate::json_rpc::RpcRequest;",
            "use crate::generated_requests::*;",
            "use crate::generated_types::*;",
        ],
        ["impl crate::client::BFClient {"],
        rust_operations.functions,
        ["}"],
    )

    _write_rust_file(
        "../src/generated_requests.rs",
        header,
        [
            "#![allow(non_camel_case_types)]",
            "#![allow(non_snake_case)]",
            "use chrono::{DateTime, Utc};",
            "use serde::Serialize;",
            "use crate::generated_types::*;",
        ],
        rust_operations.request_structs,
    )

    _write_rust_file(
        "../src/generated_exceptions.rs",
        header,
        [
            "#![allow(non_camel_case_types)]",
            "#![allow(non_snake_case)]",
            "use serde::Deserialize;",
        ],
        rust_exceptions,
    )


if __name__ == "__main__":
    main()