diff --git a/cpp/src/arrow/compute/kernels/aggregate_basic.cc b/cpp/src/arrow/compute/kernels/aggregate_basic.cc index 0bc839be8e3cc..53fa2d2d7f702 100644 --- a/cpp/src/arrow/compute/kernels/aggregate_basic.cc +++ b/cpp/src/arrow/compute/kernels/aggregate_basic.cc @@ -422,9 +422,8 @@ void AddMinOrMaxAggKernel(ScalarAggregateFunction* func, auto init = [min_max_func]( KernelContext* ctx, const KernelInitArgs& args) -> Result> { - std::vector inputs = args.inputs; - ARROW_ASSIGN_OR_RAISE(auto kernel, min_max_func->DispatchBest(&inputs)); - KernelInitArgs new_args{kernel, inputs, args.options}; + ARROW_ASSIGN_OR_RAISE(auto kernel, min_max_func->DispatchExact(args.inputs)); + KernelInitArgs new_args{kernel, args.inputs, args.options}; return kernel->init(ctx, new_args); }; diff --git a/cpp/src/arrow/compute/kernels/aggregate_internal.h b/cpp/src/arrow/compute/kernels/aggregate_internal.h index 22a54558f4e8a..946ec01900c5b 100644 --- a/cpp/src/arrow/compute/kernels/aggregate_internal.h +++ b/cpp/src/arrow/compute/kernels/aggregate_internal.h @@ -21,6 +21,7 @@ #include "arrow/type.h" #include "arrow/type_traits.h" #include "arrow/util/bit_run_reader.h" +#include "arrow/util/int128_internal.h" #include "arrow/util/logging.h" namespace arrow { @@ -111,6 +112,26 @@ void AddAggKernel(std::shared_ptr sig, KernelInit init, ScalarAggregateFinalize finalize, ScalarAggregateFunction* func, SimdLevel::type simd_level = SimdLevel::NONE); +using arrow::internal::VisitSetBitRunsVoid; + +template +struct GetSumType; + +template +struct GetSumType> { + using SumType = double; +}; + +template +struct GetSumType> { + using SumType = arrow::internal::int128_t; +}; + +template +struct GetSumType> { + using SumType = typename TypeTraits::CType; +}; + // SumArray must be parameterized with the SIMD level since it's called both from // translation units with and without vectorization. Normally it gets inlined but // if not, without the parameter, we'll have multiple definitions of the same diff --git a/cpp/src/arrow/compute/kernels/aggregate_mode.cc b/cpp/src/arrow/compute/kernels/aggregate_mode.cc index f225f6bf569c3..6a50556a13efd 100644 --- a/cpp/src/arrow/compute/kernels/aggregate_mode.cc +++ b/cpp/src/arrow/compute/kernels/aggregate_mode.cc @@ -40,10 +40,13 @@ constexpr char kCountFieldName[] = "count"; constexpr uint64_t kCountEOF = ~0ULL; -template +template ::CType> Result> PrepareOutput(int64_t n, KernelContext* ctx, Datum* out) { - const auto& mode_type = TypeTraits::type_singleton(); + DCHECK_EQ(Type::STRUCT, out->type()->id()); + const auto& out_type = checked_cast(*out->type()); + DCHECK_EQ(2, out_type.num_fields()); + const auto& mode_type = out_type.field(0)->type(); const auto& count_type = int64(); auto mode_data = ArrayData::Make(mode_type, /*length=*/n, /*null_count=*/0); @@ -61,10 +64,7 @@ Result> PrepareOutput(int64_t n, KernelContext* ctx, count_buffer = count_data->template GetMutableValues(1); } - const auto& out_type = - struct_({field(kModeFieldName, mode_type), field(kCountFieldName, count_type)}); - *out = Datum(ArrayData::Make(out_type, n, {nullptr}, {mode_data, count_data}, 0)); - + *out = Datum(ArrayData::Make(out->type(), n, {nullptr}, {mode_data, count_data}, 0)); return std::make_pair(mode_buffer, count_buffer); } @@ -72,7 +72,7 @@ Result> PrepareOutput(int64_t n, KernelContext* ctx, // suboptimal for tiny or large n, possibly okay as we're not in hot path template Status Finalize(KernelContext* ctx, Datum* out, Generator&& gen) { - using CType = typename InType::c_type; + using CType = typename TypeTraits::CType; using ValueCountPair = std::pair; auto gt = [](const ValueCountPair& lhs, const ValueCountPair& rhs) { @@ -203,13 +203,25 @@ struct CountModer { } }; -// copy and sort approach for floating points or integers with wide value range +// copy and sort approach for floating points, decimals, or integers with wide +// value range // O(n) space, O(nlogn) time template struct SortModer { - using CType = typename T::c_type; + using CType = typename TypeTraits::CType; using Allocator = arrow::stl::allocator; + template + static enable_if_floating_point GetNan() { + return static_cast(NAN); + } + + template + static enable_if_t::value, CType> GetNan() { + DCHECK(false); + return static_cast(0); + } + Status Exec(KernelContext* ctx, const ExecBatch& batch, Datum* out) { const Datum& datum = batch[0]; const int64_t in_length = datum.length() - datum.null_count(); @@ -246,7 +258,7 @@ struct SortModer { if (ARROW_PREDICT_FALSE(it == in_buffer.cend())) { // handle NAN at last if (nan_count > 0) { - auto value_count = std::make_pair(static_cast(NAN), nan_count); + auto value_count = std::make_pair(GetNan(), nan_count); nan_count = 0; return value_count; } @@ -318,13 +330,18 @@ struct Moder::value && }; template -struct Moder::value>> { +struct Moder> { + SortModer impl; +}; + +template +struct Moder> { SortModer impl; }; template Status ScalarMode(KernelContext* ctx, const Scalar& scalar, Datum* out) { - using CType = typename T::c_type; + using CType = typename TypeTraits::CType; const ModeOptions& options = ModeState::Get(ctx); if ((!options.skip_nulls && !scalar.is_valid) || @@ -366,30 +383,33 @@ struct ModeExecutor { } }; -VectorKernel NewModeKernel(const std::shared_ptr& in_type) { +Result ModeType(KernelContext*, const std::vector& descrs) { + return ValueDescr::Array( + struct_({field(kModeFieldName, descrs[0].type), field(kCountFieldName, int64())})); +} + +VectorKernel NewModeKernel(const std::shared_ptr& in_type, + ArrayKernelExec exec) { VectorKernel kernel; kernel.init = ModeState::Init; kernel.can_execute_chunkwise = false; kernel.output_chunked = false; - auto out_type = - struct_({field(kModeFieldName, in_type), field(kCountFieldName, int64())}); - kernel.signature = - KernelSignature::Make({InputType(in_type)}, ValueDescr::Array(out_type)); - return kernel; -} - -void AddBooleanModeKernel(VectorFunction* func) { - VectorKernel kernel = NewModeKernel(boolean()); - kernel.exec = ModeExecutor::Exec; - DCHECK_OK(func->AddKernel(kernel)); -} - -void AddNumericModeKernels(VectorFunction* func) { - for (const auto& type : NumericTypes()) { - VectorKernel kernel = NewModeKernel(type); - kernel.exec = GenerateNumeric(*type); - DCHECK_OK(func->AddKernel(kernel)); + switch (in_type->id()) { + case Type::DECIMAL128: + case Type::DECIMAL256: + kernel.signature = + KernelSignature::Make({InputType(in_type->id())}, OutputType(ModeType)); + break; + default: { + auto out_type = + struct_({field(kModeFieldName, in_type), field(kCountFieldName, int64())}); + kernel.signature = KernelSignature::Make({InputType(in_type->id())}, + ValueDescr::Array(std::move(out_type))); + break; + } } + kernel.exec = std::move(exec); + return kernel; } const FunctionDoc mode_doc{ @@ -409,8 +429,17 @@ void RegisterScalarAggregateMode(FunctionRegistry* registry) { static auto default_options = ModeOptions::Defaults(); auto func = std::make_shared("mode", Arity::Unary(), &mode_doc, &default_options); - AddBooleanModeKernel(func.get()); - AddNumericModeKernels(func.get()); + DCHECK_OK(func->AddKernel( + NewModeKernel(boolean(), ModeExecutor::Exec))); + for (const auto& type : NumericTypes()) { + DCHECK_OK(func->AddKernel( + NewModeKernel(type, GenerateNumeric(*type)))); + } + // Type parameters are ignored + DCHECK_OK(func->AddKernel( + NewModeKernel(decimal128(1, 0), ModeExecutor::Exec))); + DCHECK_OK(func->AddKernel( + NewModeKernel(decimal256(1, 0), ModeExecutor::Exec))); DCHECK_OK(registry->AddFunction(std::move(func))); } diff --git a/cpp/src/arrow/compute/kernels/aggregate_quantile.cc b/cpp/src/arrow/compute/kernels/aggregate_quantile.cc index 62e375e695087..cd2410cc9eb75 100644 --- a/cpp/src/arrow/compute/kernels/aggregate_quantile.cc +++ b/cpp/src/arrow/compute/kernels/aggregate_quantile.cc @@ -71,10 +71,21 @@ uint64_t QuantileToDataPoint(size_t length, double q, return datapoint_index; } +template +double DataPointToDouble(T value, const DataType&) { + return static_cast(value); +} +double DataPointToDouble(const Decimal128& value, const DataType& ty) { + return value.ToDouble(checked_cast(ty).scale()); +} +double DataPointToDouble(const Decimal256& value, const DataType& ty) { + return value.ToDouble(checked_cast(ty).scale()); +} + // copy and nth_element approach, large memory footprint template struct SortQuantiler { - using CType = typename InType::c_type; + using CType = typename TypeTraits::CType; using Allocator = arrow::stl::allocator; Status Exec(KernelContext* ctx, const ExecBatch& batch, Datum* out) { @@ -106,8 +117,7 @@ struct SortQuantiler { // prepare out array // out type depends on options const bool is_datapoint = IsDataPoint(options); - const std::shared_ptr out_type = - is_datapoint ? TypeTraits::type_singleton() : float64(); + const std::shared_ptr out_type = is_datapoint ? datum.type() : float64(); int64_t out_length = options.q.size(); if (in_buffer.empty()) { return MakeArrayOfNull(out_type, out_length, ctx->memory_pool()).Value(out); @@ -142,8 +152,9 @@ struct SortQuantiler { double* out_buffer = out_data->template GetMutableValues(1); for (int64_t i = 0; i < out_length; ++i) { const int64_t q_index = q_indices[i]; - out_buffer[q_index] = GetQuantileByInterp( - in_buffer, &last_index, options.q[q_index], options.interpolation); + out_buffer[q_index] = + GetQuantileByInterp(in_buffer, &last_index, options.q[q_index], + options.interpolation, *datum.type()); } } } @@ -170,8 +181,8 @@ struct SortQuantiler { // return quantile interpolated from adjacent input data points double GetQuantileByInterp(std::vector& in, uint64_t* last_index, - double q, - enum QuantileOptions::Interpolation interpolation) { + double q, enum QuantileOptions::Interpolation interpolation, + const DataType& in_type) { const double index = (in.size() - 1) * q; const uint64_t lower_index = static_cast(index); const double fraction = index - lower_index; @@ -181,7 +192,7 @@ struct SortQuantiler { std::nth_element(in.begin(), in.begin() + lower_index, in.begin() + *last_index); } - const double lower_value = static_cast(in[lower_index]); + const double lower_value = DataPointToDouble(in[lower_index], in_type); if (fraction == 0) { *last_index = lower_index; return lower_value; @@ -197,7 +208,7 @@ struct SortQuantiler { } *last_index = lower_index; - const double higher_value = static_cast(in[higher_index]); + const double higher_value = DataPointToDouble(in[higher_index], in_type); if (interpolation == QuantileOptions::LINEAR) { // more stable than naive linear interpolation @@ -399,10 +410,15 @@ struct ExactQuantiler::value>> { SortQuantiler impl; }; +template +struct ExactQuantiler::value>> { + SortQuantiler impl; +}; + template Status ScalarQuantile(KernelContext* ctx, const QuantileOptions& options, const Scalar& scalar, Datum* out) { - using CType = typename T::c_type; + using CType = typename TypeTraits::CType; ArrayData* output = out->mutable_array(); output->length = options.q.size(); auto out_type = IsDataPoint(options) ? scalar.type : float64(); @@ -433,7 +449,7 @@ Status ScalarQuantile(KernelContext* ctx, const QuantileOptions& options, } else { double* out_buffer = output->template GetMutableValues(1); for (int64_t i = 0; i < output->length; i++) { - out_buffer[i] = static_cast(UnboxScalar::Unbox(scalar)); + out_buffer[i] = DataPointToDouble(UnboxScalar::Unbox(scalar), *scalar.type); } } return Status::OK(); @@ -486,6 +502,18 @@ void AddQuantileKernels(VectorFunction* func) { base.exec = GenerateNumeric(*ty); DCHECK_OK(func->AddKernel(base)); } + { + base.signature = + KernelSignature::Make({InputType(Type::DECIMAL128)}, OutputType(ResolveOutput)); + base.exec = QuantileExecutor::Exec; + DCHECK_OK(func->AddKernel(base)); + } + { + base.signature = + KernelSignature::Make({InputType(Type::DECIMAL256)}, OutputType(ResolveOutput)); + base.exec = QuantileExecutor::Exec; + DCHECK_OK(func->AddKernel(base)); + } } const FunctionDoc quantile_doc{ diff --git a/cpp/src/arrow/compute/kernels/aggregate_tdigest.cc b/cpp/src/arrow/compute/kernels/aggregate_tdigest.cc index 0fddf38f575c9..7c86267d94006 100644 --- a/cpp/src/arrow/compute/kernels/aggregate_tdigest.cc +++ b/cpp/src/arrow/compute/kernels/aggregate_tdigest.cc @@ -34,13 +34,25 @@ template struct TDigestImpl : public ScalarAggregator { using ThisType = TDigestImpl; using ArrayType = typename TypeTraits::ArrayType; - using CType = typename ArrowType::c_type; + using CType = typename TypeTraits::CType; - explicit TDigestImpl(const TDigestOptions& options) + TDigestImpl(const TDigestOptions& options, const DataType& in_type) : options{options}, tdigest{options.delta, options.buffer_size}, count{0}, - all_valid{true} {} + decimal_scale{0}, + all_valid{true} { + if (is_decimal_type::value) { + decimal_scale = checked_cast(in_type).scale(); + } + } + + template + double ToDouble(T value) const { + return static_cast(value); + } + double ToDouble(const Decimal128& value) const { return value.ToDouble(decimal_scale); } + double ToDouble(const Decimal256& value) const { return value.ToDouble(decimal_scale); } Status Consume(KernelContext*, const ExecBatch& batch) override { if (!this->all_valid) return Status::OK(); @@ -57,7 +69,7 @@ struct TDigestImpl : public ScalarAggregator { VisitSetBitRunsVoid(data.buffers[0], data.offset, data.length, [&](int64_t pos, int64_t len) { for (int64_t i = 0; i < len; ++i) { - this->tdigest.NanAdd(values[pos + i]); + this->tdigest.NanAdd(ToDouble(values[pos + i])); } }); } @@ -66,7 +78,7 @@ struct TDigestImpl : public ScalarAggregator { if (batch[0].scalar()->is_valid) { this->count += 1; for (int64_t i = 0; i < batch.length; i++) { - this->tdigest.NanAdd(value); + this->tdigest.NanAdd(ToDouble(value)); } } } @@ -110,6 +122,7 @@ struct TDigestImpl : public ScalarAggregator { const TDigestOptions options; TDigest tdigest; int64_t count; + int32_t decimal_scale; bool all_valid; }; @@ -132,8 +145,14 @@ struct TDigestInitState { } template - enable_if_t::value, Status> Visit(const Type&) { - state.reset(new TDigestImpl(options)); + enable_if_number Visit(const Type&) { + state.reset(new TDigestImpl(options, in_type)); + return Status::OK(); + } + + template + enable_if_decimal Visit(const Type&) { + state.reset(new TDigestImpl(options, in_type)); return Status::OK(); } @@ -154,7 +173,7 @@ void AddTDigestKernels(KernelInit init, const std::vector>& types, ScalarAggregateFunction* func) { for (const auto& ty : types) { - auto sig = KernelSignature::Make({InputType(ty)}, float64()); + auto sig = KernelSignature::Make({InputType(ty->id())}, float64()); AddAggKernel(std::move(sig), init, func); } } @@ -179,6 +198,7 @@ std::shared_ptr AddTDigestAggKernels() { auto func = std::make_shared( "tdigest", Arity::Unary(), &tdigest_doc, &default_tdigest_options); AddTDigestKernels(TDigestInit, NumericTypes(), func.get()); + AddTDigestKernels(TDigestInit, {decimal128(1, 1), decimal256(1, 1)}, func.get()); return func; } diff --git a/cpp/src/arrow/compute/kernels/aggregate_test.cc b/cpp/src/arrow/compute/kernels/aggregate_test.cc index c5355a8f4521f..c8b13862ae361 100644 --- a/cpp/src/arrow/compute/kernels/aggregate_test.cc +++ b/cpp/src/arrow/compute/kernels/aggregate_test.cc @@ -2334,6 +2334,26 @@ TEST(TestIndexKernel, Errors) { // Mode // +template +void CheckModes(const Datum& array, const ModeOptions options, + const std::vector& expected_modes, + const std::vector& expected_counts) { + ASSERT_OK_AND_ASSIGN(Datum out, Mode(array, options)); + ValidateOutput(out); + const StructArray out_array(out.array()); + ASSERT_EQ(out_array.length(), expected_modes.size()); + ASSERT_EQ(out_array.num_fields(), 2); + + const CType* out_modes = out_array.field(0)->data()->GetValues(1); + const int64_t* out_counts = out_array.field(1)->data()->GetValues(1); + for (int i = 0; i < out_array.length(); ++i) { + // equal or nan equal + ASSERT_TRUE((expected_modes[i] == out_modes[i]) || + (expected_modes[i] != expected_modes[i] && out_modes[i] != out_modes[i])); + ASSERT_EQ(expected_counts[i], out_counts[i]); + } +} + template class TestPrimitiveModeKernel : public ::testing::Test { public: @@ -2344,21 +2364,7 @@ class TestPrimitiveModeKernel : public ::testing::Test { void AssertModesAre(const Datum& array, const ModeOptions options, const std::vector& expected_modes, const std::vector& expected_counts) { - ASSERT_OK_AND_ASSIGN(Datum out, Mode(array, options)); - ValidateOutput(out); - const StructArray out_array(out.array()); - ASSERT_EQ(out_array.length(), expected_modes.size()); - ASSERT_EQ(out_array.num_fields(), 2); - - const CType* out_modes = out_array.field(0)->data()->GetValues(1); - const int64_t* out_counts = out_array.field(1)->data()->GetValues(1); - for (int i = 0; i < out_array.length(); ++i) { - // equal or nan equal - ASSERT_TRUE( - (expected_modes[i] == out_modes[i]) || - (expected_modes[i] != expected_modes[i] && out_modes[i] != out_modes[i])); - ASSERT_EQ(expected_counts[i], out_counts[i]); - } + CheckModes(array, options, expected_modes, expected_counts); } void AssertModesAre(const std::string& json, const int n, @@ -2587,6 +2593,89 @@ TYPED_TEST(TestFloatingModeKernel, Floats) { this->AssertModesEmpty(ScalarFromJSON(in_ty, "null"), ModeOptions(/*n=*/1)); } +template +class TestDecimalModeKernel : public ::testing::Test { + public: + using CType = typename TypeTraits::CType; + + void AssertModesAre(const Datum& array, const ModeOptions options, + const std::vector& expected_modes, + const std::vector& expected_counts) { + CheckModes(array, options, values(expected_modes), expected_counts); + } + + CType value(const std::string& s) const { + EXPECT_OK_AND_ASSIGN(auto out, CType::FromString(s)); + return out; + } + + std::vector values(const std::vector& strings) const { + std::vector values; + for (const auto& s : strings) { + values.push_back(value(s)); + } + return values; + } + + std::shared_ptr type_instance() { return std::make_shared(4, 2); } +}; + +TYPED_TEST_SUITE(TestDecimalModeKernel, DecimalArrowTypes); + +TYPED_TEST(TestDecimalModeKernel, Decimals) { + auto ty = this->type_instance(); + this->AssertModesAre(ArrayFromJSON(ty, R"(["5.01", "-1.42", "-1.42", "5.01", "5.01"])"), + ModeOptions(1), {"5.01"}, {3}); + this->AssertModesAre( + ArrayFromJSON(ty, R"(["5.01", "-1.42", "-1.42", "5.01", "5.01", "-1.42"])"), + ModeOptions(1), {"-1.42"}, {3}); + this->AssertModesAre( + ArrayFromJSON(ty, R"(["5.01", "-1.42", "-1.42", "5.01", "5.01", "-1.42"])"), + ModeOptions(2), {"-1.42", "5.01"}, {3, 3}); + + this->AssertModesAre(ArrayFromJSON(ty, "[]"), ModeOptions(1), {}, {}); + + this->AssertModesAre(ArrayFromJSON(ty, R"(["1.86", "-2.00", "-2.00", null])"), + ModeOptions(/*n=*/1), {"-2.00"}, {2}); + this->AssertModesAre(ArrayFromJSON(ty, R"(["1.86", "-2.00", "-2.00", null])"), + ModeOptions(/*n=*/1, /*skip_nulls=*/false), {}, {}); + this->AssertModesAre(ArrayFromJSON(ty, R"(["1.86", "-2.00", "-2.00", null])"), + ModeOptions(/*n=*/1, /*skip_nulls=*/true, /*min_count=*/3), + {"-2.00"}, {2}); + this->AssertModesAre(ArrayFromJSON(ty, R"(["-2.00", "-2.00", null])"), + ModeOptions(/*n=*/1, /*skip_nulls=*/true, /*min_count=*/3), {}, + {}); + this->AssertModesAre(ArrayFromJSON(ty, R"(["1.86", "-2.00", "-2.00"])"), + ModeOptions(/*n=*/1, /*skip_nulls=*/false, /*min_count=*/3), + {"-2.00"}, {2}); + this->AssertModesAre(ArrayFromJSON(ty, R"(["1.86", "-2.00", "-2.00", null])"), + ModeOptions(/*n=*/1, /*skip_nulls=*/false, /*min_count=*/3), {}, + {}); + this->AssertModesAre(ArrayFromJSON(ty, R"(["1.86", "-2.00"])"), + ModeOptions(/*n=*/1, /*skip_nulls=*/false, /*min_count=*/3), {}, + {}); + + this->AssertModesAre(ScalarFromJSON(ty, R"("0.00")"), + ModeOptions(/*n=*/1, /*skip_nulls=*/false), {"0.00"}, {1}); + this->AssertModesAre(ScalarFromJSON(ty, "null"), + ModeOptions(/*n=*/1, /*skip_nulls=*/false), {}, {}); + this->AssertModesAre(ScalarFromJSON(ty, R"("0.00")"), + ModeOptions(/*n=*/1, /*skip_nulls=*/true, /*min_count=*/2), {}, + {}); + this->AssertModesAre(ScalarFromJSON(ty, "null"), + ModeOptions(/*n=*/1, /*skip_nulls=*/true, /*min_count=*/2), {}, + {}); + this->AssertModesAre(ScalarFromJSON(ty, R"("0.00")"), + ModeOptions(/*n=*/1, /*skip_nulls=*/false, /*min_count=*/2), {}, + {}); + this->AssertModesAre(ScalarFromJSON(ty, "null"), + ModeOptions(/*n=*/1, /*skip_nulls=*/false, /*min_count=*/2), {}, + {}); + this->AssertModesAre(ScalarFromJSON(ty, R"("5.00")"), ModeOptions(/*n=*/1), {"5.00"}, + {1}); + this->AssertModesAre(ScalarFromJSON(ty, "null"), ModeOptions(/*n=*/1), {}, {}); +} + TEST_F(TestInt8ModeKernelValueRange, Basics) { this->AssertModeIs("[0, 127, -128, -128]", -128, 2); this->AssertModeIs("[127, 127, 127]", 127, 3); @@ -2689,6 +2778,24 @@ TEST_F(TestInt32ModeKernel, Sliced) { // Variance/Stddev // +void CheckVarStd(const Datum& array, const VarianceOptions& options, + double expected_var) { + ASSERT_OK_AND_ASSIGN(Datum out_var, Variance(array, options)); + ASSERT_OK_AND_ASSIGN(Datum out_std, Stddev(array, options)); + auto var = checked_cast(out_var.scalar().get()); + auto std = checked_cast(out_std.scalar().get()); + ASSERT_TRUE(var->is_valid && std->is_valid); + // Near zero these macros don't work as well + // (and MinGW can give results slightly off from zero) + if (std::abs(expected_var) < 1e-20) { + ASSERT_NEAR(std->value * std->value, var->value, 1e-20); + ASSERT_NEAR(var->value, expected_var, 1e-20); + } else { + ASSERT_DOUBLE_EQ(std->value * std->value, var->value); + ASSERT_DOUBLE_EQ(var->value, expected_var); // < 4ULP + } +} + template class TestPrimitiveVarStdKernel : public ::testing::Test { public: @@ -2697,12 +2804,12 @@ class TestPrimitiveVarStdKernel : public ::testing::Test { void AssertVarStdIs(const Array& array, const VarianceOptions& options, double expected_var) { - AssertVarStdIsInternal(array, options, expected_var); + CheckVarStd(array, options, expected_var); } void AssertVarStdIs(const std::shared_ptr& array, const VarianceOptions& options, double expected_var) { - AssertVarStdIsInternal(array, options, expected_var); + CheckVarStd(array, options, expected_var); } void AssertVarStdIs(const std::string& json, const VarianceOptions& options, @@ -2740,17 +2847,6 @@ class TestPrimitiveVarStdKernel : public ::testing::Test { std::shared_ptr type_singleton() { return Traits::type_singleton(); } private: - void AssertVarStdIsInternal(const Datum& array, const VarianceOptions& options, - double expected_var) { - ASSERT_OK_AND_ASSIGN(Datum out_var, Variance(array, options)); - ASSERT_OK_AND_ASSIGN(Datum out_std, Stddev(array, options)); - auto var = checked_cast(out_var.scalar().get()); - auto std = checked_cast(out_std.scalar().get()); - ASSERT_TRUE(var->is_valid && std->is_valid); - ASSERT_DOUBLE_EQ(std->value * std->value, var->value); - ASSERT_DOUBLE_EQ(var->value, expected_var); // < 4ULP - } - void AssertVarStdIsInvalidInternal(const Datum& array, const VarianceOptions& options) { ASSERT_OK_AND_ASSIGN(Datum out_var, Variance(array, options)); ASSERT_OK_AND_ASSIGN(Datum out_std, Stddev(array, options)); @@ -3000,6 +3096,18 @@ TEST_F(TestVarStdKernelIntegerLength, Basics) { } #endif +TEST(TestVarStdKernel, Decimal) { + // Effectively treated as double, sanity check results here + for (const auto& ty : {decimal128(3, 2), decimal256(3, 2)}) { + CheckVarStd(ArrayFromJSON(ty, R"(["1.00"])"), VarianceOptions(), 0); + CheckVarStd(ArrayFromJSON(ty, R"([null, "1.00", "2.00", "3.00"])"), VarianceOptions(), + 0.6666666666666666); + CheckVarStd(ScalarFromJSON(ty, R"("1.00")"), VarianceOptions(), 0); + CheckVarStd(ArrayFromJSON(ty, R"([null, "1.00", "2.00"])"), + VarianceOptions(/*ddof=*/1), 0.5); + } +} + // // Quantile // @@ -3541,6 +3649,24 @@ TEST(TestQuantileKernel, AllNullsOrNaNs) { } } +TEST(TestQuantileKernel, Decimal) { + auto check = [](const std::shared_ptr& input, QuantileOptions options, + const std::shared_ptr& expected) { + ASSERT_OK_AND_ASSIGN(Datum out, Quantile(input, options)); + auto out_array = out.make_array(); + ValidateOutput(*out_array); + AssertArraysEqual(*expected, *out_array, /*verbose=*/true); + }; + for (const auto& ty : {decimal128(3, 2), decimal256(3, 2)}) { + check(ArrayFromJSON(ty, R"(["1.00", "5.00", null])"), + QuantileOptions(0.5, QuantileOptions::LINEAR), + ArrayFromJSON(float64(), R"([3.00])")); + check(ArrayFromJSON(ty, R"(["1.00", "2.00", "5.00"])"), + QuantileOptions(0.5, QuantileOptions::NEAREST), + ArrayFromJSON(ty, R"(["2.00"])")); + } +} + TEST(TestQuantileKernel, Scalar) { for (const auto& ty : {float64(), int64(), uint64()}) { QuantileOptions options(std::vector{0.0, 0.5, 1.0}); @@ -3608,6 +3734,17 @@ TEST(TestTDigestKernel, AllNullsOrNaNs) { } } +TEST(TestTDigestKernel, Decimal) { + for (const auto& ty : {decimal128(3, 2), decimal256(3, 2)}) { + ASSERT_OK_AND_ASSIGN(auto decimal_array, + TDigest(ArrayFromJSON(ty, R"(["1.00", "2.00", "3.25"])"))); + ASSERT_OK_AND_ASSIGN(auto float_array, + TDigest(ArrayFromJSON(float64(), "[1, 2, 3.25]"))); + AssertArraysApproxEqual(*float_array.make_array(), *decimal_array.make_array(), + /*verbose=*/true); + } +} + TEST(TestTDigestKernel, Scalar) { for (const auto& ty : {float64(), int64(), uint64()}) { TDigestOptions options(std::vector{0.0, 0.5, 1.0}); diff --git a/cpp/src/arrow/compute/kernels/aggregate_var_std.cc b/cpp/src/arrow/compute/kernels/aggregate_var_std.cc index d0d3c514fae2e..feb98718aee3c 100644 --- a/cpp/src/arrow/compute/kernels/aggregate_var_std.cc +++ b/cpp/src/arrow/compute/kernels/aggregate_var_std.cc @@ -36,12 +36,21 @@ using arrow::internal::VisitSetBitRunsVoid; template struct VarStdState { using ArrayType = typename TypeTraits::ArrayType; - using CType = typename ArrowType::c_type; + using CType = typename TypeTraits::CType; using ThisType = VarStdState; - explicit VarStdState(VarianceOptions options) : options(options) {} + explicit VarStdState(int32_t decimal_scale, VarianceOptions options) + : decimal_scale(decimal_scale), options(options) {} - // float/double/int64: calculate `m2` (sum((X-mean)^2)) with `two pass algorithm` + template + double ToDouble(T value) const { + return static_cast(value); + } + double ToDouble(const Decimal128& value) const { return value.ToDouble(decimal_scale); } + double ToDouble(const Decimal256& value) const { return value.ToDouble(decimal_scale); } + + // float/double/int64/decimal: calculate `m2` (sum((X-mean)^2)) with `two pass + // algorithm` // https://en.wikipedia.org/wiki/Algorithms_for_calculating_variance#Two-pass_algorithm template enable_if_t::value || (sizeof(CType) > 4)> Consume( @@ -52,14 +61,13 @@ struct VarStdState { return; } - using SumType = - typename std::conditional::value, double, int128_t>::type; - SumType sum = SumArray(*array.data()); + using SumType = typename internal::GetSumType::SumType; + SumType sum = internal::SumArray(*array.data()); - const double mean = static_cast(sum) / count; - const double m2 = - SumArray(*array.data(), [mean](CType value) { - const double v = static_cast(value); + const double mean = ToDouble(sum) / count; + const double m2 = internal::SumArray( + *array.data(), [this, mean](CType value) { + const double v = ToDouble(value); return (v - mean) * (v - mean); }); @@ -102,7 +110,7 @@ struct VarStdState { }); // merge variance - ThisType state(options); + ThisType state(decimal_scale, options); state.count = var_std.count; state.mean = var_std.mean(); state.m2 = var_std.m2(); @@ -116,7 +124,7 @@ struct VarStdState { this->m2 = 0; if (scalar.is_valid) { this->count = count; - this->mean = static_cast(UnboxScalar::Unbox(scalar)); + this->mean = ToDouble(UnboxScalar::Unbox(scalar)); } else { this->count = 0; this->mean = 0; @@ -141,6 +149,7 @@ struct VarStdState { &this->mean, &this->m2); } + const int32_t decimal_scale; const VarianceOptions options; int64_t count = 0; double mean = 0; @@ -153,9 +162,9 @@ struct VarStdImpl : public ScalarAggregator { using ThisType = VarStdImpl; using ArrayType = typename TypeTraits::ArrayType; - explicit VarStdImpl(const std::shared_ptr& out_type, + explicit VarStdImpl(int32_t decimal_scale, const std::shared_ptr& out_type, const VarianceOptions& options, VarOrStd return_type) - : out_type(out_type), state(options), return_type(return_type) {} + : out_type(out_type), state(decimal_scale, options), return_type(return_type) {} Status Consume(KernelContext*, const ExecBatch& batch) override { if (batch[0].is_array()) { @@ -216,8 +225,16 @@ struct VarStdInitState { } template - enable_if_t::value, Status> Visit(const Type&) { - state.reset(new VarStdImpl(out_type, options, return_type)); + enable_if_number Visit(const Type&) { + state.reset( + new VarStdImpl(/*decimal_scale=*/0, out_type, options, return_type)); + return Status::OK(); + } + + template + enable_if_decimal Visit(const Type&) { + state.reset(new VarStdImpl(checked_cast(in_type).scale(), + out_type, options, return_type)); return Status::OK(); } @@ -247,7 +264,7 @@ void AddVarStdKernels(KernelInit init, const std::vector>& types, ScalarAggregateFunction* func) { for (const auto& ty : types) { - auto sig = KernelSignature::Make({InputType(ty)}, float64()); + auto sig = KernelSignature::Make({InputType(ty->id())}, float64()); AddAggKernel(std::move(sig), init, func); } } @@ -275,6 +292,7 @@ std::shared_ptr AddStddevAggKernels() { auto func = std::make_shared( "stddev", Arity::Unary(), &stddev_doc, &default_std_options); AddVarStdKernels(StddevInit, NumericTypes(), func.get()); + AddVarStdKernels(StddevInit, {decimal128(1, 1), decimal256(1, 1)}, func.get()); return func; } @@ -283,6 +301,7 @@ std::shared_ptr AddVarianceAggKernels() { auto func = std::make_shared( "variance", Arity::Unary(), &variance_doc, &default_var_options); AddVarStdKernels(VarianceInit, NumericTypes(), func.get()); + AddVarStdKernels(VarianceInit, {decimal128(1, 1), decimal256(1, 1)}, func.get()); return func; } diff --git a/cpp/src/arrow/compute/kernels/hash_aggregate.cc b/cpp/src/arrow/compute/kernels/hash_aggregate.cc index 73c8f9d26c0e0..9f53267535511 100644 --- a/cpp/src/arrow/compute/kernels/hash_aggregate.cc +++ b/cpp/src/arrow/compute/kernels/hash_aggregate.cc @@ -519,7 +519,8 @@ struct GrouperFastImpl : Grouper { /// Implementations should be default constructible and perform initialization in /// Init(). struct GroupedAggregator : KernelState { - virtual Status Init(ExecContext*, const FunctionOptions*) = 0; + virtual Status Init(ExecContext*, const std::vector& inputs, + const FunctionOptions*) = 0; virtual Status Resize(int64_t new_num_groups) = 0; @@ -536,7 +537,7 @@ template Result> HashAggregateInit(KernelContext* ctx, const KernelInitArgs& args) { auto impl = ::arrow::internal::make_unique(); - RETURN_NOT_OK(impl->Init(ctx->exec_context(), args.options)); + RETURN_NOT_OK(impl->Init(ctx->exec_context(), args.inputs, args.options)); return std::move(impl); } @@ -636,7 +637,8 @@ void VisitGroupedValuesNonNull(const ExecBatch& batch, ConsumeValue&& valid_func // Count implementation struct GroupedCountImpl : public GroupedAggregator { - Status Init(ExecContext* ctx, const FunctionOptions* options) override { + Status Init(ExecContext* ctx, const std::vector&, + const FunctionOptions* options) override { options_ = checked_cast(*options); counts_ = BufferBuilder(ctx->memory_pool()); return Status::OK(); @@ -725,13 +727,14 @@ struct GroupedReducingAggregator : public GroupedAggregator { using CType = typename TypeTraits::CType; using InputCType = typename TypeTraits::CType; - Status Init(ExecContext* ctx, const FunctionOptions* options) override { + Status Init(ExecContext* ctx, const std::vector& inputs, + const FunctionOptions* options) override { pool_ = ctx->memory_pool(); options_ = checked_cast(*options); reduced_ = TypedBufferBuilder(pool_); counts_ = TypedBufferBuilder(pool_); no_nulls_ = TypedBufferBuilder(pool_); - // out_type_ initialized by SumInit + out_type_ = GetOutType(inputs[0].type); return Status::OK(); } @@ -829,6 +832,18 @@ struct GroupedReducingAggregator : public GroupedAggregator { std::shared_ptr out_type() const override { return out_type_; } + template + static enable_if_t::value, std::shared_ptr> GetOutType( + const std::shared_ptr& in_type) { + return TypeTraits::type_singleton(); + } + + template + static enable_if_decimal> GetOutType( + const std::shared_ptr& in_type) { + return in_type; + } + int64_t num_groups_ = 0; ScalarAggregateOptions options_; TypedBufferBuilder reduced_; @@ -838,76 +853,35 @@ struct GroupedReducingAggregator : public GroupedAggregator { MemoryPool* pool_; }; -// ---------------------------------------------------------------------- -// Sum implementation - -template -struct GroupedSumImpl : public GroupedReducingAggregator> { - using Base = GroupedReducingAggregator>; - using CType = typename Base::CType; - using InputCType = typename Base::InputCType; - - // Default value for a group - static CType NullValue(const DataType&) { return CType(0); } - - template - static enable_if_number Reduce(const DataType&, const CType u, - const InputCType v) { - return static_cast(to_unsigned(u) + to_unsigned(static_cast(v))); - } - - static CType Reduce(const DataType&, const CType u, const CType v) { - return static_cast(to_unsigned(u) + to_unsigned(v)); - } - - using Base::Finish; -}; - -template