# # Licensed to the Apache Software Foundation (ASF) under one # or more contributor license agreements. See the NOTICE file # distributed with this work for additional information # regarding copyright ownership. The ASF licenses this file # to you under the Apache License, Version 2.0 (the # "License"); you may not use this file except in compliance # with the License. You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, # software distributed under the License is distributed on an # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY # KIND, either express or implied. See the License for the # specific language governing permissions and limitations # under the License. # from __future__ import absolute_import import sys from uuid import uuid4 from proton import * from proton._compat import raise_ from . import common class Test(common.Test): def setUp(self): self.data = Data() def tearDown(self): self.data = None class DataTest(Test): def testTopLevelNext(self): assert self.data.next() is None self.data.put_null() self.data.put_bool(False) self.data.put_int(0) assert self.data.next() is None self.data.rewind() assert self.data.next() == Data.NULL assert self.data.next() == Data.BOOL assert self.data.next() == Data.INT assert self.data.next() is None def testNestedNext(self): assert self.data.next() is None self.data.put_null() assert self.data.next() is None self.data.put_list() assert self.data.next() is None self.data.put_bool(False) assert self.data.next() is None self.data.rewind() assert self.data.next() is Data.NULL assert self.data.next() is Data.LIST self.data.enter() assert self.data.next() is None self.data.put_ubyte(0) assert self.data.next() is None self.data.put_uint(0) assert self.data.next() is None self.data.put_int(0) assert self.data.next() is None self.data.exit() assert self.data.next() is Data.BOOL assert self.data.next() is None self.data.rewind() assert self.data.next() is Data.NULL assert self.data.next() is Data.LIST assert self.data.enter() assert self.data.next() is Data.UBYTE assert self.data.next() is Data.UINT assert self.data.next() is Data.INT assert self.data.next() is None assert self.data.exit() assert self.data.next() is Data.BOOL assert self.data.next() is None def testEnterExit(self): assert self.data.next() is None assert not self.data.enter() self.data.put_list() assert self.data.enter() assert self.data.next() is None self.data.put_list() assert self.data.enter() self.data.put_list() assert self.data.enter() assert self.data.exit() assert self.data.get_list() == 0 assert self.data.exit() assert self.data.get_list() == 1 assert self.data.exit() assert self.data.get_list() == 1 assert not self.data.exit() assert self.data.get_list() == 1 assert self.data.next() is None self.data.rewind() assert self.data.next() is Data.LIST assert self.data.get_list() == 1 assert self.data.enter() assert self.data.next() is Data.LIST assert self.data.get_list() == 1 assert self.data.enter() assert self.data.next() is Data.LIST assert self.data.get_list() == 0 assert self.data.enter() assert self.data.next() is None assert self.data.exit() assert self.data.get_list() == 0 assert self.data.exit() assert self.data.get_list() == 1 assert self.data.exit() assert self.data.get_list() == 1 assert not self.data.exit() def put(self, putter, v): """More informative exception from putters, include bad value""" try: putter(v) except Exception: etype, value, trace = sys.exc_info() raise_(etype, etype("%s(%r): %s" % (putter.__name__, v, value)), trace) return putter # (bits, signed) for each integer type INT_TYPES = { "byte": (8, True), "ubyte": (8, False), "short": (16, True), "ushort": (16, False), "int": (32, True), "uint": (32, False), "long": (64, True), "ulong": (64, False) } def int_values(self, dtype): """Set of test values for integer type dtype, include extreme and medial values""" bits, signed = self.INT_TYPES[dtype] values = [0, 1, 2, 5, 42] if signed: min, max = -2**(bits - 1), 2**(bits - 1) - 1 values.append(max // 2) values += [-i for i in values if i] values += [min, max] else: max = 2**(bits) - 1 values += [max // 2, max] return sorted(values) def _testArray(self, dtype, descriptor, atype, *values): if dtype: dTYPE = getattr(self.data, dtype.upper()) aTYPE = getattr(self.data, atype.upper()) self.data.put_array(dtype is not None, aTYPE) self.data.enter() if dtype is not None: putter = getattr(self.data, "put_%s" % dtype) self.put(putter, descriptor) putter = getattr(self.data, "put_%s" % atype) for v in values: self.put(putter, v) self.data.exit() self.data.rewind() assert self.data.next() == Data.ARRAY count, described, type = self.data.get_array() assert count == len(values), count if dtype is None: assert described == False else: assert described assert type == aTYPE, type assert self.data.enter() if described: assert self.data.next() == dTYPE getter = getattr(self.data, "get_%s" % dtype) gotten = getter() assert gotten == descriptor, gotten if values: getter = getattr(self.data, "get_%s" % atype) for v in values: assert self.data.next() == aTYPE gotten = getter() assert gotten == v, gotten assert self.data.next() is None assert self.data.exit() def testStringArray(self): self._testArray(None, None, "string", "one", "two", "three") def testDescribedStringArray(self): self._testArray("symbol", "url", "string", "one", "two", "three") def _test_int_array(self, atype): self._testArray(None, None, atype, *self.int_values(atype)) def testByteArray(self): self._test_int_array("byte") def testUbyteArray(self): self._test_int_array("ubyte") def testShortArray(self): self._test_int_array("short") def testUshortArray(self): self._test_int_array("ushort") def testIntArray(self): self._test_int_array("int") def testUintArray(self): self._test_int_array("uint") def testLongArray(self): self._test_int_array("long") def testUlongArray(self): self._test_int_array("ulong") def testUUIDArray(self): self._testArray(None, None, "uuid", uuid4(), uuid4(), uuid4()) def testEmptyArray(self): self._testArray(None, None, "null") def testDescribedEmptyArray(self): self._testArray("long", 0, "null") def testPropertyDict(self): a = PropertyDict(one=1, two=2, three=3) b = PropertyDict({'one': 1, 'two': 2, 'three': 3}) c = PropertyDict(zip(['one', 'two', 'three'], [1, 2, 3])) d = PropertyDict([('two', 2), ('one', 1), ('three', 3)]) e = PropertyDict({symbol('three'): 3, symbol('one'): 1, symbol('two'): 2}) f = PropertyDict(a) g = PropertyDict() g['one'] = 1 g[symbol('two')] = 2 g['three'] = 3 assert a == b == c == d == e == f == g for k in a.keys(): assert isinstance(k, symbol) self.assertRaises(KeyError, AnnotationDict, {'one': 1, None: 'none'}) self.assertRaises(KeyError, AnnotationDict, {'one': 1, 1.23: 4}) def testPropertyDictNoRaiseError(self): a = PropertyDict(one=1, two=2, three=3, raise_on_error=False) a[4] = 'four' b = PropertyDict({'one': 1, 'two': 2, 'three': 3, 4: 'four'}, raise_on_error=False) c = PropertyDict(zip(['one', 'two', 'three', 4], [1, 2, 3, 'four']), raise_on_error=False) d = PropertyDict([('two', 2), ('one', 1), ('three', 3), (4, 'four')], raise_on_error=False) e = PropertyDict({4: 'four', symbol('three'): 3, symbol('one'): 1, symbol('two'): 2}, raise_on_error=False) f = PropertyDict(a, raise_on_error=False) g = PropertyDict(raise_on_error=False) g['one'] = 1 g[4] = 'four' g[symbol('two')] = 2 g['three'] = 3 assert a == b == c == d == e == f == g def testAnnotationDict(self): # AnnotationMap c'tor calls update(), so this method is also covered a = AnnotationDict(one=1, two=2, three=3) a[ulong(4)] = 'four' b = AnnotationDict({'one': 1, 'two': 2, 'three': 3, ulong(4): 'four'}) c = AnnotationDict(zip(['one', 'two', 'three', ulong(4)], [1, 2, 3, 'four'])) d = AnnotationDict([('two', 2), ('one', 1), ('three', 3), (ulong(4), 'four')]) e = AnnotationDict({symbol('three'): 3, ulong(4): 'four', symbol('one'): 1, symbol('two'): 2}) f = AnnotationDict(a) g = AnnotationDict() g[ulong(4)] = 'four' g['one'] = 1 g[symbol('two')] = 2 g['three'] = 3 assert a == b == c == d == e == f == g for k in a.keys(): assert isinstance(k, (symbol, ulong)) self.assertRaises(KeyError, AnnotationDict, {'one': 1, None: 'none'}) self.assertRaises(KeyError, AnnotationDict, {'one': 1, 1.23: 4}) def testAnnotationDictNoRaiseError(self): a = AnnotationDict(one=1, two=2, three=3, raise_on_error=False) a[ulong(4)] = 'four' a[5] = 'five' b = AnnotationDict({'one': 1, 'two': 2, 'three': 3, ulong(4): 'four', 5: 'five'}, raise_on_error=False) c = AnnotationDict(zip(['one', 'two', 'three', ulong(4), 5], [1, 2, 3, 'four', 'five']), raise_on_error=False) d = AnnotationDict([('two', 2), ('one', 1), ('three', 3), (ulong(4), 'four'), (5, 'five')], raise_on_error=False) e = AnnotationDict({5: 'five', symbol('three'): 3, ulong(4): 'four', symbol('one'): 1, symbol('two'): 2}, raise_on_error=False) f = AnnotationDict(a, raise_on_error=False) g = AnnotationDict(raise_on_error=False) g[ulong(4)] = 'four' g['one'] = 1 g[symbol('two')] = 2 g[5] = 'five' g['three'] = 3 assert a == b == c == d == e == f == g def testSymbolList(self): a = SymbolList(['one', 'two', 'three']) b = SymbolList([symbol('one'), symbol('two'), symbol('three')]) c = SymbolList() c.append('one') c.extend([symbol('two'), 'three']) d1 = SymbolList(['one']) d2 = SymbolList(['two', symbol('three')]) d = d1 + d2 e = SymbolList(['one']) e += SymbolList(['two', symbol('three')]) f = SymbolList(['one', 'hello', 'goodbye']) f[1] = symbol('two') f[2] = 'three' g = SymbolList(a) assert a == b == c == d == e == f == g for v in a: assert isinstance(v, symbol) self.assertRaises(TypeError, SymbolList, ['one', None]) self.assertRaises(TypeError, SymbolList, ['one', 2]) self.assertRaises(TypeError, SymbolList, ['one', ['two']]) self.assertRaises(TypeError, SymbolList, ['one', {'two': 3}]) def testSymbolListNoRaiseError(self): a = SymbolList(['one', 'two', 'three', 4], raise_on_error=False) b = SymbolList([symbol('one'), symbol('two'), symbol('three'), 4], raise_on_error=False) c = SymbolList(raise_on_error=False) c.append('one') c.extend([symbol('two'), 'three', 4]) d1 = SymbolList(['one'], raise_on_error=False) d2 = SymbolList(['two', symbol('three'), 4], raise_on_error=False) d = d1 + d2 e = SymbolList(['one'], raise_on_error=False) e += SymbolList(['two', symbol('three'), 4], raise_on_error=False) f = SymbolList(['one', 'hello', 'goodbye', 'what?'], raise_on_error=False) f[1] = symbol('two') f[2] = 'three' f[3] = 4 g = SymbolList(a, raise_on_error=False) assert a == b == c == d == e == f == g def _test(self, dtype, *values, **kwargs): eq = kwargs.get("eq", lambda x, y: x == y) ntype = getattr(Data, dtype.upper()) putter = getattr(self.data, "put_%s" % dtype) getter = getattr(self.data, "get_%s" % dtype) for v in values: self.put(putter, v) gotten = getter() assert eq(gotten, v), (gotten, v) self.data.rewind() for v in values: vtype = self.data.next() assert vtype == ntype, vtype gotten = getter() assert eq(gotten, v), (gotten, v) encoded = self.data.encode() copy = Data(0) while encoded: n = copy.decode(encoded) encoded = encoded[n:] copy.rewind() cgetter = getattr(copy, "get_%s" % dtype) for v in values: vtype = copy.next() assert vtype == ntype, vtype gotten = cgetter() assert eq(gotten, v), (gotten, v) def _test_int(self, itype): self._test(itype, *self.int_values(itype)) def testByte(self): self._test_int("byte") def testUbyte(self): self._test_int("ubyte") self.assertRaises(AssertionError, ubyte, -1) def testShort(self): self._test_int("short") def testUshort(self): self._test("ushort") self.assertRaises(AssertionError, ushort, -1) def testInt(self): self._test_int("int") def testUint(self): self._test_int("uint") self.assertRaises(AssertionError, uint, -1) def testLong(self): self._test_int("long") def testUlong(self): self._test_int("ulong") self.assertRaises(AssertionError, ulong, -1) def testString(self): self._test("string", "one", "two", "three", "this is a test", "") def testFloat(self): # we have to use a special comparison here because python # internally only uses doubles and converting between floats and # doubles is imprecise self._test("float", 0, 1, 2, 3, 0.1, 0.2, 0.3, -1, -2, -3, -0.1, -0.2, -0.3, eq=lambda x, y: x - y < 0.000001) def testDouble(self): self._test("double", 0, 1, 2, 3, 0.1, 0.2, 0.3, -1, -2, -3, -0.1, -0.2, -0.3) def testBinary(self): self._test("binary", b"this", b"is", b"a", b"test", b"of" b"b\x00inary") def testSymbol(self): self._test("symbol", symbol("this is a symbol test"), symbol("bleh"), symbol("blah")) def testTimestamp(self): self._test("timestamp", timestamp(0), timestamp(12345), timestamp(1000000)) def testChar(self): self._test("char", char('a'), char('b'), char('c'), char(u'\u20AC')) def testUUID(self): self._test("uuid", uuid4(), uuid4(), uuid4()) def testDecimal32(self): self._test("decimal32", decimal32(0), decimal32(1), decimal32(2), decimal32(3), decimal32(4), decimal32(2**30)) def testDecimal64(self): self._test("decimal64", decimal64(0), decimal64(1), decimal64(2), decimal64(3), decimal64(4), decimal64(2**60)) def testDecimal128(self): self._test("decimal128", decimal128(b"fdsaasdf;lkjjkl;"), decimal128(b"x" * 16)) def testCopy(self): self.data.put_described() self.data.enter() self.data.put_ulong(123) self.data.put_map() self.data.enter() self.data.put_string("pi") self.data.put_double(3.14159265359) dst = Data() dst.copy(self.data) copy = dst.format() orig = self.data.format() assert copy == orig, (copy, orig) def testCopyNested(self): nested = [1, 2, 3, [4, 5, 6], 7, 8, 9] self.data.put_object(nested) dst = Data() dst.copy(self.data) assert dst.format() == self.data.format() def testCopyNestedArray(self): nested = [Array(UNDESCRIBED, Data.LIST, ["first", [Array(UNDESCRIBED, Data.INT, 1, 2, 3)]], ["second", [Array(UNDESCRIBED, Data.INT, 1, 2, 3)]], ["third", [Array(UNDESCRIBED, Data.INT, 1, 2, 3)]], ), "end"] self.data.put_object(nested) dst = Data() dst.copy(self.data) assert dst.format() == self.data.format() def testRoundTrip(self): obj = {symbol("key"): timestamp(1234), ulong(123): "blah", char("c"): "bleh", u"desc": Described(symbol("url"), u"http://example.org"), u"array": Array(UNDESCRIBED, Data.INT, 1, 2, 3), u"list": [1, 2, 3, None, 4], u"boolean": True} self.data.put_object(obj) enc = self.data.encode() data = Data() data.decode(enc) data.rewind() assert data.next() copy = data.get_object() assert copy == obj, (copy, obj) def testBuffer(self): try: self.data.put_object(buffer(b"foo")) except NameError: # python >= 3.0 does not have `buffer` return data = Data() data.decode(self.data.encode()) data.rewind() assert data.next() assert data.type() == Data.BINARY assert data.get_object() == b"foo" def testMemoryView(self): self.data.put_object(memoryview(b"foo")) data = Data() data.decode(self.data.encode()) data.rewind() assert data.next() assert data.type() == Data.BINARY assert data.get_object() == b"foo" def testLookup(self): obj = {symbol("key"): u"value", symbol("pi"): 3.14159, symbol("list"): [1, 2, 3, 4]} self.data.put_object(obj) self.data.rewind() self.data.next() self.data.enter() self.data.narrow() assert self.data.lookup("pi") assert self.data.get_object() == 3.14159 self.data.rewind() assert self.data.lookup("key") assert self.data.get_object() == u"value" self.data.rewind() assert self.data.lookup("list") assert self.data.get_object() == [1, 2, 3, 4] self.data.widen() self.data.rewind() assert not self.data.lookup("pi")