# MegEngine is Licensed under the Apache License, Version 2.0 (the "License") # # Copyright (c) 2014-2021 Megvii Inc. All rights reserved. # # Unless required by applicable law or agreed to in writing, # software distributed under the License is distributed on an # "AS IS" BASIS, WITHOUT ARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. import platform import numpy as np import pytest import megengine as mge import megengine.distributed as dist from megengine.device import get_device_count from megengine.quantization import QuantMode, create_qparams from megengine.quantization.observer import ( ExponentialMovingAverageObserver, HistogramObserver, MinMaxObserver, Observer, PassiveObserver, SyncExponentialMovingAverageObserver, SyncMinMaxObserver, ) def test_observer(): with pytest.raises(TypeError): Observer("qint8") def test_min_max_observer(): x = np.random.rand(3, 3, 3, 3).astype("float32") np_min, np_max = x.min(), x.max() x = mge.tensor(x) m = MinMaxObserver() m(x) np.testing.assert_allclose(m.min_val.numpy(), np_min) np.testing.assert_allclose(m.max_val.numpy(), np_max) def test_exponential_moving_average_observer(): t = np.random.rand() x1 = np.random.rand(3, 3, 3, 3).astype("float32") x2 = np.random.rand(3, 3, 3, 3).astype("float32") expected_min = x1.min() * t + x2.min() * (1 - t) expected_max = x1.max() * t + x2.max() * (1 - t) m = ExponentialMovingAverageObserver(momentum=t) m(mge.tensor(x1, dtype=np.float32)) m(mge.tensor(x2, dtype=np.float32)) np.testing.assert_allclose(m.min_val.numpy(), expected_min, atol=1e-5) np.testing.assert_allclose(m.max_val.numpy(), expected_max, atol=1e-5) def test_histogram_observer(): x = np.random.rand(3, 3, 3, 3).astype("float32") np_min, np_max = x.min(), x.max() x = mge.tensor(x) m = HistogramObserver() m(x) np.testing.assert_allclose(m.min_val.numpy(), np_min) np.testing.assert_allclose(m.max_val.numpy(), np_max) def test_passive_observer(): qparams = create_qparams(QuantMode.SYMMERTIC, "qint8", mge.tensor(1.0)) m = PassiveObserver("qint8") m.set_qparams(qparams) assert m.orig_scale == 1.0 assert m.scale.numpy() == 1.0 assert m.get_qparams().dtype_meta == qparams.dtype_meta assert m.get_qparams().scale == qparams.scale assert m.get_qparams() == qparams @pytest.mark.require_ngpu(2) @pytest.mark.isolated_distributed def test_sync_min_max_observer(): word_size = get_device_count("gpu") x = np.random.rand(3 * word_size, 3, 3, 3).astype("float32") np_min, np_max = x.min(), x.max() @dist.launcher def worker(): rank = dist.get_rank() m = SyncMinMaxObserver() y = mge.tensor(x[rank * 3 : (rank + 1) * 3]) m(y) assert m.min_val == np_min and m.max_val == np_max worker() @pytest.mark.require_ngpu(2) @pytest.mark.isolated_distributed def test_sync_exponential_moving_average_observer(): word_size = get_device_count("gpu") t = np.random.rand() x1 = np.random.rand(3 * word_size, 3, 3, 3).astype("float32") x2 = np.random.rand(3 * word_size, 3, 3, 3).astype("float32") expected_min = x1.min() * t + x2.min() * (1 - t) expected_max = x1.max() * t + x2.max() * (1 - t) @dist.launcher def worker(): rank = dist.get_rank() m = SyncExponentialMovingAverageObserver(momentum=t) y1 = mge.tensor(x1[rank * 3 : (rank + 1) * 3]) y2 = mge.tensor(x2[rank * 3 : (rank + 1) * 3]) m(y1) m(y2) np.testing.assert_allclose(m.min_val.numpy(), expected_min, atol=1e-6) np.testing.assert_allclose(m.max_val.numpy(), expected_max, atol=1e-6) worker()