From 6f4c9919019926f0f042efa91b9b112d11ebaa61 Mon Sep 17 00:00:00 2001 From: David Li Date: Thu, 4 Nov 2021 16:48:14 +0100 Subject: [PATCH] ARROW-13130: [C++] Add decimal support to arithmetic kernels This adds decimal support for the following kernels (and _checked variants where applicable): abs, acos, add/sub/mul/div, asin, atan, atan2, ceil, cos, floor, hash_stddev, hash_tdigest, hash_variance, is_finite, is_inf, is_nan, ln, log1p, log2, logb, mode, negate, power, quantile, round, round_to_multiple, sign, sin, stddev/variance, tan, tdigest, trunc Most kernels cast decimals to double and proceed. Some, including rounding, directly operate on decimals. Aggregate kernels directly operate on decimals (and cast to double inline) since DispatchBest is not usable for the aggregate nodes (at least, unless we also reimplement implicit casting there). Additionally, ValidateFull for scalars/arrays now checks FitsInPrecision. A number of tests were adjusted to account for this. Closes #11313 from lidavidm/arrow-13130 Authored-by: David Li Signed-off-by: Antoine Pitrou --- .../arrow/compute/kernels/aggregate_basic.cc | 5 +- .../compute/kernels/aggregate_internal.h | 21 ++ .../arrow/compute/kernels/aggregate_mode.cc | 95 +++-- .../compute/kernels/aggregate_quantile.cc | 50 ++- .../compute/kernels/aggregate_tdigest.cc | 36 +- .../arrow/compute/kernels/aggregate_test.cc | 193 +++++++++-- .../compute/kernels/aggregate_var_std.cc | 53 ++- .../arrow/compute/kernels/hash_aggregate.cc | 303 ++++++++-------- .../compute/kernels/hash_aggregate_test.cc | 91 +++++ .../compute/kernels/scalar_arithmetic.cc | 124 +++++-- .../compute/kernels/scalar_arithmetic_test.cc | 327 +++++++++++++++++- .../arrow/compute/kernels/scalar_validity.cc | 39 +++ .../compute/kernels/scalar_validity_test.cc | 39 +++ cpp/src/arrow/util/basic_decimal.cc | 12 + cpp/src/arrow/util/basic_decimal.h | 6 + docs/source/cpp/compute.rst | 162 +++++---- 16 files changed, 1197 insertions(+), 359 deletions(-) diff --git a/cpp/src/arrow/compute/kernels/aggregate_basic.cc b/cpp/src/arrow/compute/kernels/aggregate_basic.cc index 0bc839be8e3cc..53fa2d2d7f702 100644 --- a/cpp/src/arrow/compute/kernels/aggregate_basic.cc +++ b/cpp/src/arrow/compute/kernels/aggregate_basic.cc @@ -422,9 +422,8 @@ void AddMinOrMaxAggKernel(ScalarAggregateFunction* func, auto init = [min_max_func]( KernelContext* ctx, const KernelInitArgs& args) -> Result> { - std::vector inputs = args.inputs; - ARROW_ASSIGN_OR_RAISE(auto kernel, min_max_func->DispatchBest(&inputs)); - KernelInitArgs new_args{kernel, inputs, args.options}; + ARROW_ASSIGN_OR_RAISE(auto kernel, min_max_func->DispatchExact(args.inputs)); + KernelInitArgs new_args{kernel, args.inputs, args.options}; return kernel->init(ctx, new_args); }; diff --git a/cpp/src/arrow/compute/kernels/aggregate_internal.h b/cpp/src/arrow/compute/kernels/aggregate_internal.h index 22a54558f4e8a..946ec01900c5b 100644 --- a/cpp/src/arrow/compute/kernels/aggregate_internal.h +++ b/cpp/src/arrow/compute/kernels/aggregate_internal.h @@ -21,6 +21,7 @@ #include "arrow/type.h" #include "arrow/type_traits.h" #include "arrow/util/bit_run_reader.h" +#include "arrow/util/int128_internal.h" #include "arrow/util/logging.h" namespace arrow { @@ -111,6 +112,26 @@ void AddAggKernel(std::shared_ptr sig, KernelInit init, ScalarAggregateFinalize finalize, ScalarAggregateFunction* func, SimdLevel::type simd_level = SimdLevel::NONE); +using arrow::internal::VisitSetBitRunsVoid; + +template +struct GetSumType; + +template +struct GetSumType> { + using SumType = double; +}; + +template +struct GetSumType> { + using SumType = arrow::internal::int128_t; +}; + +template +struct GetSumType> { + using SumType = typename TypeTraits::CType; +}; + // SumArray must be parameterized with the SIMD level since it's called both from // translation units with and without vectorization. Normally it gets inlined but // if not, without the parameter, we'll have multiple definitions of the same diff --git a/cpp/src/arrow/compute/kernels/aggregate_mode.cc b/cpp/src/arrow/compute/kernels/aggregate_mode.cc index f225f6bf569c3..6a50556a13efd 100644 --- a/cpp/src/arrow/compute/kernels/aggregate_mode.cc +++ b/cpp/src/arrow/compute/kernels/aggregate_mode.cc @@ -40,10 +40,13 @@ constexpr char kCountFieldName[] = "count"; constexpr uint64_t kCountEOF = ~0ULL; -template +template ::CType> Result> PrepareOutput(int64_t n, KernelContext* ctx, Datum* out) { - const auto& mode_type = TypeTraits::type_singleton(); + DCHECK_EQ(Type::STRUCT, out->type()->id()); + const auto& out_type = checked_cast(*out->type()); + DCHECK_EQ(2, out_type.num_fields()); + const auto& mode_type = out_type.field(0)->type(); const auto& count_type = int64(); auto mode_data = ArrayData::Make(mode_type, /*length=*/n, /*null_count=*/0); @@ -61,10 +64,7 @@ Result> PrepareOutput(int64_t n, KernelContext* ctx, count_buffer = count_data->template GetMutableValues(1); } - const auto& out_type = - struct_({field(kModeFieldName, mode_type), field(kCountFieldName, count_type)}); - *out = Datum(ArrayData::Make(out_type, n, {nullptr}, {mode_data, count_data}, 0)); - + *out = Datum(ArrayData::Make(out->type(), n, {nullptr}, {mode_data, count_data}, 0)); return std::make_pair(mode_buffer, count_buffer); } @@ -72,7 +72,7 @@ Result> PrepareOutput(int64_t n, KernelContext* ctx, // suboptimal for tiny or large n, possibly okay as we're not in hot path template Status Finalize(KernelContext* ctx, Datum* out, Generator&& gen) { - using CType = typename InType::c_type; + using CType = typename TypeTraits::CType; using ValueCountPair = std::pair; auto gt = [](const ValueCountPair& lhs, const ValueCountPair& rhs) { @@ -203,13 +203,25 @@ struct CountModer { } }; -// copy and sort approach for floating points or integers with wide value range +// copy and sort approach for floating points, decimals, or integers with wide +// value range // O(n) space, O(nlogn) time template struct SortModer { - using CType = typename T::c_type; + using CType = typename TypeTraits::CType; using Allocator = arrow::stl::allocator; + template + static enable_if_floating_point GetNan() { + return static_cast(NAN); + } + + template + static enable_if_t::value, CType> GetNan() { + DCHECK(false); + return static_cast(0); + } + Status Exec(KernelContext* ctx, const ExecBatch& batch, Datum* out) { const Datum& datum = batch[0]; const int64_t in_length = datum.length() - datum.null_count(); @@ -246,7 +258,7 @@ struct SortModer { if (ARROW_PREDICT_FALSE(it == in_buffer.cend())) { // handle NAN at last if (nan_count > 0) { - auto value_count = std::make_pair(static_cast(NAN), nan_count); + auto value_count = std::make_pair(GetNan(), nan_count); nan_count = 0; return value_count; } @@ -318,13 +330,18 @@ struct Moder::value && }; template -struct Moder::value>> { +struct Moder> { + SortModer impl; +}; + +template +struct Moder> { SortModer impl; }; template Status ScalarMode(KernelContext* ctx, const Scalar& scalar, Datum* out) { - using CType = typename T::c_type; + using CType = typename TypeTraits::CType; const ModeOptions& options = ModeState::Get(ctx); if ((!options.skip_nulls && !scalar.is_valid) || @@ -366,30 +383,33 @@ struct ModeExecutor { } }; -VectorKernel NewModeKernel(const std::shared_ptr& in_type) { +Result ModeType(KernelContext*, const std::vector& descrs) { + return ValueDescr::Array( + struct_({field(kModeFieldName, descrs[0].type), field(kCountFieldName, int64())})); +} + +VectorKernel NewModeKernel(const std::shared_ptr& in_type, + ArrayKernelExec exec) { VectorKernel kernel; kernel.init = ModeState::Init; kernel.can_execute_chunkwise = false; kernel.output_chunked = false; - auto out_type = - struct_({field(kModeFieldName, in_type), field(kCountFieldName, int64())}); - kernel.signature = - KernelSignature::Make({InputType(in_type)}, ValueDescr::Array(out_type)); - return kernel; -} - -void AddBooleanModeKernel(VectorFunction* func) { - VectorKernel kernel = NewModeKernel(boolean()); - kernel.exec = ModeExecutor::Exec; - DCHECK_OK(func->AddKernel(kernel)); -} - -void AddNumericModeKernels(VectorFunction* func) { - for (const auto& type : NumericTypes()) { - VectorKernel kernel = NewModeKernel(type); - kernel.exec = GenerateNumeric(*type); - DCHECK_OK(func->AddKernel(kernel)); + switch (in_type->id()) { + case Type::DECIMAL128: + case Type::DECIMAL256: + kernel.signature = + KernelSignature::Make({InputType(in_type->id())}, OutputType(ModeType)); + break; + default: { + auto out_type = + struct_({field(kModeFieldName, in_type), field(kCountFieldName, int64())}); + kernel.signature = KernelSignature::Make({InputType(in_type->id())}, + ValueDescr::Array(std::move(out_type))); + break; + } } + kernel.exec = std::move(exec); + return kernel; } const FunctionDoc mode_doc{ @@ -409,8 +429,17 @@ void RegisterScalarAggregateMode(FunctionRegistry* registry) { static auto default_options = ModeOptions::Defaults(); auto func = std::make_shared("mode", Arity::Unary(), &mode_doc, &default_options); - AddBooleanModeKernel(func.get()); - AddNumericModeKernels(func.get()); + DCHECK_OK(func->AddKernel( + NewModeKernel(boolean(), ModeExecutor::Exec))); + for (const auto& type : NumericTypes()) { + DCHECK_OK(func->AddKernel( + NewModeKernel(type, GenerateNumeric(*type)))); + } + // Type parameters are ignored + DCHECK_OK(func->AddKernel( + NewModeKernel(decimal128(1, 0), ModeExecutor::Exec))); + DCHECK_OK(func->AddKernel( + NewModeKernel(decimal256(1, 0), ModeExecutor::Exec))); DCHECK_OK(registry->AddFunction(std::move(func))); } diff --git a/cpp/src/arrow/compute/kernels/aggregate_quantile.cc b/cpp/src/arrow/compute/kernels/aggregate_quantile.cc index 62e375e695087..cd2410cc9eb75 100644 --- a/cpp/src/arrow/compute/kernels/aggregate_quantile.cc +++ b/cpp/src/arrow/compute/kernels/aggregate_quantile.cc @@ -71,10 +71,21 @@ uint64_t QuantileToDataPoint(size_t length, double q, return datapoint_index; } +template +double DataPointToDouble(T value, const DataType&) { + return static_cast(value); +} +double DataPointToDouble(const Decimal128& value, const DataType& ty) { + return value.ToDouble(checked_cast(ty).scale()); +} +double DataPointToDouble(const Decimal256& value, const DataType& ty) { + return value.ToDouble(checked_cast(ty).scale()); +} + // copy and nth_element approach, large memory footprint template struct SortQuantiler { - using CType = typename InType::c_type; + using CType = typename TypeTraits::CType; using Allocator = arrow::stl::allocator; Status Exec(KernelContext* ctx, const ExecBatch& batch, Datum* out) { @@ -106,8 +117,7 @@ struct SortQuantiler { // prepare out array // out type depends on options const bool is_datapoint = IsDataPoint(options); - const std::shared_ptr out_type = - is_datapoint ? TypeTraits::type_singleton() : float64(); + const std::shared_ptr out_type = is_datapoint ? datum.type() : float64(); int64_t out_length = options.q.size(); if (in_buffer.empty()) { return MakeArrayOfNull(out_type, out_length, ctx->memory_pool()).Value(out); @@ -142,8 +152,9 @@ struct SortQuantiler { double* out_buffer = out_data->template GetMutableValues(1); for (int64_t i = 0; i < out_length; ++i) { const int64_t q_index = q_indices[i]; - out_buffer[q_index] = GetQuantileByInterp( - in_buffer, &last_index, options.q[q_index], options.interpolation); + out_buffer[q_index] = + GetQuantileByInterp(in_buffer, &last_index, options.q[q_index], + options.interpolation, *datum.type()); } } } @@ -170,8 +181,8 @@ struct SortQuantiler { // return quantile interpolated from adjacent input data points double GetQuantileByInterp(std::vector& in, uint64_t* last_index, - double q, - enum QuantileOptions::Interpolation interpolation) { + double q, enum QuantileOptions::Interpolation interpolation, + const DataType& in_type) { const double index = (in.size() - 1) * q; const uint64_t lower_index = static_cast(index); const double fraction = index - lower_index; @@ -181,7 +192,7 @@ struct SortQuantiler { std::nth_element(in.begin(), in.begin() + lower_index, in.begin() + *last_index); } - const double lower_value = static_cast(in[lower_index]); + const double lower_value = DataPointToDouble(in[lower_index], in_type); if (fraction == 0) { *last_index = lower_index; return lower_value; @@ -197,7 +208,7 @@ struct SortQuantiler { } *last_index = lower_index; - const double higher_value = static_cast(in[higher_index]); + const double higher_value = DataPointToDouble(in[higher_index], in_type); if (interpolation == QuantileOptions::LINEAR) { // more stable than naive linear interpolation @@ -399,10 +410,15 @@ struct ExactQuantiler::value>> { SortQuantiler impl; }; +template +struct ExactQuantiler::value>> { + SortQuantiler impl; +}; + template Status ScalarQuantile(KernelContext* ctx, const QuantileOptions& options, const Scalar& scalar, Datum* out) { - using CType = typename T::c_type; + using CType = typename TypeTraits::CType; ArrayData* output = out->mutable_array(); output->length = options.q.size(); auto out_type = IsDataPoint(options) ? scalar.type : float64(); @@ -433,7 +449,7 @@ Status ScalarQuantile(KernelContext* ctx, const QuantileOptions& options, } else { double* out_buffer = output->template GetMutableValues(1); for (int64_t i = 0; i < output->length; i++) { - out_buffer[i] = static_cast(UnboxScalar::Unbox(scalar)); + out_buffer[i] = DataPointToDouble(UnboxScalar::Unbox(scalar), *scalar.type); } } return Status::OK(); @@ -486,6 +502,18 @@ void AddQuantileKernels(VectorFunction* func) { base.exec = GenerateNumeric(*ty); DCHECK_OK(func->AddKernel(base)); } + { + base.signature = + KernelSignature::Make({InputType(Type::DECIMAL128)}, OutputType(ResolveOutput)); + base.exec = QuantileExecutor::Exec; + DCHECK_OK(func->AddKernel(base)); + } + { + base.signature = + KernelSignature::Make({InputType(Type::DECIMAL256)}, OutputType(ResolveOutput)); + base.exec = QuantileExecutor::Exec; + DCHECK_OK(func->AddKernel(base)); + } } const FunctionDoc quantile_doc{ diff --git a/cpp/src/arrow/compute/kernels/aggregate_tdigest.cc b/cpp/src/arrow/compute/kernels/aggregate_tdigest.cc index 0fddf38f575c9..7c86267d94006 100644 --- a/cpp/src/arrow/compute/kernels/aggregate_tdigest.cc +++ b/cpp/src/arrow/compute/kernels/aggregate_tdigest.cc @@ -34,13 +34,25 @@ template struct TDigestImpl : public ScalarAggregator { using ThisType = TDigestImpl; using ArrayType = typename TypeTraits::ArrayType; - using CType = typename ArrowType::c_type; + using CType = typename TypeTraits::CType; - explicit TDigestImpl(const TDigestOptions& options) + TDigestImpl(const TDigestOptions& options, const DataType& in_type) : options{options}, tdigest{options.delta, options.buffer_size}, count{0}, - all_valid{true} {} + decimal_scale{0}, + all_valid{true} { + if (is_decimal_type::value) { + decimal_scale = checked_cast(in_type).scale(); + } + } + + template + double ToDouble(T value) const { + return static_cast(value); + } + double ToDouble(const Decimal128& value) const { return value.ToDouble(decimal_scale); } + double ToDouble(const Decimal256& value) const { return value.ToDouble(decimal_scale); } Status Consume(KernelContext*, const ExecBatch& batch) override { if (!this->all_valid) return Status::OK(); @@ -57,7 +69,7 @@ struct TDigestImpl : public ScalarAggregator { VisitSetBitRunsVoid(data.buffers[0], data.offset, data.length, [&](int64_t pos, int64_t len) { for (int64_t i = 0; i < len; ++i) { - this->tdigest.NanAdd(values[pos + i]); + this->tdigest.NanAdd(ToDouble(values[pos + i])); } }); } @@ -66,7 +78,7 @@ struct TDigestImpl : public ScalarAggregator { if (batch[0].scalar()->is_valid) { this->count += 1; for (int64_t i = 0; i < batch.length; i++) { - this->tdigest.NanAdd(value); + this->tdigest.NanAdd(ToDouble(value)); } } } @@ -110,6 +122,7 @@ struct TDigestImpl : public ScalarAggregator { const TDigestOptions options; TDigest tdigest; int64_t count; + int32_t decimal_scale; bool all_valid; }; @@ -132,8 +145,14 @@ struct TDigestInitState { } template - enable_if_t::value, Status> Visit(const Type&) { - state.reset(new TDigestImpl(options)); + enable_if_number Visit(const Type&) { + state.reset(new TDigestImpl(options, in_type)); + return Status::OK(); + } + + template + enable_if_decimal Visit(const Type&) { + state.reset(new TDigestImpl(options, in_type)); return Status::OK(); } @@ -154,7 +173,7 @@ void AddTDigestKernels(KernelInit init, const std::vector>& types, ScalarAggregateFunction* func) { for (const auto& ty : types) { - auto sig = KernelSignature::Make({InputType(ty)}, float64()); + auto sig = KernelSignature::Make({InputType(ty->id())}, float64()); AddAggKernel(std::move(sig), init, func); } } @@ -179,6 +198,7 @@ std::shared_ptr AddTDigestAggKernels() { auto func = std::make_shared( "tdigest", Arity::Unary(), &tdigest_doc, &default_tdigest_options); AddTDigestKernels(TDigestInit, NumericTypes(), func.get()); + AddTDigestKernels(TDigestInit, {decimal128(1, 1), decimal256(1, 1)}, func.get()); return func; } diff --git a/cpp/src/arrow/compute/kernels/aggregate_test.cc b/cpp/src/arrow/compute/kernels/aggregate_test.cc index c5355a8f4521f..c8b13862ae361 100644 --- a/cpp/src/arrow/compute/kernels/aggregate_test.cc +++ b/cpp/src/arrow/compute/kernels/aggregate_test.cc @@ -2334,6 +2334,26 @@ TEST(TestIndexKernel, Errors) { // Mode // +template +void CheckModes(const Datum& array, const ModeOptions options, + const std::vector& expected_modes, + const std::vector& expected_counts) { + ASSERT_OK_AND_ASSIGN(Datum out, Mode(array, options)); + ValidateOutput(out); + const StructArray out_array(out.array()); + ASSERT_EQ(out_array.length(), expected_modes.size()); + ASSERT_EQ(out_array.num_fields(), 2); + + const CType* out_modes = out_array.field(0)->data()->GetValues(1); + const int64_t* out_counts = out_array.field(1)->data()->GetValues(1); + for (int i = 0; i < out_array.length(); ++i) { + // equal or nan equal + ASSERT_TRUE((expected_modes[i] == out_modes[i]) || + (expected_modes[i] != expected_modes[i] && out_modes[i] != out_modes[i])); + ASSERT_EQ(expected_counts[i], out_counts[i]); + } +} + template class TestPrimitiveModeKernel : public ::testing::Test { public: @@ -2344,21 +2364,7 @@ class TestPrimitiveModeKernel : public ::testing::Test { void AssertModesAre(const Datum& array, const ModeOptions options, const std::vector& expected_modes, const std::vector& expected_counts) { - ASSERT_OK_AND_ASSIGN(Datum out, Mode(array, options)); - ValidateOutput(out); - const StructArray out_array(out.array()); - ASSERT_EQ(out_array.length(), expected_modes.size()); - ASSERT_EQ(out_array.num_fields(), 2); - - const CType* out_modes = out_array.field(0)->data()->GetValues(1); - const int64_t* out_counts = out_array.field(1)->data()->GetValues(1); - for (int i = 0; i < out_array.length(); ++i) { - // equal or nan equal - ASSERT_TRUE( - (expected_modes[i] == out_modes[i]) || - (expected_modes[i] != expected_modes[i] && out_modes[i] != out_modes[i])); - ASSERT_EQ(expected_counts[i], out_counts[i]); - } + CheckModes(array, options, expected_modes, expected_counts); } void AssertModesAre(const std::string& json, const int n, @@ -2587,6 +2593,89 @@ TYPED_TEST(TestFloatingModeKernel, Floats) { this->AssertModesEmpty(ScalarFromJSON(in_ty, "null"), ModeOptions(/*n=*/1)); } +template +class TestDecimalModeKernel : public ::testing::Test { + public: + using CType = typename TypeTraits::CType; + + void AssertModesAre(const Datum& array, const ModeOptions options, + const std::vector& expected_modes, + const std::vector& expected_counts) { + CheckModes(array, options, values(expected_modes), expected_counts); + } + + CType value(const std::string& s) const { + EXPECT_OK_AND_ASSIGN(auto out, CType::FromString(s)); + return out; + } + + std::vector values(const std::vector& strings) const { + std::vector values; + for (const auto& s : strings) { + values.push_back(value(s)); + } + return values; + } + + std::shared_ptr type_instance() { return std::make_shared(4, 2); } +}; + +TYPED_TEST_SUITE(TestDecimalModeKernel, DecimalArrowTypes); + +TYPED_TEST(TestDecimalModeKernel, Decimals) { + auto ty = this->type_instance(); + this->AssertModesAre(ArrayFromJSON(ty, R"(["5.01", "-1.42", "-1.42", "5.01", "5.01"])"), + ModeOptions(1), {"5.01"}, {3}); + this->AssertModesAre( + ArrayFromJSON(ty, R"(["5.01", "-1.42", "-1.42", "5.01", "5.01", "-1.42"])"), + ModeOptions(1), {"-1.42"}, {3}); + this->AssertModesAre( + ArrayFromJSON(ty, R"(["5.01", "-1.42", "-1.42", "5.01", "5.01", "-1.42"])"), + ModeOptions(2), {"-1.42", "5.01"}, {3, 3}); + + this->AssertModesAre(ArrayFromJSON(ty, "[]"), ModeOptions(1), {}, {}); + + this->AssertModesAre(ArrayFromJSON(ty, R"(["1.86", "-2.00", "-2.00", null])"), + ModeOptions(/*n=*/1), {"-2.00"}, {2}); + this->AssertModesAre(ArrayFromJSON(ty, R"(["1.86", "-2.00", "-2.00", null])"), + ModeOptions(/*n=*/1, /*skip_nulls=*/false), {}, {}); + this->AssertModesAre(ArrayFromJSON(ty, R"(["1.86", "-2.00", "-2.00", null])"), + ModeOptions(/*n=*/1, /*skip_nulls=*/true, /*min_count=*/3), + {"-2.00"}, {2}); + this->AssertModesAre(ArrayFromJSON(ty, R"(["-2.00", "-2.00", null])"), + ModeOptions(/*n=*/1, /*skip_nulls=*/true, /*min_count=*/3), {}, + {}); + this->AssertModesAre(ArrayFromJSON(ty, R"(["1.86", "-2.00", "-2.00"])"), + ModeOptions(/*n=*/1, /*skip_nulls=*/false, /*min_count=*/3), + {"-2.00"}, {2}); + this->AssertModesAre(ArrayFromJSON(ty, R"(["1.86", "-2.00", "-2.00", null])"), + ModeOptions(/*n=*/1, /*skip_nulls=*/false, /*min_count=*/3), {}, + {}); + this->AssertModesAre(ArrayFromJSON(ty, R"(["1.86", "-2.00"])"), + ModeOptions(/*n=*/1, /*skip_nulls=*/false, /*min_count=*/3), {}, + {}); + + this->AssertModesAre(ScalarFromJSON(ty, R"("0.00")"), + ModeOptions(/*n=*/1, /*skip_nulls=*/false), {"0.00"}, {1}); + this->AssertModesAre(ScalarFromJSON(ty, "null"), + ModeOptions(/*n=*/1, /*skip_nulls=*/false), {}, {}); + this->AssertModesAre(ScalarFromJSON(ty, R"("0.00")"), + ModeOptions(/*n=*/1, /*skip_nulls=*/true, /*min_count=*/2), {}, + {}); + this->AssertModesAre(ScalarFromJSON(ty, "null"), + ModeOptions(/*n=*/1, /*skip_nulls=*/true, /*min_count=*/2), {}, + {}); + this->AssertModesAre(ScalarFromJSON(ty, R"("0.00")"), + ModeOptions(/*n=*/1, /*skip_nulls=*/false, /*min_count=*/2), {}, + {}); + this->AssertModesAre(ScalarFromJSON(ty, "null"), + ModeOptions(/*n=*/1, /*skip_nulls=*/false, /*min_count=*/2), {}, + {}); + this->AssertModesAre(ScalarFromJSON(ty, R"("5.00")"), ModeOptions(/*n=*/1), {"5.00"}, + {1}); + this->AssertModesAre(ScalarFromJSON(ty, "null"), ModeOptions(/*n=*/1), {}, {}); +} + TEST_F(TestInt8ModeKernelValueRange, Basics) { this->AssertModeIs("[0, 127, -128, -128]", -128, 2); this->AssertModeIs("[127, 127, 127]", 127, 3); @@ -2689,6 +2778,24 @@ TEST_F(TestInt32ModeKernel, Sliced) { // Variance/Stddev // +void CheckVarStd(const Datum& array, const VarianceOptions& options, + double expected_var) { + ASSERT_OK_AND_ASSIGN(Datum out_var, Variance(array, options)); + ASSERT_OK_AND_ASSIGN(Datum out_std, Stddev(array, options)); + auto var = checked_cast(out_var.scalar().get()); + auto std = checked_cast(out_std.scalar().get()); + ASSERT_TRUE(var->is_valid && std->is_valid); + // Near zero these macros don't work as well + // (and MinGW can give results slightly off from zero) + if (std::abs(expected_var) < 1e-20) { + ASSERT_NEAR(std->value * std->value, var->value, 1e-20); + ASSERT_NEAR(var->value, expected_var, 1e-20); + } else { + ASSERT_DOUBLE_EQ(std->value * std->value, var->value); + ASSERT_DOUBLE_EQ(var->value, expected_var); // < 4ULP + } +} + template class TestPrimitiveVarStdKernel : public ::testing::Test { public: @@ -2697,12 +2804,12 @@ class TestPrimitiveVarStdKernel : public ::testing::Test { void AssertVarStdIs(const Array& array, const VarianceOptions& options, double expected_var) { - AssertVarStdIsInternal(array, options, expected_var); + CheckVarStd(array, options, expected_var); } void AssertVarStdIs(const std::shared_ptr& array, const VarianceOptions& options, double expected_var) { - AssertVarStdIsInternal(array, options, expected_var); + CheckVarStd(array, options, expected_var); } void AssertVarStdIs(const std::string& json, const VarianceOptions& options, @@ -2740,17 +2847,6 @@ class TestPrimitiveVarStdKernel : public ::testing::Test { std::shared_ptr type_singleton() { return Traits::type_singleton(); } private: - void AssertVarStdIsInternal(const Datum& array, const VarianceOptions& options, - double expected_var) { - ASSERT_OK_AND_ASSIGN(Datum out_var, Variance(array, options)); - ASSERT_OK_AND_ASSIGN(Datum out_std, Stddev(array, options)); - auto var = checked_cast(out_var.scalar().get()); - auto std = checked_cast(out_std.scalar().get()); - ASSERT_TRUE(var->is_valid && std->is_valid); - ASSERT_DOUBLE_EQ(std->value * std->value, var->value); - ASSERT_DOUBLE_EQ(var->value, expected_var); // < 4ULP - } - void AssertVarStdIsInvalidInternal(const Datum& array, const VarianceOptions& options) { ASSERT_OK_AND_ASSIGN(Datum out_var, Variance(array, options)); ASSERT_OK_AND_ASSIGN(Datum out_std, Stddev(array, options)); @@ -3000,6 +3096,18 @@ TEST_F(TestVarStdKernelIntegerLength, Basics) { } #endif +TEST(TestVarStdKernel, Decimal) { + // Effectively treated as double, sanity check results here + for (const auto& ty : {decimal128(3, 2), decimal256(3, 2)}) { + CheckVarStd(ArrayFromJSON(ty, R"(["1.00"])"), VarianceOptions(), 0); + CheckVarStd(ArrayFromJSON(ty, R"([null, "1.00", "2.00", "3.00"])"), VarianceOptions(), + 0.6666666666666666); + CheckVarStd(ScalarFromJSON(ty, R"("1.00")"), VarianceOptions(), 0); + CheckVarStd(ArrayFromJSON(ty, R"([null, "1.00", "2.00"])"), + VarianceOptions(/*ddof=*/1), 0.5); + } +} + // // Quantile // @@ -3541,6 +3649,24 @@ TEST(TestQuantileKernel, AllNullsOrNaNs) { } } +TEST(TestQuantileKernel, Decimal) { + auto check = [](const std::shared_ptr& input, QuantileOptions options, + const std::shared_ptr& expected) { + ASSERT_OK_AND_ASSIGN(Datum out, Quantile(input, options)); + auto out_array = out.make_array(); + ValidateOutput(*out_array); + AssertArraysEqual(*expected, *out_array, /*verbose=*/true); + }; + for (const auto& ty : {decimal128(3, 2), decimal256(3, 2)}) { + check(ArrayFromJSON(ty, R"(["1.00", "5.00", null])"), + QuantileOptions(0.5, QuantileOptions::LINEAR), + ArrayFromJSON(float64(), R"([3.00])")); + check(ArrayFromJSON(ty, R"(["1.00", "2.00", "5.00"])"), + QuantileOptions(0.5, QuantileOptions::NEAREST), + ArrayFromJSON(ty, R"(["2.00"])")); + } +} + TEST(TestQuantileKernel, Scalar) { for (const auto& ty : {float64(), int64(), uint64()}) { QuantileOptions options(std::vector{0.0, 0.5, 1.0}); @@ -3608,6 +3734,17 @@ TEST(TestTDigestKernel, AllNullsOrNaNs) { } } +TEST(TestTDigestKernel, Decimal) { + for (const auto& ty : {decimal128(3, 2), decimal256(3, 2)}) { + ASSERT_OK_AND_ASSIGN(auto decimal_array, + TDigest(ArrayFromJSON(ty, R"(["1.00", "2.00", "3.25"])"))); + ASSERT_OK_AND_ASSIGN(auto float_array, + TDigest(ArrayFromJSON(float64(), "[1, 2, 3.25]"))); + AssertArraysApproxEqual(*float_array.make_array(), *decimal_array.make_array(), + /*verbose=*/true); + } +} + TEST(TestTDigestKernel, Scalar) { for (const auto& ty : {float64(), int64(), uint64()}) { TDigestOptions options(std::vector{0.0, 0.5, 1.0}); diff --git a/cpp/src/arrow/compute/kernels/aggregate_var_std.cc b/cpp/src/arrow/compute/kernels/aggregate_var_std.cc index d0d3c514fae2e..feb98718aee3c 100644 --- a/cpp/src/arrow/compute/kernels/aggregate_var_std.cc +++ b/cpp/src/arrow/compute/kernels/aggregate_var_std.cc @@ -36,12 +36,21 @@ using arrow::internal::VisitSetBitRunsVoid; template struct VarStdState { using ArrayType = typename TypeTraits::ArrayType; - using CType = typename ArrowType::c_type; + using CType = typename TypeTraits::CType; using ThisType = VarStdState; - explicit VarStdState(VarianceOptions options) : options(options) {} + explicit VarStdState(int32_t decimal_scale, VarianceOptions options) + : decimal_scale(decimal_scale), options(options) {} - // float/double/int64: calculate `m2` (sum((X-mean)^2)) with `two pass algorithm` + template + double ToDouble(T value) const { + return static_cast(value); + } + double ToDouble(const Decimal128& value) const { return value.ToDouble(decimal_scale); } + double ToDouble(const Decimal256& value) const { return value.ToDouble(decimal_scale); } + + // float/double/int64/decimal: calculate `m2` (sum((X-mean)^2)) with `two pass + // algorithm` // https://en.wikipedia.org/wiki/Algorithms_for_calculating_variance#Two-pass_algorithm template enable_if_t::value || (sizeof(CType) > 4)> Consume( @@ -52,14 +61,13 @@ struct VarStdState { return; } - using SumType = - typename std::conditional::value, double, int128_t>::type; - SumType sum = SumArray(*array.data()); + using SumType = typename internal::GetSumType::SumType; + SumType sum = internal::SumArray(*array.data()); - const double mean = static_cast(sum) / count; - const double m2 = - SumArray(*array.data(), [mean](CType value) { - const double v = static_cast(value); + const double mean = ToDouble(sum) / count; + const double m2 = internal::SumArray( + *array.data(), [this, mean](CType value) { + const double v = ToDouble(value); return (v - mean) * (v - mean); }); @@ -102,7 +110,7 @@ struct VarStdState { }); // merge variance - ThisType state(options); + ThisType state(decimal_scale, options); state.count = var_std.count; state.mean = var_std.mean(); state.m2 = var_std.m2(); @@ -116,7 +124,7 @@ struct VarStdState { this->m2 = 0; if (scalar.is_valid) { this->count = count; - this->mean = static_cast(UnboxScalar::Unbox(scalar)); + this->mean = ToDouble(UnboxScalar::Unbox(scalar)); } else { this->count = 0; this->mean = 0; @@ -141,6 +149,7 @@ struct VarStdState { &this->mean, &this->m2); } + const int32_t decimal_scale; const VarianceOptions options; int64_t count = 0; double mean = 0; @@ -153,9 +162,9 @@ struct VarStdImpl : public ScalarAggregator { using ThisType = VarStdImpl; using ArrayType = typename TypeTraits::ArrayType; - explicit VarStdImpl(const std::shared_ptr& out_type, + explicit VarStdImpl(int32_t decimal_scale, const std::shared_ptr& out_type, const VarianceOptions& options, VarOrStd return_type) - : out_type(out_type), state(options), return_type(return_type) {} + : out_type(out_type), state(decimal_scale, options), return_type(return_type) {} Status Consume(KernelContext*, const ExecBatch& batch) override { if (batch[0].is_array()) { @@ -216,8 +225,16 @@ struct VarStdInitState { } template - enable_if_t::value, Status> Visit(const Type&) { - state.reset(new VarStdImpl(out_type, options, return_type)); + enable_if_number Visit(const Type&) { + state.reset( + new VarStdImpl(/*decimal_scale=*/0, out_type, options, return_type)); + return Status::OK(); + } + + template + enable_if_decimal Visit(const Type&) { + state.reset(new VarStdImpl(checked_cast(in_type).scale(), + out_type, options, return_type)); return Status::OK(); } @@ -247,7 +264,7 @@ void AddVarStdKernels(KernelInit init, const std::vector>& types, ScalarAggregateFunction* func) { for (const auto& ty : types) { - auto sig = KernelSignature::Make({InputType(ty)}, float64()); + auto sig = KernelSignature::Make({InputType(ty->id())}, float64()); AddAggKernel(std::move(sig), init, func); } } @@ -275,6 +292,7 @@ std::shared_ptr AddStddevAggKernels() { auto func = std::make_shared( "stddev", Arity::Unary(), &stddev_doc, &default_std_options); AddVarStdKernels(StddevInit, NumericTypes(), func.get()); + AddVarStdKernels(StddevInit, {decimal128(1, 1), decimal256(1, 1)}, func.get()); return func; } @@ -283,6 +301,7 @@ std::shared_ptr AddVarianceAggKernels() { auto func = std::make_shared( "variance", Arity::Unary(), &variance_doc, &default_var_options); AddVarStdKernels(VarianceInit, NumericTypes(), func.get()); + AddVarStdKernels(VarianceInit, {decimal128(1, 1), decimal256(1, 1)}, func.get()); return func; } diff --git a/cpp/src/arrow/compute/kernels/hash_aggregate.cc b/cpp/src/arrow/compute/kernels/hash_aggregate.cc index 73c8f9d26c0e0..9f53267535511 100644 --- a/cpp/src/arrow/compute/kernels/hash_aggregate.cc +++ b/cpp/src/arrow/compute/kernels/hash_aggregate.cc @@ -519,7 +519,8 @@ struct GrouperFastImpl : Grouper { /// Implementations should be default constructible and perform initialization in /// Init(). struct GroupedAggregator : KernelState { - virtual Status Init(ExecContext*, const FunctionOptions*) = 0; + virtual Status Init(ExecContext*, const std::vector& inputs, + const FunctionOptions*) = 0; virtual Status Resize(int64_t new_num_groups) = 0; @@ -536,7 +537,7 @@ template Result> HashAggregateInit(KernelContext* ctx, const KernelInitArgs& args) { auto impl = ::arrow::internal::make_unique(); - RETURN_NOT_OK(impl->Init(ctx->exec_context(), args.options)); + RETURN_NOT_OK(impl->Init(ctx->exec_context(), args.inputs, args.options)); return std::move(impl); } @@ -636,7 +637,8 @@ void VisitGroupedValuesNonNull(const ExecBatch& batch, ConsumeValue&& valid_func // Count implementation struct GroupedCountImpl : public GroupedAggregator { - Status Init(ExecContext* ctx, const FunctionOptions* options) override { + Status Init(ExecContext* ctx, const std::vector&, + const FunctionOptions* options) override { options_ = checked_cast(*options); counts_ = BufferBuilder(ctx->memory_pool()); return Status::OK(); @@ -725,13 +727,14 @@ struct GroupedReducingAggregator : public GroupedAggregator { using CType = typename TypeTraits::CType; using InputCType = typename TypeTraits::CType; - Status Init(ExecContext* ctx, const FunctionOptions* options) override { + Status Init(ExecContext* ctx, const std::vector& inputs, + const FunctionOptions* options) override { pool_ = ctx->memory_pool(); options_ = checked_cast(*options); reduced_ = TypedBufferBuilder(pool_); counts_ = TypedBufferBuilder(pool_); no_nulls_ = TypedBufferBuilder(pool_); - // out_type_ initialized by SumInit + out_type_ = GetOutType(inputs[0].type); return Status::OK(); } @@ -829,6 +832,18 @@ struct GroupedReducingAggregator : public GroupedAggregator { std::shared_ptr out_type() const override { return out_type_; } + template + static enable_if_t::value, std::shared_ptr> GetOutType( + const std::shared_ptr& in_type) { + return TypeTraits::type_singleton(); + } + + template + static enable_if_decimal> GetOutType( + const std::shared_ptr& in_type) { + return in_type; + } + int64_t num_groups_ = 0; ScalarAggregateOptions options_; TypedBufferBuilder reduced_; @@ -838,76 +853,35 @@ struct GroupedReducingAggregator : public GroupedAggregator { MemoryPool* pool_; }; -// ---------------------------------------------------------------------- -// Sum implementation - -template -struct GroupedSumImpl : public GroupedReducingAggregator> { - using Base = GroupedReducingAggregator>; - using CType = typename Base::CType; - using InputCType = typename Base::InputCType; - - // Default value for a group - static CType NullValue(const DataType&) { return CType(0); } - - template - static enable_if_number Reduce(const DataType&, const CType u, - const InputCType v) { - return static_cast(to_unsigned(u) + to_unsigned(static_cast(v))); - } - - static CType Reduce(const DataType&, const CType u, const CType v) { - return static_cast(to_unsigned(u) + to_unsigned(v)); - } - - using Base::Finish; -}; - -template