From 944d15dbdec8c9b5fb11716603f9c1e73c5b6af7 Mon Sep 17 00:00:00 2001 From: Kevin H Wilson Date: Tue, 17 Sep 2024 14:55:41 -0400 Subject: [PATCH] Increase the precision of decimals during compute This is an initial pass whereby a scalar aggregate of a Decimal type increases its precision to the maximum. That is, a sum of an array of decimal128(3, 2)'s becomes a decimal128(38, 2). Previously, the exact decimal type was preserved (e.g., a sum of decimal128(3, 2)'s was a decimal128(3, 2)) *regardless* of whether that was enough precision to capture the full decimal value. --- .../arrow/compute/kernels/aggregate_basic.cc | 38 ++++++++-- .../compute/kernels/aggregate_basic.inc.cc | 30 +++++++- python/pyarrow/tests/test_compute.py | 74 +++++++++++++++++++ 3 files changed, 130 insertions(+), 12 deletions(-) diff --git a/cpp/src/arrow/compute/kernels/aggregate_basic.cc b/cpp/src/arrow/compute/kernels/aggregate_basic.cc index b545d8bcc1003..a47a8cc49e37e 100644 --- a/cpp/src/arrow/compute/kernels/aggregate_basic.cc +++ b/cpp/src/arrow/compute/kernels/aggregate_basic.cc @@ -365,11 +365,20 @@ struct ProductImpl : public ScalarAggregator { } Status Finalize(KernelContext*, Datum* out) override { + std::shared_ptr out_type_; + if (auto decimal128_type = std::dynamic_pointer_cast(this->out_type)) { + ARROW_ASSIGN_OR_RAISE(out_type_, Decimal128Type::Make(Decimal128Type::kMaxPrecision, decimal128_type->scale())); + } else if (auto decimal256_type = std::dynamic_pointer_cast(this->out_type)) { + ARROW_ASSIGN_OR_RAISE(out_type_, Decimal256Type::Make(Decimal256Type::kMaxPrecision, decimal256_type->scale())); + } else { + out_type_ = out_type; + } + if ((!options.skip_nulls && this->nulls_observed) || (this->count < options.min_count)) { - out->value = std::make_shared(out_type); + out->value = std::make_shared(out_type_); } else { - out->value = std::make_shared(this->product, out_type); + out->value = std::make_shared(this->product, out_type_); } return Status::OK(); } @@ -1020,6 +1029,19 @@ const FunctionDoc index_doc{"Find the index of the first occurrence of a given v } // namespace + +Result MaxPrecisionDecimalType(KernelContext*, const std::vector& types) { + std::shared_ptr out_type_; + if (auto decimal128_type = std::dynamic_pointer_cast(types.front().GetSharedPtr())) { + ARROW_ASSIGN_OR_RAISE(out_type_, Decimal128Type::Make(Decimal128Type::kMaxPrecision, decimal128_type->scale())); + } else if (auto decimal256_type = std::dynamic_pointer_cast(types.front().GetSharedPtr())) { + ARROW_ASSIGN_OR_RAISE(out_type_, Decimal256Type::Make(Decimal256Type::kMaxPrecision, decimal256_type->scale())); + } else { + return Status::TypeError("Bad call"); + } + return TypeHolder(out_type_); +} + void RegisterScalarAggregateBasic(FunctionRegistry* registry) { static auto default_scalar_aggregate_options = ScalarAggregateOptions::Defaults(); static auto default_count_options = CountOptions::Defaults(); @@ -1048,9 +1070,9 @@ void RegisterScalarAggregateBasic(FunctionRegistry* registry) { func = std::make_shared("sum", Arity::Unary(), sum_doc, &default_scalar_aggregate_options); AddArrayScalarAggKernels(SumInit, {boolean()}, uint64(), func.get()); - AddAggKernel(KernelSignature::Make({Type::DECIMAL128}, FirstType), SumInit, func.get(), + AddAggKernel(KernelSignature::Make({Type::DECIMAL128}, MaxPrecisionDecimalType), SumInit, func.get(), SimdLevel::NONE); - AddAggKernel(KernelSignature::Make({Type::DECIMAL256}, FirstType), SumInit, func.get(), + AddAggKernel(KernelSignature::Make({Type::DECIMAL256}, MaxPrecisionDecimalType), SumInit, func.get(), SimdLevel::NONE); AddArrayScalarAggKernels(SumInit, SignedIntTypes(), int64(), func.get()); AddArrayScalarAggKernels(SumInit, UnsignedIntTypes(), uint64(), func.get()); @@ -1076,9 +1098,9 @@ void RegisterScalarAggregateBasic(FunctionRegistry* registry) { &default_scalar_aggregate_options); AddArrayScalarAggKernels(MeanInit, {boolean()}, float64(), func.get()); AddArrayScalarAggKernels(MeanInit, NumericTypes(), float64(), func.get()); - AddAggKernel(KernelSignature::Make({Type::DECIMAL128}, FirstType), MeanInit, func.get(), + AddAggKernel(KernelSignature::Make({Type::DECIMAL128}, MaxPrecisionDecimalType), MeanInit, func.get(), SimdLevel::NONE); - AddAggKernel(KernelSignature::Make({Type::DECIMAL256}, FirstType), MeanInit, func.get(), + AddAggKernel(KernelSignature::Make({Type::DECIMAL256}, MaxPrecisionDecimalType), MeanInit, func.get(), SimdLevel::NONE); AddArrayScalarAggKernels(MeanInit, {null()}, float64(), func.get()); // Add the SIMD variants for mean @@ -1160,9 +1182,9 @@ void RegisterScalarAggregateBasic(FunctionRegistry* registry) { AddArrayScalarAggKernels(ProductInit::Init, UnsignedIntTypes(), uint64(), func.get()); AddArrayScalarAggKernels(ProductInit::Init, FloatingPointTypes(), float64(), func.get()); - AddAggKernel(KernelSignature::Make({Type::DECIMAL128}, FirstType), ProductInit::Init, + AddAggKernel(KernelSignature::Make({Type::DECIMAL128}, MaxPrecisionDecimalType), ProductInit::Init, func.get(), SimdLevel::NONE); - AddAggKernel(KernelSignature::Make({Type::DECIMAL256}, FirstType), ProductInit::Init, + AddAggKernel(KernelSignature::Make({Type::DECIMAL256}, MaxPrecisionDecimalType), ProductInit::Init, func.get(), SimdLevel::NONE); AddArrayScalarAggKernels(ProductInit::Init, {null()}, int64(), func.get()); DCHECK_OK(registry->AddFunction(std::move(func))); diff --git a/cpp/src/arrow/compute/kernels/aggregate_basic.inc.cc b/cpp/src/arrow/compute/kernels/aggregate_basic.inc.cc index f2151e0a9e029..72cbcc75a1365 100644 --- a/cpp/src/arrow/compute/kernels/aggregate_basic.inc.cc +++ b/cpp/src/arrow/compute/kernels/aggregate_basic.inc.cc @@ -92,11 +92,20 @@ struct SumImpl : public ScalarAggregator { } Status Finalize(KernelContext*, Datum* out) override { + std::shared_ptr out_type_; + if (auto decimal128_type = std::dynamic_pointer_cast(out_type)) { + ARROW_ASSIGN_OR_RAISE(out_type_, Decimal128Type::Make(Decimal128Type::kMaxPrecision, decimal128_type->scale())); + } else if (auto decimal256_type = std::dynamic_pointer_cast(out_type)) { + ARROW_ASSIGN_OR_RAISE(out_type_, Decimal256Type::Make(Decimal256Type::kMaxPrecision, decimal256_type->scale())); + } else { + out_type_ = out_type; + } + if ((!options.skip_nulls && this->nulls_observed) || (this->count < options.min_count)) { - out->value = std::make_shared(out_type); + out->value = std::make_shared(out_type_); } else { - out->value = std::make_shared(this->sum, out_type); + out->value = std::make_shared(this->sum, out_type_); } return Status::OK(); } @@ -219,9 +228,22 @@ struct MeanImpl> template Status FinalizeImpl(Datum* out) { + std::shared_ptr out_type_; + if (auto decimal128_type = std::dynamic_pointer_cast(this->out_type)) { + ARROW_ASSIGN_OR_RAISE(out_type_, Decimal128Type::Make(Decimal128Type::kMaxPrecision, decimal128_type->scale())); + } else if (auto decimal256_type = std::dynamic_pointer_cast(this->out_type)) { + ARROW_ASSIGN_OR_RAISE(out_type_, Decimal256Type::Make(Decimal256Type::kMaxPrecision, decimal256_type->scale())); + } else { + return Status::TypeError( + "The decimal specialization of MeanImpl was passed a type ", + this->out_type->ToString(), + " and not a decimal type" + ); + } + if ((!options.skip_nulls && this->nulls_observed) || (this->count < options.min_count) || (this->count == 0)) { - out->value = std::make_shared(this->out_type); + out->value = std::make_shared(out_type_); } else { SumCType quotient, remainder; ARROW_ASSIGN_OR_RAISE(std::tie(quotient, remainder), this->sum.Divide(this->count)); @@ -234,7 +256,7 @@ struct MeanImpl> quotient -= 1; } } - out->value = std::make_shared(quotient, this->out_type); + out->value = std::make_shared(quotient, out_type_); } return Status::OK(); } diff --git a/python/pyarrow/tests/test_compute.py b/python/pyarrow/tests/test_compute.py index c16d2f9aacf74..2c4eb32cd3e5d 100644 --- a/python/pyarrow/tests/test_compute.py +++ b/python/pyarrow/tests/test_compute.py @@ -349,10 +349,51 @@ def test_sum_array(arrow_type): arr = pa.array([], type=arrow_type) assert arr.sum().as_py() is None # noqa: E711 + assert pc.sum(arr).as_py() is None # noqa: E711 assert arr.sum(min_count=0).as_py() == 0 assert pc.sum(arr, min_count=0).as_py() == 0 +@pytest.mark.parametrize("arrow_type", [pa.decimal128(3, 2), pa.decimal256(3, 2)]) +def test_sum_decimal_array(arrow_type): + from decimal import Decimal + max_precision_type = pa.decimal128(38, arrow_type.scale) if pa.types.is_decimal128(arrow_type) else pa.decimal256(76, arrow_type.scale) + expected_sum = Decimal("5.79") + zero = Decimal("0.00") + + arr = pa.array([Decimal("1.23"), Decimal("4.56")], type=arrow_type) + assert arr.sum().as_py() == expected_sum + assert arr.sum().type == max_precision_type + assert pc.sum(arr).as_py() == expected_sum + assert pc.sum(arr).type == max_precision_type + + arr = pa.array([Decimal("1.23"), Decimal("4.56"), None], type=arrow_type) + assert arr.sum().as_py() == expected_sum + assert arr.sum().type == max_precision_type + assert pc.sum(arr).as_py() == expected_sum + assert pc.sum(arr).type == max_precision_type + + arr = pa.array([None], type=arrow_type) + assert arr.sum().as_py() is None # noqa: E711 + assert arr.sum().type == max_precision_type # noqa: E711 + assert pc.sum(arr).as_py() is None # noqa: E711 + assert pc.sum(arr).type == max_precision_type # noqa: E711 + assert arr.sum(min_count=0).as_py() == zero + assert arr.sum(min_count=0).type == max_precision_type + assert pc.sum(arr, min_count=0).as_py() == zero + assert pc.sum(arr, min_count=0).type == max_precision_type + + arr = pa.array([], type=arrow_type) + assert arr.sum().as_py() is None # noqa: E711 + assert arr.sum().type == max_precision_type # noqa: E711 + assert pc.sum(arr).as_py() is None # noqa: E711 + assert pc.sum(arr).type == max_precision_type # noqa: E711 + assert arr.sum(min_count=0).as_py() == zero + assert arr.sum(min_count=0).type == max_precision_type + assert pc.sum(arr, min_count=0).as_py() == zero + assert pc.sum(arr, min_count=0).type == max_precision_type + + @pytest.mark.parametrize('arrow_type', numerical_arrow_types) def test_sum_chunked_array(arrow_type): arr = pa.chunked_array([pa.array([1, 2, 3, 4], type=arrow_type)]) @@ -376,6 +417,39 @@ def test_sum_chunked_array(arrow_type): assert pc.sum(arr, min_count=0).as_py() == 0 +@pytest.mark.parametrize('arrow_type', [pa.decimal128(3, 2), pa.decimal256(3, 2)]) +def test_sum_chunked_array_decimal_type(arrow_type): + from decimal import Decimal + max_precision_type = pa.decimal128(38, arrow_type.scale) if pa.types.is_decimal128(arrow_type) else pa.decimal256(76, arrow_type.scale) + expected_sum = Decimal("5.79") + zero = Decimal("0.00") + + arr = pa.chunked_array([pa.array([Decimal("1.23"), Decimal("4.56")], type=arrow_type)]) + assert pc.sum(arr).as_py() == expected_sum + assert pc.sum(arr).type == max_precision_type + + arr = pa.chunked_array([ + pa.array([Decimal("1.23")], type=arrow_type), pa.array([Decimal("4.56")], type=arrow_type) + ]) + assert pc.sum(arr).as_py() == expected_sum + assert pc.sum(arr).type == max_precision_type + + arr = pa.chunked_array([ + pa.array([Decimal("1.23")], type=arrow_type), + pa.array([], type=arrow_type), + pa.array([Decimal("4.56")], type=arrow_type) + ]) + assert pc.sum(arr).as_py() == expected_sum + assert pc.sum(arr).type == max_precision_type + + arr = pa.chunked_array((), type=arrow_type) + assert arr.num_chunks == 0 + assert pc.sum(arr).as_py() is None # noqa: E711 + assert pc.sum(arr).type == max_precision_type + assert pc.sum(arr, min_count=0).as_py() == zero + assert pc.sum(arr, min_count=0).type == max_precision_type + + def test_mode_array(): # ARROW-9917 arr = pa.array([1, 1, 3, 4, 3, 5], type='int64')