-
Notifications
You must be signed in to change notification settings - Fork 3.5k
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
A clone of test_runtime_micro.py, however modified to run specificall…
…y on ARM hardware, currently just the STM32F746 discovery board. Signed-off-by: Tom Gall <[email protected]>
- Loading branch information
Showing
1 changed file
with
385 additions
and
0 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,385 @@ | ||
# Licensed to the Apache Software Foundation (ASF) under one | ||
# or more contributor license agreements. See the NOTICE file | ||
# distributed with this work for additional information | ||
# regarding copyright ownership. The ASF licenses this file | ||
# to you under the Apache License, Version 2.0 (the | ||
# "License"); you may not use this file except in compliance | ||
# with the License. You may obtain a copy of the License at | ||
# | ||
# http://www.apache.org/licenses/LICENSE-2.0 | ||
# | ||
# Unless required by applicable law or agreed to in writing, | ||
# software distributed under the License is distributed on an | ||
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | ||
# KIND, either express or implied. See the License for the | ||
# specific language governing permissions and limitations | ||
# under the License. | ||
import os | ||
|
||
import numpy as np | ||
import tvm | ||
from tvm import te | ||
from tvm.contrib import graph_runtime, util | ||
from tvm import relay | ||
import tvm.micro as micro | ||
from tvm.micro import create_micro_mod | ||
from tvm.relay.testing import resnet | ||
|
||
# Use real micro device - an STM32F746 discovery board | ||
# SETUP: | ||
# Be sure to have openocd installed and running | ||
# Ex : openocd -f board/stm32f7discovery.cfg | ||
# Be sure to have the ST CMSIS library downloaded, installed and | ||
# Ex : export CMSIS_ST_PATH="/home/yourid/st/STM32Cube_FW_F7_V1.16.0/Drivers/CMSIS" | ||
DEV_CONFIG_A = micro.device.arm.stm32f746xx.generate_config("127.0.0.1", 6666) | ||
DEV_CONFIG_B = micro.device.arm.stm32f746xx.generate_config("127.0.0.1", 6666) | ||
TARGET = 'c -device=micro_dev' | ||
|
||
def relay_micro_build(func, dev_config, params=None): | ||
"""Create a graph runtime module with a micro device context from a Relay function. | ||
Parameters | ||
---------- | ||
func : relay.Function | ||
function to compile | ||
dev_config : Dict[str, Any] | ||
MicroTVM config dict for the target device | ||
params : dict | ||
input parameters that do not change during inference | ||
Return | ||
------ | ||
mod : tvm.runtime.Module | ||
graph runtime module for the target device | ||
""" | ||
disable_vectorize = tvm.target.build_config(disable_vectorize=True) | ||
disable_fusion = relay.build_config(disabled_pass={'FuseOps'}) | ||
with disable_vectorize, disable_fusion: | ||
graph, c_mod, params = relay.build(func, target=TARGET, params=params) | ||
micro_mod = micro.create_micro_mod(c_mod, dev_config) | ||
ctx = tvm.micro_dev(0) | ||
mod = graph_runtime.create(graph, micro_mod, ctx) | ||
mod.set_input(**params) | ||
return mod | ||
|
||
|
||
GDB_INIT_TEMPLATE = """ | ||
layout asm | ||
target remote localhost:{gdb_port} | ||
set $pc = UTVMInit | ||
break UTVMDone | ||
""" | ||
|
||
|
||
def reset_gdbinit(): | ||
if 'server_port' not in DEV_CONFIG_A: | ||
return | ||
try: | ||
gdb_init_dir = os.environ['MICRO_GDB_INIT_DIR'] | ||
except KeyError: | ||
return | ||
with open(f'{gdb_init_dir}/.gdbinit', 'w') as f: | ||
gdb_port = DEV_CONFIG_A['server_port'] - 3333 | ||
f.write(GDB_INIT_TEMPLATE.format(gdb_port=gdb_port)) | ||
|
||
|
||
def test_alloc(): | ||
"""Test tensor allocation on the device.""" | ||
if not tvm.runtime.enabled("micro_dev"): | ||
return | ||
shape = (1024,) | ||
dtype = "float32" | ||
with micro.Session(DEV_CONFIG_A): | ||
ctx = tvm.micro_dev(0) | ||
np_tensor = np.random.uniform(size=shape).astype(dtype) | ||
micro_tensor = tvm.nd.array(np_tensor, ctx) | ||
tvm.testing.assert_allclose(np_tensor, micro_tensor.asnumpy()) | ||
|
||
|
||
def test_add(): | ||
"""Test a module which performs addition.""" | ||
if not tvm.runtime.enabled("micro_dev"): | ||
return | ||
shape = (1024,) | ||
dtype = "float32" | ||
|
||
reset_gdbinit() | ||
|
||
# Construct TVM expression. | ||
tvm_shape = tvm.runtime.convert(shape) | ||
A = te.placeholder(tvm_shape, name="A", dtype=dtype) | ||
B = te.placeholder(tvm_shape, name="B", dtype=dtype) | ||
C = te.compute(A.shape, lambda *i: A(*i) + B(*i), name="C") | ||
s = te.create_schedule(C.op) | ||
|
||
func_name = "fadd" | ||
c_mod = tvm.build(s, [A, B, C], target="c", name=func_name) | ||
|
||
with micro.Session(DEV_CONFIG_A) as sess: | ||
micro_mod = micro.create_micro_mod(c_mod, DEV_CONFIG_A) | ||
micro_func = micro_mod[func_name] | ||
ctx = tvm.micro_dev(0) | ||
|
||
a_np = np.random.uniform(size=shape).astype(dtype) | ||
a = tvm.nd.array(a_np, ctx) | ||
b_np = np.random.uniform(size=shape).astype(dtype) | ||
b = tvm.nd.array(b_np, ctx) | ||
c = tvm.nd.array(np.zeros(shape, dtype=dtype), ctx) | ||
micro_func(a, b, c) | ||
|
||
# ensure inputs weren't corrupted | ||
tvm.testing.assert_allclose( | ||
a.asnumpy(), a_np) | ||
tvm.testing.assert_allclose( | ||
b.asnumpy(), b_np) | ||
# ensure output is correct | ||
tvm.testing.assert_allclose( | ||
c.asnumpy(), a.asnumpy() + b.asnumpy()) | ||
|
||
|
||
def test_workspace_add(): | ||
"""Test a module which uses a workspace to compute an intermediate value.""" | ||
if not tvm.runtime.enabled("micro_dev"): | ||
return | ||
shape = (1024,) | ||
dtype = "float32" | ||
|
||
reset_gdbinit() | ||
|
||
# Construct TVM expression. | ||
tvm_shape = tvm.runtime.convert(shape) | ||
A = te.placeholder(tvm_shape, name="A", dtype=dtype) | ||
B = te.placeholder(tvm_shape, name="B", dtype=dtype) | ||
B = te.compute(A.shape, lambda *i: A(*i) + 1, name="B") | ||
C = te.compute(A.shape, lambda *i: B(*i) + 1, name="C") | ||
s = te.create_schedule(C.op) | ||
|
||
func_name = "fadd_two_workspace" | ||
c_mod = tvm.build(s, [A, C], target="c", name=func_name) | ||
|
||
with micro.Session(DEV_CONFIG_A) as sess: | ||
micro_mod = micro.create_micro_mod(c_mod, DEV_CONFIG_A) | ||
micro_func = micro_mod[func_name] | ||
ctx = tvm.micro_dev(0) | ||
a_np = np.random.uniform(size=shape).astype(dtype) | ||
a = tvm.nd.array(a_np, ctx) | ||
c = tvm.nd.array(np.zeros(shape, dtype=dtype), ctx) | ||
micro_func(a, c) | ||
|
||
# ensure input wasn't corrupted | ||
tvm.testing.assert_allclose( | ||
a.asnumpy(), a_np) | ||
# ensure output is correct | ||
tvm.testing.assert_allclose( | ||
c.asnumpy(), a.asnumpy() + 2.0) | ||
|
||
|
||
def test_graph_runtime(): | ||
"""Test a program which uses the graph runtime.""" | ||
if not tvm.runtime.enabled("micro_dev"): | ||
return | ||
shape = (1024,) | ||
dtype = "float32" | ||
|
||
# Construct Relay program. | ||
x = relay.var("x", relay.TensorType(shape=shape, dtype=dtype)) | ||
xx = relay.multiply(x, x) | ||
z = relay.add(xx, relay.const(1.0)) | ||
func = relay.Function([x], z) | ||
|
||
with micro.Session(DEV_CONFIG_A): | ||
mod = relay_micro_build(func, DEV_CONFIG_A) | ||
|
||
x_in = np.random.uniform(size=shape[0]).astype(dtype) | ||
mod.run(x=x_in) | ||
result = mod.get_output(0).asnumpy() | ||
|
||
tvm.testing.assert_allclose( | ||
mod.get_input(0).asnumpy(), x_in) | ||
tvm.testing.assert_allclose( | ||
result, x_in * x_in + 1.0) | ||
|
||
|
||
def test_conv2d(): | ||
if not tvm.runtime.enabled("micro_dev"): | ||
return | ||
|
||
from tvm.relay import create_executor | ||
from tvm.relay import transform | ||
|
||
dshape = (1, 4, 16, 16) | ||
dtype = 'int8' | ||
func_name = 'fused_nn_conv2d' | ||
|
||
reset_gdbinit() | ||
|
||
# Construct Relay program. | ||
x = relay.var("x", shape=dshape, dtype=dtype) | ||
conv_expr = relay.nn.conv2d( | ||
x, relay.var("w"), | ||
kernel_size=(3, 3), | ||
padding=(1, 1), | ||
channels=4) | ||
func = relay.Function(relay.analysis.free_vars(conv_expr), conv_expr) | ||
mod = tvm.IRModule.from_expr(func) | ||
mod = transform.InferType()(mod) | ||
|
||
x_shape = list(map(lambda x: x.value, mod['main'].params[0].checked_type.shape)) | ||
w_shape = list(map(lambda x: x.value, mod['main'].params[1].checked_type.shape)) | ||
out_shape = list(map(lambda x: x.value, mod['main'].ret_type.shape)) | ||
|
||
with tvm.target.build_config(disable_vectorize=True): | ||
graph, c_mod, params = relay.build(mod, target="c") | ||
|
||
with micro.Session(DEV_CONFIG_A): | ||
micro_mod = micro.create_micro_mod(c_mod, DEV_CONFIG_A) | ||
candidate_func_name = func_name | ||
for i in range(100): | ||
try: | ||
micro_func = micro_mod[candidate_func_name] | ||
break | ||
except tvm.TVMError as e: | ||
candidate_func_name = f'{func_name}_{i}' | ||
else: | ||
assert False | ||
ctx = tvm.micro_dev(0) | ||
|
||
x_data = tvm.nd.array(np.random.uniform(size=x_shape).astype(dtype), ctx) | ||
w_data = tvm.nd.array(np.random.uniform(size=w_shape).astype(dtype), ctx) | ||
result = tvm.nd.array(np.zeros(shape=out_shape, dtype=dtype), ctx) | ||
micro_func(x_data, w_data, result) | ||
|
||
out_data = np.zeros(out_shape, dtype=dtype) | ||
params = { 'x': x_data.asnumpy(), 'w': w_data.asnumpy() } | ||
intrp = create_executor('debug') | ||
expected_result = intrp.evaluate(mod['main'])(x_data, w_data) | ||
|
||
tvm.testing.assert_allclose(result.asnumpy(), expected_result.asnumpy()) | ||
|
||
|
||
def test_interleave_sessions(): | ||
"""Test closing and reopening sessions.""" | ||
if not tvm.runtime.enabled("micro_dev"): | ||
return | ||
shape = (1024,) | ||
dtype = "float32" | ||
|
||
# Construct Relay add program. | ||
x = relay.var("x", relay.TensorType(shape=shape, dtype=dtype)) | ||
ret = relay.add(x, relay.const(1.0)) | ||
add_const_func = relay.Function([x], ret) | ||
|
||
sess_a = micro.Session(DEV_CONFIG_A) | ||
sess_b = micro.Session(DEV_CONFIG_B) | ||
with sess_a: | ||
np_tensor_a = np.random.uniform(size=shape).astype(dtype) | ||
micro_tensor_a = tvm.nd.array(np_tensor_a, tvm.micro_dev(0)) | ||
with sess_b: | ||
np_tensor_b = np.random.uniform(size=shape).astype(dtype) | ||
micro_tensor_b = tvm.nd.array(np_tensor_b, tvm.micro_dev(0)) | ||
with sess_a: | ||
add_const_mod = relay_micro_build(add_const_func, DEV_CONFIG_A) | ||
add_const_mod.run(x=micro_tensor_a) | ||
add_result = add_const_mod.get_output(0).asnumpy() | ||
tvm.testing.assert_allclose( | ||
add_result, np_tensor_a + 1.0) | ||
with sess_b: | ||
add_const_mod = relay_micro_build(add_const_func, DEV_CONFIG_B) | ||
add_const_mod.run(x=micro_tensor_b) | ||
add_result = add_const_mod.get_output(0).asnumpy() | ||
tvm.testing.assert_allclose( | ||
add_result, np_tensor_b + 1.0) | ||
|
||
|
||
def test_nested_sessions(): | ||
"""Test entering and exiting nested session contexts.""" | ||
if not tvm.runtime.enabled("micro_dev"): | ||
return | ||
shape = (1024,) | ||
dtype = "float32" | ||
|
||
# Construct Relay add program. | ||
x = relay.var("x", relay.TensorType(shape=shape, dtype=dtype)) | ||
ret = relay.add(x, relay.const(1.0)) | ||
add_const_func = relay.Function([x], ret) | ||
|
||
sess_a = micro.Session(DEV_CONFIG_A) | ||
sess_b = micro.Session(DEV_CONFIG_B) | ||
with sess_a: | ||
np_tensor_a = np.random.uniform(size=shape).astype(dtype) | ||
micro_tensor_a = tvm.nd.array(np_tensor_a, tvm.micro_dev(0)) | ||
with sess_b: | ||
np_tensor_b = np.random.uniform(size=shape).astype(dtype) | ||
micro_tensor_b = tvm.nd.array(np_tensor_b, tvm.micro_dev(0)) | ||
add_const_mod = relay_micro_build(add_const_func, DEV_CONFIG_A) | ||
add_const_mod.run(x=micro_tensor_a) | ||
add_result = add_const_mod.get_output(0).asnumpy() | ||
tvm.testing.assert_allclose( | ||
add_result, np_tensor_a + 1.0) | ||
|
||
|
||
def test_inactive_session_use(): | ||
"""Test the use of objects allocated in a session that is no longer active.""" | ||
if not tvm.runtime.enabled("micro_dev"): | ||
return | ||
shape = (1024,) | ||
dtype = "float32" | ||
|
||
# Construct Relay add program. | ||
x = relay.var("x", relay.TensorType(shape=shape, dtype=dtype)) | ||
ret = relay.add(x, relay.const(1.0)) | ||
add_const_func = relay.Function([x], ret) | ||
|
||
sess_a = micro.Session(DEV_CONFIG_A) | ||
sess_b = micro.Session(DEV_CONFIG_B) | ||
with sess_a: | ||
np_tensor_a = np.random.uniform(size=shape).astype(dtype) | ||
micro_tensor_a = tvm.nd.array(np_tensor_a, tvm.micro_dev(0)) | ||
add_const_mod = relay_micro_build(add_const_func, DEV_CONFIG_A) | ||
|
||
with sess_b: | ||
# These objects belong to `sess_a`. | ||
add_const_mod.run(x=micro_tensor_a) | ||
add_result = add_const_mod.get_output(0).asnumpy() | ||
tvm.testing.assert_allclose( | ||
add_result, np_tensor_a + 1.0) | ||
|
||
|
||
# TODO add workspace alloc/free stress test | ||
|
||
if __name__ == "__main__": | ||
test_alloc() | ||
print() | ||
print('finished alloc test') | ||
input('[press enter to continue]') | ||
test_add() | ||
print() | ||
print('finished add test') | ||
input('[press enter to continue]') | ||
test_workspace_add() | ||
print() | ||
print('finished workspace add test') | ||
input('[press enter to continue]') | ||
test_graph_runtime() | ||
print() | ||
print('finished graph runtime test') | ||
input('[press enter to continue]') | ||
test_conv2d() | ||
print() | ||
print('finished conv2d test') | ||
input('[press enter to continue]') | ||
# disable for now as these are currently broken | ||
#test_interleave_sessions() | ||
#print() | ||
#print('finished interleaved sessions test') | ||
#input('[press enter to continue]') | ||
# test_nested_sessions() | ||
#print() | ||
#print('finished nested sessions test') | ||
#input('[press enter to continue]') | ||
test_inactive_session_use() | ||
print() | ||
print('finished use inactive session test') | ||
input('[press enter to continue]') |