
[ONNX] Support Bernoulli op on ONNX front-end #13802

Merged (19 commits, Jan 27, 2023)
31 changes: 31 additions & 0 deletions python/tvm/relay/frontend/onnx.py
@@ -5582,6 +5582,36 @@ def _impl_v16(cls, inputs, attr, params):
)


class Bernoulli(OnnxOpConverter):
    """Operator converter for Bernoulli"""

    @classmethod
    def _impl_v15(cls, inputs, attr, params):
        in_dtype = infer_type(inputs[0]).checked_type.dtype
        assert in_dtype in [
            "float32",
            "float64",
        ], "Only float input tensor is currently supported."
Review comment (Contributor):
ONNX has support for float16. However, this type is not supported here. Maybe it's worth pointing out the reason (TODO) why this data type is not currently supported at the ONNX front-end level?
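(A possible follow-up, not part of this change: once the threefry-based _op.random.uniform lowering is confirmed to produce float16 on the supported targets, the check above could be relaxed to assert in_dtype in ["float16", "float32", "float64"].)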

        # The data type for the elements of the output tensor.
        # If not specified, we use the data type of the input tensor.
        out_dtype = attr.get("dtype", None)
        if out_dtype is None:
            out_dtype = in_dtype
        else:
            out_dtype = get_type(out_dtype)

        seed = attr.get("seed", None)
        if seed is None:
            seed = np.random.randint(1e6)
        else:
            seed = int(seed)

        # Draw one uniform sample per element with a threefry-keyed generator,
        # then threshold against the input probabilities: u < p is 1 with
        # probability p.
        key = _random.threefry_key(seed)
        inter_outputs = _op.random.uniform(key, infer_shape(inputs[0]), in_dtype)
        _, uniform_nums = _expr.TupleWrapper(inter_outputs, 2)
        return _op.cast(_op.less(uniform_nums, inputs[0]), out_dtype)
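
The lowering above reduces Bernoulli sampling to a uniform draw followed by a comparison: an element u ~ U(0, 1) satisfies u < p with probability exactly p. A minimal NumPy sketch of the same idea (NumPy's generator stands in for Relay's threefry-keyed uniform here; this is an illustration, not TVM API):

    import numpy as np

    def bernoulli_reference(probs, out_dtype="int32", seed=0):
        # One uniform sample per element, thresholded against the input
        # probabilities: (u < p) is True with probability p.
        rng = np.random.default_rng(seed)
        u = rng.uniform(size=probs.shape).astype(probs.dtype)
        return (u < probs).astype(out_dtype)

    # Each output element is 1 with probability equal to the matching input.
    print(bernoulli_reference(np.full(8, 0.5, dtype="float32")))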


class RandomNormal(OnnxOpConverter):
"""Operator converter for random_normal"""

@@ -6348,6 +6378,7 @@ def _get_convert_map(opset):
"QLinearGlobalAveragePool": QLinearGlobalAveragePool.get_converter(opset),
"QLinearLeakyRelu": QLinearLeakyRelu.get_converter(opset),
# Random number generation.
"Bernoulli": Bernoulli.get_converter(opset),
"RandomNormal": RandomNormal.get_converter(opset),
"RandomNormalLike": RandomNormalLike.get_converter(opset),
"RandomUniform": RandomUniform.get_converter(opset),
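For context: OnnxOpConverter.get_converter(opset) resolves to the highest-versioned _impl_vN with N <= opset, so this registration dispatches Bernoulli to _impl_v15 for any model with opset 15 or newer (the opset in which ONNX introduced the op). Illustrative example (a sketch of the dispatch rule, not the exact TVM code):

    # Bernoulli.get_converter(16) resolves to Bernoulli._impl_v15;
    # Bernoulli.get_converter(14) raises, since no _impl_vN with N <= 14 exists.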
159 changes: 159 additions & 0 deletions tests/python/frontend/onnx/test_forward.py
@@ -6707,6 +6707,165 @@ def verify_qlinearsigmoid(a_shape):
    verify_qlinearsigmoid([])


@tvm.testing.parametrize_targets("llvm")
def test_random_bernoulli(target, dev):
    """test_random_bernoulli"""

    def _get_tvm_output(
        inputs,
        out_dtype="int32",
        seed=None,
        target=target,
        dev=dev,
        use_vm=False,
        freeze_params=False,
    ):
        def get_bernoulli_model(shape, in_dtype="float32", out_dtype="int32", seed=None):
            onnx_itype = mapping.NP_TYPE_TO_TENSOR_TYPE[np.dtype(in_dtype)]
            onnx_otype = mapping.NP_TYPE_TO_TENSOR_TYPE[np.dtype(out_dtype)]
            node = helper.make_node(
                "Bernoulli",
                ["input"],
                ["output"],
            )
            dtype_attr = helper.make_attribute("dtype", onnx_otype)
            node.attribute.append(dtype_attr)
            if seed is not None:
                seed_attr = helper.make_attribute("seed", float(seed))
                node.attribute.append(seed_attr)

            graph = helper.make_graph(
                [node],
                "random_bernoulli_test",
                inputs=[helper.make_tensor_value_info("input", onnx_itype, list(shape))],
                outputs=[helper.make_tensor_value_info("output", onnx_otype, list(shape))],
            )
            return helper.make_model(graph, producer_name="random_bernoulli_test")

        shape = inputs.shape
        in_dtype = inputs.dtype
        model = get_bernoulli_model(shape, in_dtype, out_dtype, seed)

        if use_vm:
            return get_tvm_output_with_vm(
                model,
                inputs,
                target,
                dev,
                freeze_params=freeze_params,
            )
        else:
            return get_tvm_output(
                model,
                inputs,
                target,
                dev,
            )

    def binom_test(input, ideal_mean, threshold=0.05):
        # This test is strictly appropriate when input probabilities are all identical.
        # With threshold=0.05 and the three independent retries below, a correct
        # implementation should produce a spurious failure in roughly one run in
        # eight thousand (0.05 ** 3 = 1.25e-4). The test should be over-conservative
        # when input probabilities are not identical (i.e., flaky failures should be
        # rarer still). If this test starts repeatedly throwing flaky failures,
        # consult a statistician in addition to your regular debugging.
        bnm_test_res = scipy.stats.binomtest(
            k=np.sum(input, dtype="int32"), n=len(input), p=ideal_mean
        )
        return bnm_test_res.pvalue > threshold
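
    # For intuition (illustrative numbers, not part of the original test): with
    # n=10000 draws at p=0.5, the standard deviation of k is 50, so k=5050 gives
    # z ~= 1 and a two-sided p-value of roughly 0.32 (passes threshold=0.05),
    # while k=5200 gives z ~= 4 and p ~= 6e-5 (fails).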

    def verify_bernoulli(
        inputs=None,
        shape=[],
        in_dtype="float32",
        out_dtype="int32",
        seed=None,
        target=target,
        dev=dev,
        use_vm=False,
        freeze_params=False,
        in_out_equal=False,
    ):
        if inputs is None:
            assert len(shape) != 0
            inputs = np.random.uniform(size=shape).astype(in_dtype)

        tvm_out = _get_tvm_output(
            inputs,
            out_dtype,
            seed,
            target,
            dev,
            use_vm,
            freeze_params,
        )

        if isinstance(tvm_out, list):
            tvm_out = tvm_out[0]
        # Check that all values are either 0 or 1.
        tvm_flat = tvm_out.flatten()
        assert np.array_equal(tvm_flat, tvm_flat.astype("bool"))
        if in_out_equal:
            tvm.testing.assert_allclose(inputs, tvm_out)
        else:
            # Check that the sample mean is close to the theoretical one
            # via a binomial test.
            ideal_mean = np.mean(inputs)
            repeats = 3
            check = False
            for i in range(repeats):
                if binom_test(tvm_flat, ideal_mean):
                    check = True
                    break
                else:
                    # Repeat with a new seed.
                    seed = np.random.randint(1e6)
                    tvm_flat = _get_tvm_output(
                        inputs,
                        out_dtype,
                        seed,
                        target,
                        dev,
                        use_vm,
                        freeze_params,
                    ).flatten()
            assert check, "Binomial test failed"

    # Test an input sequence of 0s and 1s.
    inputs = np.random.randint(2, size=[10000]).astype("float32")
    verify_bernoulli(inputs, in_out_equal=True)

    # Binomial test with all-0.5 inputs.
    val_num = 10000
    inputs = np.ones([val_num], dtype="float32") * 0.5
    verify_bernoulli(inputs)

    # Binomial test with all-0.1 inputs.
    inputs = np.ones([val_num], dtype="float32") * 0.1
    verify_bernoulli(inputs)

    # Simple test.
    verify_bernoulli(shape=[val_num])

    # Floating-point output type.
    verify_bernoulli(shape=[val_num], out_dtype="float32")

    # Double-precision input type.
    verify_bernoulli(shape=[val_num], in_dtype="float64")

    # Test N-D tensor generation.
    verify_bernoulli(shape=[2, 4, 100, 100])

    # Test with an explicit seed.
    verify_bernoulli(shape=[val_num], seed=np.random.randint(1e6))

    # Test that results are deterministic for the same seed.
    inputs = np.random.uniform(size=[val_num])
    fixed_seed = np.random.randint(1e6)
    tvm_out_1 = _get_tvm_output(inputs, seed=fixed_seed)
    tvm_out_2 = _get_tvm_output(inputs, seed=fixed_seed)
    tvm.testing.assert_allclose(tvm_out_1, tvm_out_2)
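
    # Note: this case can be run on its own with a standard pytest invocation,
    # e.g. pytest tests/python/frontend/onnx/test_forward.py -k test_random_bernoulli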


@tvm.testing.parametrize_targets("llvm")
def test_random_uniform(target, dev):
"""test_random_uniform"""