diff --git a/python/tvm/relay/frontend/onnx.py b/python/tvm/relay/frontend/onnx.py index ffd31317e9f5..98c9a34b4089 100644 --- a/python/tvm/relay/frontend/onnx.py +++ b/python/tvm/relay/frontend/onnx.py @@ -5582,6 +5582,36 @@ def _impl_v16(cls, inputs, attr, params): ) +class Bernoulli(OnnxOpConverter): + """Operator converter for Bernoulli""" + + @classmethod + def _impl_v15(cls, inputs, attr, params): + in_dtype = infer_type(inputs[0]).checked_type.dtype + assert in_dtype in [ + "float32", + "float64", + ], "Only float input tensor is currently supported." + # The data type for the elements of the output tensor. + # if not specified, we will use the data type of the input tensor + out_dtype = attr.get("dtype", None) + if out_dtype is None: + out_dtype = in_dtype + else: + out_dtype = get_type(out_dtype) + + seed = attr.get("seed", None) + if seed is None: + seed = np.random.randint(1e6) + else: + seed = int(seed) + + key = _random.threefry_key(seed) + inter_outputs = _op.random.uniform(key, infer_shape(inputs[0]), in_dtype) + _, uniform_nums = _expr.TupleWrapper(inter_outputs, 2) + return _op.cast(_op.less(uniform_nums, inputs[0]), out_dtype) + + class RandomNormal(OnnxOpConverter): """Operator converter for random_normal""" @@ -6348,6 +6378,7 @@ def _get_convert_map(opset): "QLinearGlobalAveragePool": QLinearGlobalAveragePool.get_converter(opset), "QLinearLeakyRelu": QLinearLeakyRelu.get_converter(opset), # Random number generation. + "Bernoulli": Bernoulli.get_converter(opset), "RandomNormal": RandomNormal.get_converter(opset), "RandomNormalLike": RandomNormalLike.get_converter(opset), "RandomUniform": RandomUniform.get_converter(opset), diff --git a/tests/python/frontend/onnx/test_forward.py b/tests/python/frontend/onnx/test_forward.py index f5b5f7c65cb5..559587e2a597 100644 --- a/tests/python/frontend/onnx/test_forward.py +++ b/tests/python/frontend/onnx/test_forward.py @@ -6707,6 +6707,165 @@ def verify_qlinearsigmoid(a_shape): verify_qlinearsigmoid([]) +@tvm.testing.parametrize_targets("llvm") +def test_random_bernoulli(target, dev): + """test_random_bernoulli""" + + def _get_tvm_output( + inputs, + out_dtype="int32", + seed=None, + target=target, + dev=dev, + use_vm=False, + freeze_params=False, + ): + def get_bernoulli_model(shape, in_dtype="float32", out_dtype="int32", seed=None): + onnx_itype = mapping.NP_TYPE_TO_TENSOR_TYPE[np.dtype(in_dtype)] + onnx_otype = mapping.NP_TYPE_TO_TENSOR_TYPE[np.dtype(out_dtype)] + node = helper.make_node( + "Bernoulli", + ["input"], + ["output"], + ) + dtype_attr = helper.make_attribute("dtype", onnx_otype) + node.attribute.append(dtype_attr) + if seed is not None: + seed_attr = helper.make_attribute("seed", float(seed)) + node.attribute.append(seed_attr) + + graph = helper.make_graph( + [node], + "random_bernoulli_test", + inputs=[helper.make_tensor_value_info("input", onnx_itype, list(shape))], + outputs=[helper.make_tensor_value_info("output", onnx_otype, list(shape))], + ) + return helper.make_model(graph, producer_name="random_bernoulli_test") + + shape = inputs.shape + in_dtype = inputs.dtype + model = get_bernoulli_model(shape, in_dtype, out_dtype, seed) + + if use_vm: + return get_tvm_output_with_vm( + model, + inputs, + target, + dev, + freeze_params=freeze_params, + ) + else: + return get_tvm_output( + model, + inputs, + target, + dev, + ) + + def binom_test(input, ideal_mean, threshold=0.05): + # This test is strictly appropriate when input probabilities are all identical. + # In that case, it should lead to flaky failures in only one run in a million (p>=1e-6). + # The test should be over-conservative when input probabilities are not identical. + # (i.e., It should have a rate of flaky failures lower than one run in a million.) + # If this test starts repeatedly throwing flaky failures, consult a statistician + # in addition to your regular debugging. + bnm_test_res = scipy.stats.binomtest( + k=np.sum(input, dtype="int32"), n=len(input), p=ideal_mean + ) + return bnm_test_res.pvalue > threshold + + def verify_bernoulli( + inputs=None, + shape=[], + in_dtype="float32", + out_dtype="int32", + seed=None, + target=target, + dev=dev, + use_vm=False, + freeze_params=False, + in_out_equal=False, + ): + if inputs is None: + assert len(shape) != 0 + inputs = np.random.uniform(size=shape).astype(in_dtype) + + tvm_out = _get_tvm_output( + inputs, + out_dtype, + seed, + target, + dev, + use_vm, + freeze_params, + ) + + if isinstance(tvm_out, list): + tvm_out = tvm_out[0] + # check that values are 0 or 1 + tvm_flat = tvm_out.flatten() + assert np.array_equal(tvm_flat, tvm_flat.astype("bool")) + if in_out_equal: + tvm.testing.assert_allclose(inputs, tvm_out) + else: + # check that mean value is close to the theoretical one by binomial test + ideal_mean = np.mean(inputs) + repeats = 3 + check = False + for i in range(repeats): + if binom_test(tvm_flat, ideal_mean): + check = True + break + else: + # repeat with new seed + seed = np.random.randint(1e6) + tvm_flat = _get_tvm_output( + inputs, + out_dtype, + seed, + target, + dev, + use_vm, + freeze_params, + ).flatten() + assert check, "Binomial test failed" + + # Test input sequence of 0 and 1 + inputs = np.random.randint(2, size=[10000]).astype("float32") + verify_bernoulli(inputs, in_out_equal=True) + + # Binomial test input with 0.5 values + val_num = 10000 + inputs = np.ones([val_num], dtype="float32") * 0.5 + verify_bernoulli(inputs) + + # Binomial test input with 0.1 values + inputs = np.ones([val_num], dtype="float32") * 0.1 + verify_bernoulli(inputs) + + # Simple test + verify_bernoulli(shape=[val_num]) + + # Floating output type + verify_bernoulli(shape=[val_num], out_dtype="float32") + + # Double input type + verify_bernoulli(shape=[val_num], in_dtype="float64") + + # Test N-D tensor generation + verify_bernoulli(shape=[2, 4, 100, 100]) + + # Test with seed + verify_bernoulli(shape=[val_num], seed=np.random.randint(1e6)) + + # Test result determinism with the same seeds + inputs = np.random.uniform(size=[val_num]) + fixed_seed = np.random.randint(1e6) + tvm_out_1 = _get_tvm_output(inputs, seed=fixed_seed) + tvm_out_2 = _get_tvm_output(inputs, seed=fixed_seed) + tvm.testing.assert_allclose(tvm_out_1, tvm_out_2) + + @tvm.testing.parametrize_targets("llvm") def test_random_uniform(target, dev): """test_random_uniform"""