# -*- coding: utf-8 -*- # MegEngine is Licensed under the Apache License, Version 2.0 (the "License") # # Copyright (c) 2014-2021 Megvii Inc. All rights reserved. # # Unless required by applicable law or agreed to in writing, # software distributed under the License is distributed on an # "AS IS" BASIS, WITHOUT ARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. import functools import os import numpy as np from megenginelite import * def require_cuda(func): """a decorator that disables a testcase if cuda is not enabled""" @functools.wraps(func) def wrapped(*args, **kwargs): if LiteGlobal.get_device_count(LiteDeviceType.LITE_CUDA): return func(*args, **kwargs) return wrapped @require_cuda def test_tensor_collect_batch(): batch_tensor = TensorBatchCollector( [4, 8, 8], dtype=LiteDataType.LITE_INT, device_type=LiteDeviceType.LITE_CUDA ) arr = np.ones([8, 8], "int32") for i in range(4): batch_tensor.collect(arr) arr += 1 data = batch_tensor.to_numpy() assert data.shape[0] == 4 assert data.shape[1] == 8 assert data.shape[2] == 8 for i in range(4): for j in range(64): assert data[i][j // 8][j % 8] == i + 1 def test_tensor_collect_batch_cpu(): batch_tensor = TensorBatchCollector( [4, 8, 8], dtype=LiteDataType.LITE_INT, device_type=LiteDeviceType.LITE_CPU ) arr = np.ones([8, 8], "int32") for i in range(4): batch_tensor.collect(arr) arr += 1 data = batch_tensor.to_numpy() assert data.shape[0] == 4 assert data.shape[1] == 8 assert data.shape[2] == 8 for i in range(4): for j in range(64): assert data[i][j // 8][j % 8] == i + 1 @require_cuda def test_tensor_collect_batch_by_index(): batch_tensor = TensorBatchCollector( [4, 8, 8], dtype=LiteDataType.LITE_INT, device_type=LiteDeviceType.LITE_CUDA ) arr = np.ones([8, 8], "int32") arr += 1 # ==2 batch_tensor.collect_id(arr, 1) arr -= 1 # ==1 batch_tensor.collect_id(arr, 0) arr += 2 # ==3 batch_tensor.collect_id(arr, 2) arr += 1 # ==4 batch_tensor.collect_id(arr, 3) data = batch_tensor.to_numpy() assert data.shape[0] == 4 assert data.shape[1] == 8 assert data.shape[2] == 8 for i in range(4): for j in range(64): assert data[i][j // 8][j % 8] == i + 1 @require_cuda def test_tensor_collect_batch_tensor(): batch_tensor = TensorBatchCollector( [4, 6, 8], dtype=LiteDataType.LITE_INT, device_type=LiteDeviceType.LITE_CUDA ) nparr = np.ones([6, 8], "int32") tensor = LiteTensor(LiteLayout([6, 8], LiteDataType.LITE_INT)) for i in range(4): tensor.set_data_by_share(nparr) batch_tensor.collect(tensor) nparr += 1 data = batch_tensor.to_numpy() assert data.shape[0] == 4 assert data.shape[1] == 6 assert data.shape[2] == 8 for i in range(4): for j in range(48): assert data[i][j // 8][j % 8] == i + 1 def test_tensor_collect_batch_tensor_cpu(): batch_tensor = TensorBatchCollector( [4, 6, 8], dtype=LiteDataType.LITE_INT, device_type=LiteDeviceType.LITE_CPU ) nparr = np.ones([6, 8], "int32") tensor = LiteTensor(LiteLayout([6, 8], LiteDataType.LITE_INT)) for i in range(4): tensor.set_data_by_share(nparr) batch_tensor.collect(tensor) nparr += 1 data = batch_tensor.to_numpy() assert data.shape[0] == 4 assert data.shape[1] == 6 assert data.shape[2] == 8 for i in range(4): for j in range(48): assert data[i][j // 8][j % 8] == i + 1 @require_cuda def test_tensor_collect_batch_ctypes(): batch_tensor = TensorBatchCollector( [4, 6, 8], dtype=LiteDataType.LITE_INT, device_type=LiteDeviceType.LITE_CUDA ) nparr = np.ones([6, 8], "int32") for i in range(4): in_data = nparr.ctypes.data batch_tensor.collect_by_ctypes(in_data, nparr.nbytes) nparr += 1 data = batch_tensor.to_numpy() assert data.shape[0] == 4 assert data.shape[1] == 6 assert data.shape[2] == 8 for i in range(4): for j in range(48): assert data[i][j // 8][j % 8] == i + 1 def test_tensor_collect_batch_ctypes_cpu(): batch_tensor = TensorBatchCollector( [4, 6, 8], dtype=LiteDataType.LITE_INT, device_type=LiteDeviceType.LITE_CPU ) nparr = np.ones([6, 8], "int32") for i in range(4): in_data = nparr.ctypes.data batch_tensor.collect_by_ctypes(in_data, nparr.nbytes) nparr += 1 data = batch_tensor.to_numpy() assert data.shape[0] == 4 assert data.shape[1] == 6 assert data.shape[2] == 8 for i in range(4): for j in range(48): assert data[i][j // 8][j % 8] == i + 1 @require_cuda def test_tensor_collect_batch_device_tensor(): all_tensor = LiteTensor( LiteLayout([4, 6, 8], dtype=LiteDataType.LITE_INT), device_type=LiteDeviceType.LITE_CUDA, ) batch_tensor = TensorBatchCollector([4, 6, 8], tensor=all_tensor) nparr = np.ones([6, 8], "int32") tensor = LiteTensor(LiteLayout([6, 8], LiteDataType.LITE_INT)) for i in range(4): tensor.set_data_by_share(nparr) batch_tensor.collect(tensor) nparr += 1 data = batch_tensor.to_numpy() assert data.shape[0] == 4 assert data.shape[1] == 6 assert data.shape[2] == 8 for i in range(4): for j in range(48): assert data[i][j // 8][j % 8] == i + 1 @require_cuda def test_tensor_collect_batch_device_numpy(): all_tensor = LiteTensor( LiteLayout([4, 6, 8], dtype=LiteDataType.LITE_INT), device_type=LiteDeviceType.LITE_CUDA, ) batch_tensor = TensorBatchCollector([4, 6, 8], tensor=all_tensor) nparr = np.ones([6, 8], "int32") for i in range(4): batch_tensor.collect(nparr) nparr += 1 data = batch_tensor.to_numpy() assert data.shape[0] == 4 assert data.shape[1] == 6 assert data.shape[2] == 8 for i in range(4): for j in range(48): assert data[i][j // 8][j % 8] == i + 1 def test_get_model_io_ahead(): source_dir = os.getenv("LITE_TEST_RESOURCE") model_path = os.path.join(source_dir, "shufflenet.mge") ios = get_model_io_info(model_path) assert len(ios.inputs) == 1 assert ios.inputs[0].name == "data" assert ios.inputs[0].config_layout.shapes[1] == 3 assert ios.inputs[0].config_layout.shapes[2] == 224 assert ios.inputs[0].config_layout.shapes[3] == 224 assert len(ios.outputs) == 1 assert ios.outputs[0].name == "TRUE_DIV(EXP[12065],reduce0[12067])[12077]" assert ios.outputs[0].config_layout.shapes[0] == 1 assert ios.outputs[0].config_layout.shapes[1] == 1000