From 2ced7a72a1c378f9192c72e89a5804c52b1b38e5 Mon Sep 17 00:00:00 2001 From: Alan MacDonald Date: Wed, 1 Feb 2023 09:12:15 -0800 Subject: [PATCH] [microTVM] Clean-up test_crt.py and add to pylint (#13886) normalize test_crt.py temporary directory usage remove test_crt.py unused code and variables fix test_crt.py to pass pylint and add to pylint script --- tests/lint/pylint.sh | 1 + tests/python/unittest/test_crt.py | 167 +++++++++++++----------------- 2 files changed, 72 insertions(+), 96 deletions(-) diff --git a/tests/lint/pylint.sh b/tests/lint/pylint.sh index d716ddea7ba1..ac93a6f15df6 100755 --- a/tests/lint/pylint.sh +++ b/tests/lint/pylint.sh @@ -19,6 +19,7 @@ set -euxo pipefail python3 -m pylint python/tvm --rcfile="$(dirname "$0")"/pylintrc python3 -m pylint vta/python/vta --rcfile="$(dirname "$0")"/pylintrc +python3 -m pylint tests/python/unittest/test_crt.py --rcfile="$(dirname "$0")"/pylintrc python3 -m pylint tests/python/unittest/test_tvmscript_type.py --rcfile="$(dirname "$0")"/pylintrc python3 -m pylint tests/python/contrib/test_cmsisnn --rcfile="$(dirname "$0")"/pylintrc python3 -m pylint tests/python/contrib/test_ethosn --rcfile="$(dirname "$0")"/pylintrc diff --git a/tests/python/unittest/test_crt.py b/tests/python/unittest/test_crt.py index 3309aad0a5db..198a5816321e 100644 --- a/tests/python/unittest/test_crt.py +++ b/tests/python/unittest/test_crt.py @@ -15,15 +15,12 @@ # specific language governing permissions and limitations # under the License. -import numpy as np -import os +"""Test C runtime""" + import pathlib -import shutil import pytest -pytest.importorskip("pty") - -import pytest +import numpy as np import tvm import tvm.relay @@ -32,6 +29,8 @@ from tvm.relay.backend import Runtime from tvm.relay.backend import Executor +pytest.importorskip("pty") + BUILD = True DEBUG = False @@ -57,38 +56,30 @@ def _make_session(temp_dir, mod): def _make_add_sess(temp_dir): - A = tvm.te.placeholder((2,), dtype="int8") - B = tvm.te.placeholder((1,), dtype="int8") - C = tvm.te.compute(A.shape, lambda i: A[i] + B[0], name="C") - sched = tvm.te.create_schedule(C.op) - return _make_sess_from_op(temp_dir, "add", sched, [A, B, C]) - - -def _make_ident_sess(temp_dir): - A = tvm.te.placeholder((2,), dtype="int8") - B = tvm.te.compute(A.shape, lambda i: A[i], name="B") - sched = tvm.te.create_schedule(B.op) - return _make_sess_from_op(temp_dir, "ident", sched, [A, B]) + a = tvm.te.placeholder((2,), dtype="int8") + b = tvm.te.placeholder((1,), dtype="int8") + c = tvm.te.compute(a.shape, lambda i: a[i] + b[0], name="c") + sched = tvm.te.create_schedule(c.op) + return _make_sess_from_op(temp_dir, "add", sched, [a, b, c]) @tvm.testing.requires_micro def test_compile_runtime(): """Test compiling the on-device runtime.""" - import tvm.micro temp_dir = tvm.contrib.utils.tempdir() with _make_add_sess(temp_dir) as sess: - A_data = tvm.nd.array(np.array([2, 3], dtype="int8"), device=sess.device) - assert (A_data.numpy() == np.array([2, 3])).all() - B_data = tvm.nd.array(np.array([4], dtype="int8"), device=sess.device) - assert (B_data.numpy() == np.array([4])).all() - C_data = tvm.nd.array(np.array([0, 0], dtype="int8"), device=sess.device) - assert (C_data.numpy() == np.array([0, 0])).all() + a_data = tvm.nd.array(np.array([2, 3], dtype="int8"), device=sess.device) + assert (a_data.numpy() == np.array([2, 3])).all() + b_data = tvm.nd.array(np.array([4], dtype="int8"), device=sess.device) + assert (b_data.numpy() == np.array([4])).all() + c_data = tvm.nd.array(np.array([0, 0], dtype="int8"), device=sess.device) + assert (c_data.numpy() == np.array([0, 0])).all() system_lib = sess.get_system_lib() - system_lib.get_function("add")(A_data, B_data, C_data) - assert (C_data.numpy() == np.array([6, 7])).all() + system_lib.get_function("add")(a_data, b_data, c_data) + assert (c_data.numpy() == np.array([6, 7])).all() @tvm.testing.requires_micro @@ -111,8 +102,6 @@ def test_compile_runtime_llvm(): @tvm.testing.requires_micro def test_reset(): """Test when the remote end resets during a session.""" - import tvm.micro - from tvm.micro import transport temp_dir = tvm.contrib.utils.tempdir() @@ -128,10 +117,7 @@ def test_reset(): def test_graph_executor(): """Test use of the graph executor with microTVM.""" - ws_root = pathlib.Path(os.path.dirname(__file__) + "/micro-workspace") - if ws_root.exists(): - shutil.rmtree(ws_root) - temp_dir = tvm.contrib.utils.tempdir(ws_root.resolve()) + temp_dir = tvm.contrib.utils.tempdir() relay_mod = tvm.parser.fromtext( """ #[version = "0.0.5"] @@ -147,15 +133,15 @@ def @main(%a : Tensor[(1, 2), uint8], %b : Tensor[(1, 2), uint8]) { def do_test(graph_mod): - A_data = tvm.nd.array(np.array([2, 3], dtype="uint8"), device=sess.device) - assert (A_data.numpy() == np.array([2, 3])).all() - B_data = tvm.nd.array(np.array([4, 7], dtype="uint8"), device=sess.device) - assert (B_data.numpy() == np.array([4, 7])).all() + a_data = tvm.nd.array(np.array([2, 3], dtype="uint8"), device=sess.device) + assert (a_data.numpy() == np.array([2, 3])).all() + b_data = tvm.nd.array(np.array([4, 7], dtype="uint8"), device=sess.device) + assert (b_data.numpy() == np.array([4, 7])).all() assert graph_mod.get_input_index("a") == 0 assert graph_mod.get_input_index("b") == 1 - graph_mod.run(a=A_data, b=B_data) + graph_mod.run(a=a_data, b=b_data) out = graph_mod.get_output(0) assert (out.numpy() == np.array([6, 10])).all() @@ -179,10 +165,7 @@ def do_test(graph_mod): def test_aot_executor(): """Test use of the AOT executor with microTVM.""" - ws_root = pathlib.Path(os.path.dirname(__file__) + "/micro-workspace") - if ws_root.exists(): - shutil.rmtree(ws_root) - temp_dir = tvm.contrib.utils.tempdir(ws_root.resolve()) + temp_dir = tvm.contrib.utils.tempdir() relay_mod = tvm.parser.fromtext( """ #[version = "0.0.5"] @@ -210,20 +193,20 @@ def do_test(): assert aot_executor.get_num_inputs() == 2 assert aot_executor.get_num_outputs() == 1 - A_np = np.array([[2, 3]], dtype="uint8") - B_np = np.array([[4, 7]], dtype="uint8") + a_np = np.array([[2, 3]], dtype="uint8") + b_np = np.array([[4, 7]], dtype="uint8") - A_data = aot_executor.get_input("a").copyfrom(A_np) - B_data = aot_executor.get_input("b").copyfrom(B_np) + aot_executor.get_input("a").copyfrom(a_np) + b_data = aot_executor.get_input("b").copyfrom(b_np) aot_executor.run() out = aot_executor.get_output(0) assert (out.numpy() == np.array([6, 10])).all() - B_np_new = np.array([[5, 8]]) - aot_executor.set_input("b", B_np_new) - assert (B_data.numpy() == B_np_new).all() + b_np_new = np.array([[5, 8]]) + aot_executor.set_input("b", b_np_new) + assert (b_data.numpy() == b_np_new).all() with _make_session(temp_dir, factory) as sess: do_test() @@ -233,10 +216,7 @@ def do_test(): def test_aot_executor_usmp_const_pool(): """Test the AOT executor with microTVM using USMP to generate a constant data pool.""" - ws_root = pathlib.Path(os.path.dirname(__file__) + "/micro-workspace-usmp") - if ws_root.exists(): - shutil.rmtree(ws_root) - temp_dir = tvm.contrib.utils.tempdir(ws_root.resolve()) + temp_dir = tvm.contrib.utils.tempdir() relay_mod = tvm.parser.fromtext( """ #[version = "0.0.5"] @@ -251,9 +231,8 @@ def @main(%a : Tensor[(1, 2), uint8], %b : Tensor[(1, 2), uint8], %c : Tensor[(1 executor = Executor("aot") main_func = relay_mod["main"] type_dict = {p.name_hint: p.checked_type.dtype for p in main_func.params} - B_np = np.array([[4, 7]], dtype="uint8").astype(type_dict["b"]) - C_np = np.array([[8, 9]], dtype="uint8").astype(type_dict["c"]) - params = {"c": C_np} + c_np = np.array([[8, 9]], dtype="uint8").astype(type_dict["c"]) + params = {"c": c_np} with tvm.transform.PassContext( opt_level=3, config={"tir.disable_vectorize": True, "tir.usmp.enable": True} ): @@ -272,8 +251,8 @@ def do_test(): sess.get_system_lib(), sess.device, "default" ) ) - except tvm._ffi.base.TVMError as e: - raise e + except tvm._ffi.base.TVMError as excpt: + raise excpt assert aot_executor.get_input_index("a") == 0 assert aot_executor.get_input_index("b") == 1 @@ -281,19 +260,19 @@ def do_test(): assert aot_executor.get_num_inputs() == 2 assert aot_executor.get_num_outputs() == 1 - A_np = np.array([[2, 3]], dtype="uint8") - B_np = np.array([[4, 7]], dtype="uint8") + a_np = np.array([[2, 3]], dtype="uint8") + b_np = np.array([[4, 7]], dtype="uint8") - A_data = aot_executor.get_input("a").copyfrom(A_np) - B_data = aot_executor.get_input("b").copyfrom(B_np) + a_data = aot_executor.get_input("a").copyfrom(a_np) + b_data = aot_executor.get_input("b").copyfrom(b_np) aot_executor.run() out = aot_executor.get_output(0) assert (out.numpy() == np.array([14, 19])).all() - B_np_new = np.array([[5, 8]]) - aot_executor.set_input("b", B_np_new) - assert (B_data.numpy() == B_np_new).all() + b_np_new = np.array([[5, 8]]) + aot_executor.set_input("b", b_np_new) + assert (b_data.numpy() == b_np_new).all() with _make_session(temp_dir, factory) as sess: do_test() @@ -302,53 +281,51 @@ def do_test(): @tvm.testing.requires_micro def test_std_math_functions(): """Verify that standard math functions can be used.""" - import tvm.micro temp_dir = tvm.contrib.utils.tempdir() with _make_add_sess(temp_dir) as sess: - A_data = tvm.nd.array(np.array([2, 3], dtype="int8"), device=sess.device) - assert (A_data.numpy() == np.array([2, 3])).all() - B_data = tvm.nd.array(np.array([4], dtype="int8"), device=sess.device) - assert (B_data.numpy() == np.array([4])).all() - C_data = tvm.nd.array(np.array([0, 0], dtype="int8"), device=sess.device) - assert (C_data.numpy() == np.array([0, 0])).all() + a_data = tvm.nd.array(np.array([2, 3], dtype="int8"), device=sess.device) + assert (a_data.numpy() == np.array([2, 3])).all() + b_data = tvm.nd.array(np.array([4], dtype="int8"), device=sess.device) + assert (b_data.numpy() == np.array([4])).all() + c_data = tvm.nd.array(np.array([0, 0], dtype="int8"), device=sess.device) + assert (c_data.numpy() == np.array([0, 0])).all() system_lib = sess.get_system_lib() - system_lib.get_function("add")(A_data, B_data, C_data) + system_lib.get_function("add")(a_data, b_data, c_data) temp_dir = tvm.contrib.utils.tempdir() - A = tvm.te.placeholder((2,), dtype="float32", name="A") - B = tvm.te.compute(A.shape, lambda i: tvm.te.exp(A[i]), name="B") - s = tvm.te.create_schedule(B.op) + a = tvm.te.placeholder((2,), dtype="float32", name="a") + b = tvm.te.compute(a.shape, lambda i: tvm.te.exp(a[i]), name="b") + s = tvm.te.create_schedule(b.op) - with _make_sess_from_op(temp_dir, "myexpf", s, [A, B]) as sess: - A_data = tvm.nd.array(np.array([2.0, 3.0], dtype="float32"), device=sess.device) - B_data = tvm.nd.array(np.array([2.0, 3.0], dtype="float32"), device=sess.device) + with _make_sess_from_op(temp_dir, "myexpf", s, [a, b]) as sess: + a_data = tvm.nd.array(np.array([2.0, 3.0], dtype="float32"), device=sess.device) + b_data = tvm.nd.array(np.array([2.0, 3.0], dtype="float32"), device=sess.device) lib = sess.get_system_lib() func = lib["myexpf"] - func(A_data, B_data) - np.testing.assert_allclose(B_data.numpy(), np.array([7.389056, 20.085537])) + func(a_data, b_data) + np.testing.assert_allclose(b_data.numpy(), np.array([7.389056, 20.085537])) @tvm.testing.requires_micro def test_platform_timer(): """Verify the platform timer can be used to time remote functions.""" - import tvm.micro temp_dir = tvm.contrib.utils.tempdir() - A = tvm.te.placeholder((2,), dtype="float32", name="A") - B = tvm.te.compute(A.shape, lambda i: tvm.te.exp(A[i]), name="B") - s = tvm.te.create_schedule(B.op) + a = tvm.te.placeholder((2,), dtype="float32", name="a") + b = tvm.te.compute(a.shape, lambda i: tvm.te.exp(a[i]), name="b") + s = tvm.te.create_schedule(b.op) - with _make_sess_from_op(temp_dir, "myexpf", s, [A, B]) as sess: - A_data = tvm.nd.array(np.array([2.0, 3.0], dtype="float32"), device=sess.device) - B_data = tvm.nd.array(np.array([2.0, 3.0], dtype="float32"), device=sess.device) + with _make_sess_from_op(temp_dir, "myexpf", s, [a, b]) as sess: + a_data = tvm.nd.array(np.array([2.0, 3.0], dtype="float32"), device=sess.device) + b_data = tvm.nd.array(np.array([2.0, 3.0], dtype="float32"), device=sess.device) lib = sess.get_system_lib() time_eval_f = lib.time_evaluator( "myexpf", sess.device, number=2000, repeat=3, min_repeat_ms=40 ) - result = time_eval_f(A_data, B_data) + result = time_eval_f(a_data, b_data) assert result.mean > 0 assert len(result.results) == 3 @@ -356,14 +333,12 @@ def test_platform_timer(): @tvm.testing.requires_micro def test_autotune(): """Verify that autotune works with micro.""" - import tvm.relay as relay - from tvm.micro.testing.utils import check_tune_log runtime = Runtime("crt", {"system-lib": True}) - data = relay.var("data", relay.TensorType((1, 3, 64, 64), "float32")) - weight = relay.var("weight", relay.TensorType((8, 3, 5, 5), "float32")) - y = relay.nn.conv2d( + data = tvm.relay.var("data", tvm.relay.TensorType((1, 3, 64, 64), "float32")) + weight = tvm.relay.var("weight", tvm.relay.TensorType((8, 3, 5, 5), "float32")) + y = tvm.relay.nn.conv2d( data, weight, padding=(2, 2), @@ -371,9 +346,9 @@ def test_autotune(): kernel_layout="OIHW", out_dtype="float32", ) - f = relay.Function([data, weight], y) + f = tvm.relay.Function([data, weight], y) mod = tvm.IRModule.from_expr(f) - mod = relay.transform.InferType()(mod) + mod = tvm.relay.transform.InferType()(mod) main_func = mod["main"] shape_dict = {p.name_hint: p.checked_type.concrete_shape for p in main_func.params}