# -*- coding: utf-8 -*- # MegEngine is Licensed under the Apache License, Version 2.0 (the "License") # # Copyright (c) 2014-2021 Megvii Inc. All rights reserved. # # Unless required by applicable law or agreed to in writing, # software distributed under the License is distributed on an # "AS IS" BASIS, WITHOUT ARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. import os import re import subprocess import sys import numpy as np import pytest import megengine as mge import megengine.autodiff as ad import megengine.functional as F from megengine import jit from megengine.core._trace_option import set_symbolic_shape from megengine.core.ops import builtin from megengine.core.tensor.utils import make_shape_tuple from megengine.functional.debug_param import set_execution_strategy from megengine.jit import SublinearMemoryConfig from megengine.module import ( AdaptiveAvgPool2d, AvgPool2d, BatchNorm2d, Conv2d, Linear, Module, ) from megengine.optimizer import SGD from megengine.tensor import Tensor Strategy = builtin.ops.Convolution.Strategy def get_gpu_name(): try: gpu_info = subprocess.check_output( ["nvidia-smi", "--query-gpu=gpu_name", "--format=csv,noheader"] ) gpu_info = gpu_info.decode("ascii").split("\n")[0] except: gpu_info = "None" return gpu_info def get_cpu_name(): cpu_info = "None" try: cpu_info = subprocess.check_output(["cat", "/proc/cpuinfo"]).decode("ascii") for line in cpu_info.split("\n"): if "model name" in line: return re.sub(".*model name.*:", "", line, 1).strip() except: pass return cpu_info def get_xpu_name(): if mge.is_cuda_available(): return get_gpu_name() else: return get_cpu_name() class MnistNet(Module): def __init__(self, has_bn=False, use_adaptive_pooling=False): super().__init__() self.conv0 = Conv2d(1, 20, kernel_size=5, bias=True) if use_adaptive_pooling: self.pool0 = AdaptiveAvgPool2d(12) else: self.pool0 = AvgPool2d(2) self.conv1 = Conv2d(20, 20, kernel_size=5, bias=True) self.pool1 = AvgPool2d(2) self.fc0 = Linear(20 * 4 * 4, 500, bias=True) self.fc1 = Linear(500, 10, bias=True) self.bn0 = None self.bn1 = None if has_bn: self.bn0 = BatchNorm2d(20) self.bn1 = BatchNorm2d(20) def forward(self, x): x = self.conv0(x) if self.bn0: x = self.bn0(x) x = F.relu(x) x = self.pool0(x) x = self.conv1(x) if self.bn1: x = self.bn1(x) x = F.relu(x) x = self.pool1(x) x = F.flatten(x, 1) x = self.fc0(x) x = F.relu(x) x = self.fc1(x) return x def train(data, label, net, opt, gm): with gm: pred = net(data) loss = F.nn.cross_entropy(pred, label) gm.backward(loss) return loss def update_model(model_path): """ Update the dumped model with test cases for new reference values. The model with pre-trained weights is trained for one iter with the test data attached. The loss and updated net state dict is dumped. .. code-block:: python from test_correctness import update_model update_model('mnist_model_with_test.mge') # for gpu update_model('mnist_model_with_test_cpu.mge') # for cpu """ net = MnistNet(has_bn=True) checkpoint = mge.load(model_path) net.load_state_dict(checkpoint["net_init"]) lr = checkpoint["sgd_lr"] opt = SGD(net.parameters(), lr=lr) gm = ad.GradManager().attach(net.parameters()) data = Tensor(checkpoint["data"], dtype=np.float32) label = Tensor(checkpoint["label"], dtype=np.int32) opt.clear_grad() loss = train(data, label, net, opt, gm) opt.step() xpu_name = get_xpu_name() checkpoint.update( {"net_updated": net.state_dict(), "loss": loss.numpy(), "xpu": xpu_name} ) mge.save(checkpoint, model_path) def run_train( model_path, use_jit, use_symbolic, sublinear_memory_config=None, max_err=None, use_adaptive_pooling=False, ): """ Load the model with test cases and run the training for one iter. The loss and updated weights are compared with reference value to verify the correctness. Dump a new file with updated result by calling update_model if you think the test fails due to numerical rounding errors instead of bugs. Please think twice before you do so. """ net = MnistNet(has_bn=True, use_adaptive_pooling=use_adaptive_pooling) checkpoint = mge.load(model_path) net.load_state_dict(checkpoint["net_init"]) lr = checkpoint["sgd_lr"] opt = SGD(net.parameters(), lr=lr) gm = ad.GradManager().attach(net.parameters()) data = Tensor(checkpoint["data"], dtype=np.float32) label = Tensor(checkpoint["label"], dtype=np.int32) if max_err is None: max_err = 1e-5 train_func = train if use_jit: train_func = jit.trace( train_func, symbolic=use_symbolic, sublinear_memory_config=sublinear_memory_config, ) opt.clear_grad() loss = train_func(data, label, net, opt, gm) opt.step() np.testing.assert_allclose(loss.numpy(), checkpoint["loss"], atol=max_err) for param, param_ref in zip( net.state_dict().items(), checkpoint["net_updated"].items() ): assert param[0] == param_ref[0] if "bn" in param[0]: ref = param_ref[1].reshape(param[1].shape) np.testing.assert_allclose(param[1], ref, atol=max_err) else: np.testing.assert_allclose(param[1], param_ref[1], atol=max_err) def run_eval( model_path, use_symbolic, sublinear_memory_config=None, max_err=None, use_adaptive_pooling=False, ): """ Load the model with test cases and run the training for one iter. The loss and updated weights are compared with reference value to verify the correctness. Dump a new file with updated result by calling update_model if you think the test fails due to numerical rounding errors instead of bugs. Please think twice before you do so. """ net = MnistNet(has_bn=True, use_adaptive_pooling=use_adaptive_pooling) checkpoint = mge.load(model_path) net.load_state_dict(checkpoint["net_init"]) data = Tensor(checkpoint["data"], dtype=np.float32) def eval_fun(data, *, net=None): pred = net(data) return pred refer_value = eval_fun(data, net=net) eval_fun = jit.trace(eval_fun, symbolic=use_symbolic) for _ in range(3): new_value = eval_fun(data, net=net) np.testing.assert_allclose(new_value.numpy(), refer_value.numpy(), atol=max_err) @pytest.mark.skip(reason="close it when cu111 ci") def test_correctness(): if mge.is_cuda_available(): model_name = "mnist_model_with_test.mge" else: model_name = "mnist_model_with_test_cpu.mge" model_path = os.path.join(os.path.dirname(__file__), model_name) set_execution_strategy(Strategy.HEURISTIC | Strategy.REPRODUCIBLE) run_train(model_path, False, False, max_err=1e-5) run_train(model_path, True, False, max_err=1e-5) run_train(model_path, True, True, max_err=1e-5) # sublinear config = SublinearMemoryConfig(genetic_nr_iter=10) run_train( model_path, True, True, sublinear_memory_config=config, max_err=1e-5, ) run_eval(model_path, False, max_err=1e-7) run_eval(model_path, True, max_err=1e-7) @pytest.mark.skip(reason="close it when cu111 ci") def test_correctness_use_adaptive_pooling(): if mge.is_cuda_available(): model_name = "mnist_model_with_test.mge" else: model_name = "mnist_model_with_test_cpu.mge" model_path = os.path.join(os.path.dirname(__file__), model_name) set_execution_strategy("HEURISTIC_REPRODUCIBLE") run_train(model_path, False, False, max_err=1e-5, use_adaptive_pooling=True) run_train(model_path, True, False, max_err=1e-5, use_adaptive_pooling=True) run_train(model_path, True, True, max_err=1e-5, use_adaptive_pooling=True) # sublinear config = SublinearMemoryConfig(genetic_nr_iter=10) run_train( model_path, True, True, sublinear_memory_config=config, max_err=1e-5, use_adaptive_pooling=True, ) run_eval(model_path, False, max_err=1e-7, use_adaptive_pooling=True) run_eval(model_path, True, max_err=1e-7, use_adaptive_pooling=True)