From dd1151eb3e8610f20ac91a643deb02b869a721fe Mon Sep 17 00:00:00 2001
From: David Li
Date: Thu, 30 Sep 2021 09:43:38 -0400
Subject: [PATCH] ARROW-13130: [C++] Implement arithmetic kernels for decimals
---
.../arrow/compute/kernels/aggregate_basic.cc | 5 +-
.../compute/kernels/aggregate_internal.h | 21 ++
.../arrow/compute/kernels/aggregate_mode.cc | 60 +++-
.../compute/kernels/aggregate_quantile.cc | 50 ++-
.../compute/kernels/aggregate_tdigest.cc | 36 +-
.../arrow/compute/kernels/aggregate_test.cc | 193 +++++++++--
.../compute/kernels/aggregate_var_std.cc | 53 ++-
.../arrow/compute/kernels/hash_aggregate.cc | 303 ++++++++--------
.../compute/kernels/hash_aggregate_test.cc | 91 +++++
.../compute/kernels/scalar_arithmetic.cc | 232 +++++++++----
.../compute/kernels/scalar_arithmetic_test.cc | 327 +++++++++++++++++-
.../arrow/compute/kernels/scalar_validity.cc | 39 +++
.../compute/kernels/scalar_validity_test.cc | 39 +++
cpp/src/arrow/util/basic_decimal.cc | 12 +
cpp/src/arrow/util/basic_decimal.h | 6 +
docs/source/cpp/compute.rst | 162 +++++----
16 files changed, 1237 insertions(+), 392 deletions(-)
diff --git a/cpp/src/arrow/compute/kernels/aggregate_basic.cc b/cpp/src/arrow/compute/kernels/aggregate_basic.cc
index 25697f7d33b11..6d1812c657127 100644
--- a/cpp/src/arrow/compute/kernels/aggregate_basic.cc
+++ b/cpp/src/arrow/compute/kernels/aggregate_basic.cc
@@ -422,9 +422,8 @@ void AddMinOrMaxAggKernel(ScalarAggregateFunction* func,
auto init = [min_max_func](
KernelContext* ctx,
                        const KernelInitArgs& args) -> Result<std::unique_ptr<KernelState>> {
-    std::vector<ValueDescr> inputs = args.inputs;
- ARROW_ASSIGN_OR_RAISE(auto kernel, min_max_func->DispatchBest(&inputs));
- KernelInitArgs new_args{kernel, inputs, args.options};
+ ARROW_ASSIGN_OR_RAISE(auto kernel, min_max_func->DispatchExact(args.inputs));
+ KernelInitArgs new_args{kernel, args.inputs, args.options};
return kernel->init(ctx, new_args);
};
diff --git a/cpp/src/arrow/compute/kernels/aggregate_internal.h b/cpp/src/arrow/compute/kernels/aggregate_internal.h
index 22a54558f4e8a..946ec01900c5b 100644
--- a/cpp/src/arrow/compute/kernels/aggregate_internal.h
+++ b/cpp/src/arrow/compute/kernels/aggregate_internal.h
@@ -21,6 +21,7 @@
#include "arrow/type.h"
#include "arrow/type_traits.h"
#include "arrow/util/bit_run_reader.h"
+#include "arrow/util/int128_internal.h"
#include "arrow/util/logging.h"
namespace arrow {
@@ -111,6 +112,26 @@ void AddAggKernel(std::shared_ptr<KernelSignature> sig, KernelInit init,
ScalarAggregateFinalize finalize, ScalarAggregateFunction* func,
SimdLevel::type simd_level = SimdLevel::NONE);
+using arrow::internal::VisitSetBitRunsVoid;
+
+template <typename T, typename Enable = void>
+struct GetSumType;
+
+template <typename T>
+struct GetSumType<T, enable_if_floating_point<T>> {
+  using SumType = double;
+};
+
+template <typename T>
+struct GetSumType<T, enable_if_integer<T>> {
+  using SumType = arrow::internal::int128_t;
+};
+
+template <typename T>
+struct GetSumType<T, enable_if_decimal<T>> {
+  using SumType = typename TypeTraits<T>::CType;
+};
+
// SumArray must be parameterized with the SIMD level since it's called both from
// translation units with and without vectorization. Normally it gets inlined but
// if not, without the parameter, we'll have multiple definitions of the same
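The GetSumType trait above picks a per-family accumulator so SumArray neither loses precision nor overflows: double for floating point, a 128-bit integer for integers, and the decimal's own storage type for decimals. Below is a standalone sketch of the same trait-dispatch idea, using the compiler-provided __int128 in place of arrow::internal::int128_t and a hypothetical DecimalValue placeholder; the names are illustrative, not Arrow's.

#include <cstdint>
#include <type_traits>

struct DecimalValue {};  // stand-in for a decimal storage type such as Decimal128

template <typename T, typename Enable = void>
struct AccumulatorOf;  // primary template intentionally left undefined

template <typename T>
struct AccumulatorOf<T, std::enable_if_t<std::is_floating_point<T>::value>> {
  using type = double;  // floats accumulate as double
};

template <typename T>
struct AccumulatorOf<T, std::enable_if_t<std::is_integral<T>::value>> {
  using type = __int128;  // wide enough to sum very many int64 values
};

template <>
struct AccumulatorOf<DecimalValue> {
  using type = DecimalValue;  // decimals keep their own exact representation
};

static_assert(std::is_same<AccumulatorOf<float>::type, double>::value, "");
static_assert(std::is_same<AccumulatorOf<int64_t>::type, __int128>::value, "");
static_assert(std::is_same<AccumulatorOf<DecimalValue>::type, DecimalValue>::value, "");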
diff --git a/cpp/src/arrow/compute/kernels/aggregate_mode.cc b/cpp/src/arrow/compute/kernels/aggregate_mode.cc
index f225f6bf569c3..ec07bd0184a4d 100644
--- a/cpp/src/arrow/compute/kernels/aggregate_mode.cc
+++ b/cpp/src/arrow/compute/kernels/aggregate_mode.cc
@@ -40,10 +40,13 @@ constexpr char kCountFieldName[] = "count";
constexpr uint64_t kCountEOF = ~0ULL;
-template <typename T, typename CType = typename T::c_type>
+template <typename T, typename CType = typename TypeTraits<T>::CType>
 Result<std::pair<CType*, int64_t*>> PrepareOutput(int64_t n, KernelContext* ctx,
Datum* out) {
-  const auto& mode_type = TypeTraits<T>::type_singleton();
+ DCHECK_EQ(Type::STRUCT, out->type()->id());
+  const auto& out_type = checked_cast<const StructType&>(*out->type());
+ DCHECK_EQ(2, out_type.num_fields());
+ const auto& mode_type = out_type.field(0)->type();
const auto& count_type = int64();
auto mode_data = ArrayData::Make(mode_type, /*length=*/n, /*null_count=*/0);
@@ -61,10 +64,7 @@ Result> PrepareOutput(int64_t n, KernelContext* ctx,
    count_buffer = count_data->template GetMutableValues<int64_t>(1);
}
- const auto& out_type =
- struct_({field(kModeFieldName, mode_type), field(kCountFieldName, count_type)});
- *out = Datum(ArrayData::Make(out_type, n, {nullptr}, {mode_data, count_data}, 0));
-
+ *out = Datum(ArrayData::Make(out->type(), n, {nullptr}, {mode_data, count_data}, 0));
return std::make_pair(mode_buffer, count_buffer);
}
@@ -72,7 +72,7 @@ Result> PrepareOutput(int64_t n, KernelContext* ctx,
// suboptimal for tiny or large n, possibly okay as we're not in hot path
 template <typename InType, typename Generator>
 Status Finalize(KernelContext* ctx, Datum* out, Generator&& gen) {
-  using CType = typename InType::c_type;
+  using CType = typename TypeTraits<InType>::CType;
   using ValueCountPair = std::pair<CType, uint64_t>;
auto gt = [](const ValueCountPair& lhs, const ValueCountPair& rhs) {
@@ -203,13 +203,25 @@ struct CountModer {
}
};
-// copy and sort approach for floating points or integers with wide value range
+// copy and sort approach for floating points, decimals, or integers with wide
+// value range
// O(n) space, O(nlogn) time
 template <typename T>
 struct SortModer {
-  using CType = typename T::c_type;
+  using CType = typename TypeTraits<T>::CType;
   using Allocator = arrow::stl::allocator<CType>;
+  template <typename Type = T>
+  static enable_if_floating_point<Type, CType> GetNan() {
+    return static_cast<CType>(NAN);
+  }
+
+  template <typename Type = T>
+  static enable_if_t<!is_floating_type<Type>::value, CType> GetNan() {
+ DCHECK(false);
+ return static_cast(0);
+ }
+
Status Exec(KernelContext* ctx, const ExecBatch& batch, Datum* out) {
const Datum& datum = batch[0];
const int64_t in_length = datum.length() - datum.null_count();
@@ -246,7 +258,7 @@ struct SortModer {
if (ARROW_PREDICT_FALSE(it == in_buffer.cend())) {
// handle NAN at last
if (nan_count > 0) {
-          auto value_count = std::make_pair(static_cast<CType>(NAN), nan_count);
+ auto value_count = std::make_pair(GetNan(), nan_count);
nan_count = 0;
return value_count;
}
@@ -318,13 +330,18 @@ struct Moder::value &&
};
 template <typename T>
-struct Moder<T, enable_if_t<is_floating_type<T>::value>> {
+struct Moder<T, enable_if_floating_point<T>> {
+  SortModer<T> impl;
+};
+
+template <typename T>
+struct Moder<T, enable_if_decimal<T>> {
   SortModer<T> impl;
 };
 template <typename T>
 Status ScalarMode(KernelContext* ctx, const Scalar& scalar, Datum* out) {
-  using CType = typename T::c_type;
+  using CType = typename TypeTraits<T>::CType;
const ModeOptions& options = ModeState::Get(ctx);
if ((!options.skip_nulls && !scalar.is_valid) ||
@@ -378,6 +395,21 @@ VectorKernel NewModeKernel(const std::shared_ptr<DataType>& in_type) {
return kernel;
}
+Result<ValueDescr> ModeType(KernelContext*, const std::vector<ValueDescr>& descrs) {
+ return ValueDescr::Array(
+ struct_({field(kModeFieldName, descrs[0].type), field(kCountFieldName, int64())}));
+}
+
+VectorKernel NewModeKernel(Type::type type_id, ArrayKernelExec exec) {
+ VectorKernel kernel;
+ kernel.init = ModeState::Init;
+ kernel.can_execute_chunkwise = false;
+ kernel.output_chunked = false;
+ kernel.signature = KernelSignature::Make({InputType(type_id)}, OutputType(ModeType));
+ kernel.exec = std::move(exec);
+ return kernel;
+}
+
void AddBooleanModeKernel(VectorFunction* func) {
VectorKernel kernel = NewModeKernel(boolean());
   kernel.exec = ModeExecutor<StructType, BooleanType>::Exec;
@@ -411,6 +443,10 @@ void RegisterScalarAggregateMode(FunctionRegistry* registry) {
&default_options);
AddBooleanModeKernel(func.get());
AddNumericModeKernels(func.get());
+  DCHECK_OK(func->AddKernel(
+      NewModeKernel(Type::DECIMAL128, ModeExecutor<StructType, Decimal128Type>::Exec)));
+  DCHECK_OK(func->AddKernel(
+      NewModeKernel(Type::DECIMAL256, ModeExecutor<StructType, Decimal256Type>::Exec)));
DCHECK_OK(registry->AddFunction(std::move(func)));
}
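The SortModer path registered above for decimals is the same copy-and-sort strategy already used for floating point and wide-range integers: gather the non-null values, sort them, scan equal runs, and keep the n most frequent, breaking ties toward the smaller value. A standalone sketch of that run-counting idea (not the Arrow kernel itself):

#include <algorithm>
#include <cstdint>
#include <utility>
#include <vector>

// Return up to n (value, count) pairs, most frequent first, smaller value first on ties.
// O(n log n) time and O(n) space, matching the comment in aggregate_mode.cc.
template <typename T>
std::vector<std::pair<T, int64_t>> SortMode(std::vector<T> values, size_t n) {
  std::sort(values.begin(), values.end());
  std::vector<std::pair<T, int64_t>> runs;
  for (size_t i = 0; i < values.size();) {
    size_t j = i;
    while (j < values.size() && values[j] == values[i]) ++j;  // scan one equal run
    runs.emplace_back(values[i], static_cast<int64_t>(j - i));
    i = j;
  }
  std::sort(runs.begin(), runs.end(), [](const auto& a, const auto& b) {
    return a.second != b.second ? a.second > b.second : a.first < b.first;
  });
  if (runs.size() > n) runs.resize(n);
  return runs;
}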
diff --git a/cpp/src/arrow/compute/kernels/aggregate_quantile.cc b/cpp/src/arrow/compute/kernels/aggregate_quantile.cc
index 62e375e695087..cd2410cc9eb75 100644
--- a/cpp/src/arrow/compute/kernels/aggregate_quantile.cc
+++ b/cpp/src/arrow/compute/kernels/aggregate_quantile.cc
@@ -71,10 +71,21 @@ uint64_t QuantileToDataPoint(size_t length, double q,
return datapoint_index;
}
+template <typename T>
+double DataPointToDouble(T value, const DataType&) {
+  return static_cast<double>(value);
+}
+double DataPointToDouble(const Decimal128& value, const DataType& ty) {
+  return value.ToDouble(checked_cast<const DecimalType&>(ty).scale());
+}
+double DataPointToDouble(const Decimal256& value, const DataType& ty) {
+  return value.ToDouble(checked_cast<const DecimalType&>(ty).scale());
+}
+
// copy and nth_element approach, large memory footprint
 template <typename InType>
 struct SortQuantiler {
-  using CType = typename InType::c_type;
+  using CType = typename TypeTraits<InType>::CType;
   using Allocator = arrow::stl::allocator<CType>;
Status Exec(KernelContext* ctx, const ExecBatch& batch, Datum* out) {
@@ -106,8 +117,7 @@ struct SortQuantiler {
// prepare out array
// out type depends on options
const bool is_datapoint = IsDataPoint(options);
-    const std::shared_ptr<DataType> out_type =
-        is_datapoint ? TypeTraits<InType>::type_singleton() : float64();
+    const std::shared_ptr<DataType> out_type = is_datapoint ? datum.type() : float64();
int64_t out_length = options.q.size();
if (in_buffer.empty()) {
return MakeArrayOfNull(out_type, out_length, ctx->memory_pool()).Value(out);
@@ -142,8 +152,9 @@ struct SortQuantiler {
        double* out_buffer = out_data->template GetMutableValues<double>(1);
for (int64_t i = 0; i < out_length; ++i) {
const int64_t q_index = q_indices[i];
- out_buffer[q_index] = GetQuantileByInterp(
- in_buffer, &last_index, options.q[q_index], options.interpolation);
+ out_buffer[q_index] =
+ GetQuantileByInterp(in_buffer, &last_index, options.q[q_index],
+ options.interpolation, *datum.type());
}
}
}
@@ -170,8 +181,8 @@ struct SortQuantiler {
// return quantile interpolated from adjacent input data points
  double GetQuantileByInterp(std::vector<CType, Allocator>& in, uint64_t* last_index,
- double q,
- enum QuantileOptions::Interpolation interpolation) {
+ double q, enum QuantileOptions::Interpolation interpolation,
+ const DataType& in_type) {
const double index = (in.size() - 1) * q;
    const uint64_t lower_index = static_cast<uint64_t>(index);
const double fraction = index - lower_index;
@@ -181,7 +192,7 @@ struct SortQuantiler {
std::nth_element(in.begin(), in.begin() + lower_index, in.begin() + *last_index);
}
-    const double lower_value = static_cast<double>(in[lower_index]);
+ const double lower_value = DataPointToDouble(in[lower_index], in_type);
if (fraction == 0) {
*last_index = lower_index;
return lower_value;
@@ -197,7 +208,7 @@ struct SortQuantiler {
}
*last_index = lower_index;
-    const double higher_value = static_cast<double>(in[higher_index]);
+ const double higher_value = DataPointToDouble(in[higher_index], in_type);
if (interpolation == QuantileOptions::LINEAR) {
// more stable than naive linear interpolation
@@ -399,10 +410,15 @@ struct ExactQuantiler::value>> {
SortQuantiler impl;
};
+template <typename T>
+struct ExactQuantiler<T, enable_if_t<is_decimal_type<T>::value>> {
+  SortQuantiler<T> impl;
+};
+
 template <typename T>
 Status ScalarQuantile(KernelContext* ctx, const QuantileOptions& options,
                       const Scalar& scalar, Datum* out) {
-  using CType = typename T::c_type;
+  using CType = typename TypeTraits<T>::CType;
ArrayData* output = out->mutable_array();
output->length = options.q.size();
auto out_type = IsDataPoint(options) ? scalar.type : float64();
@@ -433,7 +449,7 @@ Status ScalarQuantile(KernelContext* ctx, const QuantileOptions& options,
} else {
     double* out_buffer = output->template GetMutableValues<double>(1);
     for (int64_t i = 0; i < output->length; i++) {
-      out_buffer[i] = static_cast<double>(UnboxScalar<T>::Unbox(scalar));
+      out_buffer[i] = DataPointToDouble(UnboxScalar<T>::Unbox(scalar), *scalar.type);
}
}
return Status::OK();
@@ -486,6 +502,18 @@ void AddQuantileKernels(VectorFunction* func) {
base.exec = GenerateNumeric(*ty);
DCHECK_OK(func->AddKernel(base));
}
+ {
+ base.signature =
+ KernelSignature::Make({InputType(Type::DECIMAL128)}, OutputType(ResolveOutput));
+    base.exec = QuantileExecutor<StructType, Decimal128Type>::Exec;
+ DCHECK_OK(func->AddKernel(base));
+ }
+ {
+ base.signature =
+ KernelSignature::Make({InputType(Type::DECIMAL256)}, OutputType(ResolveOutput));
+    base.exec = QuantileExecutor<StructType, Decimal256Type>::Exec;
+ DCHECK_OK(func->AddKernel(base));
+ }
}
const FunctionDoc quantile_doc{
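For the LINEAR interpolation case exercised below, GetQuantileByInterp finds the two adjacent order statistics around index (size - 1) * q via nth_element and blends them; decimal inputs are first turned into doubles through DataPointToDouble. A standalone sketch of that arithmetic, assuming a non-empty input and using min_element where the kernel reuses its nth_element state:

#include <algorithm>
#include <cstdint>
#include <vector>

// Linear-interpolated quantile of a mutable copy of the data, q in [0, 1].
// Decimal values would already have been converted to double (value.ToDouble(scale)).
double QuantileLinear(std::vector<double> in, double q) {
  const double index = (in.size() - 1) * q;
  const auto lower_index = static_cast<uint64_t>(index);
  const double fraction = index - lower_index;
  // Partition so in[lower_index] holds the lower adjacent order statistic.
  std::nth_element(in.begin(), in.begin() + lower_index, in.end());
  const double lower = in[lower_index];
  if (fraction == 0) return lower;
  // The upper adjacent order statistic is the minimum of the right partition.
  const double higher = *std::min_element(in.begin() + lower_index + 1, in.end());
  // Same "more stable than naive linear interpolation" form used by the kernel.
  return fraction * higher + (1 - fraction) * lower;
}

On inputs 1.00 and 5.00 with q = 0.5 this yields 3.0, the value checked by the decimal quantile test in aggregate_test.cc.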
diff --git a/cpp/src/arrow/compute/kernels/aggregate_tdigest.cc b/cpp/src/arrow/compute/kernels/aggregate_tdigest.cc
index 0fddf38f575c9..7c86267d94006 100644
--- a/cpp/src/arrow/compute/kernels/aggregate_tdigest.cc
+++ b/cpp/src/arrow/compute/kernels/aggregate_tdigest.cc
@@ -34,13 +34,25 @@ template <typename ArrowType>
 struct TDigestImpl : public ScalarAggregator {
   using ThisType = TDigestImpl<ArrowType>;
   using ArrayType = typename TypeTraits<ArrowType>::ArrayType;
-  using CType = typename ArrowType::c_type;
+  using CType = typename TypeTraits<ArrowType>::CType;
- explicit TDigestImpl(const TDigestOptions& options)
+ TDigestImpl(const TDigestOptions& options, const DataType& in_type)
: options{options},
tdigest{options.delta, options.buffer_size},
count{0},
- all_valid{true} {}
+ decimal_scale{0},
+ all_valid{true} {
+    if (is_decimal_type<ArrowType>::value) {
+      decimal_scale = checked_cast<const DecimalType&>(in_type).scale();
+ }
+ }
+
+  template <typename T>
+  double ToDouble(T value) const {
+    return static_cast<double>(value);
+ }
+ double ToDouble(const Decimal128& value) const { return value.ToDouble(decimal_scale); }
+ double ToDouble(const Decimal256& value) const { return value.ToDouble(decimal_scale); }
Status Consume(KernelContext*, const ExecBatch& batch) override {
if (!this->all_valid) return Status::OK();
@@ -57,7 +69,7 @@ struct TDigestImpl : public ScalarAggregator {
VisitSetBitRunsVoid(data.buffers[0], data.offset, data.length,
[&](int64_t pos, int64_t len) {
for (int64_t i = 0; i < len; ++i) {
- this->tdigest.NanAdd(values[pos + i]);
+ this->tdigest.NanAdd(ToDouble(values[pos + i]));
}
});
}
@@ -66,7 +78,7 @@ struct TDigestImpl : public ScalarAggregator {
if (batch[0].scalar()->is_valid) {
this->count += 1;
for (int64_t i = 0; i < batch.length; i++) {
- this->tdigest.NanAdd(value);
+ this->tdigest.NanAdd(ToDouble(value));
}
}
}
@@ -110,6 +122,7 @@ struct TDigestImpl : public ScalarAggregator {
const TDigestOptions options;
TDigest tdigest;
int64_t count;
+ int32_t decimal_scale;
bool all_valid;
};
@@ -132,8 +145,14 @@ struct TDigestInitState {
}
   template <typename Type>
-  enable_if_t<is_number_type<Type>::value, Status> Visit(const Type&) {
-    state.reset(new TDigestImpl<Type>(options));
+  enable_if_number<Type, Status> Visit(const Type&) {
+    state.reset(new TDigestImpl<Type>(options, in_type));
+    return Status::OK();
+  }
+
+  template <typename Type>
+  enable_if_decimal<Type, Status> Visit(const Type&) {
+    state.reset(new TDigestImpl<Type>(options, in_type));
return Status::OK();
}
@@ -154,7 +173,7 @@ void AddTDigestKernels(KernelInit init,
                        const std::vector<std::shared_ptr<DataType>>& types,
ScalarAggregateFunction* func) {
for (const auto& ty : types) {
- auto sig = KernelSignature::Make({InputType(ty)}, float64());
+ auto sig = KernelSignature::Make({InputType(ty->id())}, float64());
AddAggKernel(std::move(sig), init, func);
}
}
@@ -179,6 +198,7 @@ std::shared_ptr<ScalarAggregateFunction> AddTDigestAggKernels() {
   auto func = std::make_shared<ScalarAggregateFunction>(
"tdigest", Arity::Unary(), &tdigest_doc, &default_tdigest_options);
AddTDigestKernels(TDigestInit, NumericTypes(), func.get());
+ AddTDigestKernels(TDigestInit, {decimal128(1, 1), decimal256(1, 1)}, func.get());
return func;
}
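Each of the decimal-enabled kernels funnels values through ToDouble, i.e. Decimal128/Decimal256::ToDouble(scale): the stored unscaled integer divided by 10^scale. A minimal standalone illustration of that conversion for values small enough to fit in int64_t (the real methods operate on 128-/256-bit integers):

#include <cmath>
#include <cstdint>
#include <iostream>

// decimal(precision, scale) stores an unscaled integer; its value is unscaled / 10^scale.
double DecimalToDouble(int64_t unscaled, int32_t scale) {
  return static_cast<double>(unscaled) / std::pow(10.0, scale);
}

int main() {
  // "3.25" with scale 2 is stored as 325.
  std::cout << DecimalToDouble(325, 2) << "\n";  // prints 3.25
  return 0;
}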
diff --git a/cpp/src/arrow/compute/kernels/aggregate_test.cc b/cpp/src/arrow/compute/kernels/aggregate_test.cc
index 992f73698648d..fec847d382c6d 100644
--- a/cpp/src/arrow/compute/kernels/aggregate_test.cc
+++ b/cpp/src/arrow/compute/kernels/aggregate_test.cc
@@ -2269,6 +2269,26 @@ TYPED_TEST(TestStringIndexKernel, Basics) {
// Mode
//
+template <typename CType>
+void CheckModes(const Datum& array, const ModeOptions options,
+                const std::vector<CType>& expected_modes,
+                const std::vector<int64_t>& expected_counts) {
+  ASSERT_OK_AND_ASSIGN(Datum out, Mode(array, options));
+  ValidateOutput(out);
+  const StructArray out_array(out.array());
+  ASSERT_EQ(out_array.length(), expected_modes.size());
+  ASSERT_EQ(out_array.num_fields(), 2);
+
+  const CType* out_modes = out_array.field(0)->data()->GetValues<CType>(1);
+  const int64_t* out_counts = out_array.field(1)->data()->GetValues<int64_t>(1);
+ for (int i = 0; i < out_array.length(); ++i) {
+ // equal or nan equal
+ ASSERT_TRUE((expected_modes[i] == out_modes[i]) ||
+ (expected_modes[i] != expected_modes[i] && out_modes[i] != out_modes[i]));
+ ASSERT_EQ(expected_counts[i], out_counts[i]);
+ }
+}
+
 template <typename ArrowType>
class TestPrimitiveModeKernel : public ::testing::Test {
public:
@@ -2279,21 +2299,7 @@ class TestPrimitiveModeKernel : public ::testing::Test {
void AssertModesAre(const Datum& array, const ModeOptions options,
                       const std::vector<CType>& expected_modes,
                       const std::vector<int64_t>& expected_counts) {
- ASSERT_OK_AND_ASSIGN(Datum out, Mode(array, options));
- ValidateOutput(out);
- const StructArray out_array(out.array());
- ASSERT_EQ(out_array.length(), expected_modes.size());
- ASSERT_EQ(out_array.num_fields(), 2);
-
-    const CType* out_modes = out_array.field(0)->data()->GetValues<CType>(1);
-    const int64_t* out_counts = out_array.field(1)->data()->GetValues<int64_t>(1);
- for (int i = 0; i < out_array.length(); ++i) {
- // equal or nan equal
- ASSERT_TRUE(
- (expected_modes[i] == out_modes[i]) ||
- (expected_modes[i] != expected_modes[i] && out_modes[i] != out_modes[i]));
- ASSERT_EQ(expected_counts[i], out_counts[i]);
- }
+ CheckModes(array, options, expected_modes, expected_counts);
}
void AssertModesAre(const std::string& json, const int n,
@@ -2522,6 +2528,89 @@ TYPED_TEST(TestFloatingModeKernel, Floats) {
this->AssertModesEmpty(ScalarFromJSON(in_ty, "null"), ModeOptions(/*n=*/1));
}
+template <typename ArrowType>
+class TestDecimalModeKernel : public ::testing::Test {
+ public:
+  using CType = typename TypeTraits<ArrowType>::CType;
+
+  void AssertModesAre(const Datum& array, const ModeOptions options,
+                      const std::vector<std::string>& expected_modes,
+                      const std::vector<int64_t>& expected_counts) {
+ CheckModes(array, options, values(expected_modes), expected_counts);
+ }
+
+ CType value(const std::string& s) const {
+ EXPECT_OK_AND_ASSIGN(auto out, CType::FromString(s));
+ return out;
+ }
+
+  std::vector<CType> values(const std::vector<std::string>& strings) const {
+    std::vector<CType> values;
+ for (const auto& s : strings) {
+ values.push_back(value(s));
+ }
+ return values;
+ }
+
+  std::shared_ptr<DataType> type_instance() { return std::make_shared<ArrowType>(4, 2); }
+};
+
+TYPED_TEST_SUITE(TestDecimalModeKernel, DecimalArrowTypes);
+
+TYPED_TEST(TestDecimalModeKernel, Decimals) {
+ auto ty = this->type_instance();
+ this->AssertModesAre(ArrayFromJSON(ty, R"(["5.01", "-1.42", "-1.42", "5.01", "5.01"])"),
+ ModeOptions(1), {"5.01"}, {3});
+ this->AssertModesAre(
+ ArrayFromJSON(ty, R"(["5.01", "-1.42", "-1.42", "5.01", "5.01", "-1.42"])"),
+ ModeOptions(1), {"-1.42"}, {3});
+ this->AssertModesAre(
+ ArrayFromJSON(ty, R"(["5.01", "-1.42", "-1.42", "5.01", "5.01", "-1.42"])"),
+ ModeOptions(2), {"-1.42", "5.01"}, {3, 3});
+
+ this->AssertModesAre(ArrayFromJSON(ty, "[]"), ModeOptions(1), {}, {});
+
+ this->AssertModesAre(ArrayFromJSON(ty, R"(["1.86", "-2.00", "-2.00", null])"),
+ ModeOptions(/*n=*/1), {"-2.00"}, {2});
+ this->AssertModesAre(ArrayFromJSON(ty, R"(["1.86", "-2.00", "-2.00", null])"),
+ ModeOptions(/*n=*/1, /*skip_nulls=*/false), {}, {});
+ this->AssertModesAre(ArrayFromJSON(ty, R"(["1.86", "-2.00", "-2.00", null])"),
+ ModeOptions(/*n=*/1, /*skip_nulls=*/true, /*min_count=*/3),
+ {"-2.00"}, {2});
+ this->AssertModesAre(ArrayFromJSON(ty, R"(["-2.00", "-2.00", null])"),
+ ModeOptions(/*n=*/1, /*skip_nulls=*/true, /*min_count=*/3), {},
+ {});
+ this->AssertModesAre(ArrayFromJSON(ty, R"(["1.86", "-2.00", "-2.00"])"),
+ ModeOptions(/*n=*/1, /*skip_nulls=*/false, /*min_count=*/3),
+ {"-2.00"}, {2});
+ this->AssertModesAre(ArrayFromJSON(ty, R"(["1.86", "-2.00", "-2.00", null])"),
+ ModeOptions(/*n=*/1, /*skip_nulls=*/false, /*min_count=*/3), {},
+ {});
+ this->AssertModesAre(ArrayFromJSON(ty, R"(["1.86", "-2.00"])"),
+ ModeOptions(/*n=*/1, /*skip_nulls=*/false, /*min_count=*/3), {},
+ {});
+
+ this->AssertModesAre(ScalarFromJSON(ty, R"("0.00")"),
+ ModeOptions(/*n=*/1, /*skip_nulls=*/false), {"0.00"}, {1});
+ this->AssertModesAre(ScalarFromJSON(ty, "null"),
+ ModeOptions(/*n=*/1, /*skip_nulls=*/false), {}, {});
+ this->AssertModesAre(ScalarFromJSON(ty, R"("0.00")"),
+ ModeOptions(/*n=*/1, /*skip_nulls=*/true, /*min_count=*/2), {},
+ {});
+ this->AssertModesAre(ScalarFromJSON(ty, "null"),
+ ModeOptions(/*n=*/1, /*skip_nulls=*/true, /*min_count=*/2), {},
+ {});
+ this->AssertModesAre(ScalarFromJSON(ty, R"("0.00")"),
+ ModeOptions(/*n=*/1, /*skip_nulls=*/false, /*min_count=*/2), {},
+ {});
+ this->AssertModesAre(ScalarFromJSON(ty, "null"),
+ ModeOptions(/*n=*/1, /*skip_nulls=*/false, /*min_count=*/2), {},
+ {});
+ this->AssertModesAre(ScalarFromJSON(ty, R"("5.00")"), ModeOptions(/*n=*/1), {"5.00"},
+ {1});
+ this->AssertModesAre(ScalarFromJSON(ty, "null"), ModeOptions(/*n=*/1), {}, {});
+}
+
TEST_F(TestInt8ModeKernelValueRange, Basics) {
this->AssertModeIs("[0, 127, -128, -128]", -128, 2);
this->AssertModeIs("[127, 127, 127]", 127, 3);
@@ -2624,6 +2713,24 @@ TEST_F(TestInt32ModeKernel, Sliced) {
// Variance/Stddev
//
+void CheckVarStd(const Datum& array, const VarianceOptions& options,
+ double expected_var) {
+ ASSERT_OK_AND_ASSIGN(Datum out_var, Variance(array, options));
+ ASSERT_OK_AND_ASSIGN(Datum out_std, Stddev(array, options));
+  auto var = checked_cast<const DoubleScalar*>(out_var.scalar().get());
+  auto std = checked_cast<const DoubleScalar*>(out_std.scalar().get());
+ ASSERT_TRUE(var->is_valid && std->is_valid);
+ // Near zero these macros don't work as well
+ // (and MinGW can give results slightly off from zero)
+ if (std::abs(expected_var) < 1e-20) {
+ ASSERT_NEAR(std->value * std->value, var->value, 1e-20);
+ ASSERT_NEAR(var->value, expected_var, 1e-20);
+ } else {
+ ASSERT_DOUBLE_EQ(std->value * std->value, var->value);
+ ASSERT_DOUBLE_EQ(var->value, expected_var); // < 4ULP
+ }
+}
+
 template <typename ArrowType>
class TestPrimitiveVarStdKernel : public ::testing::Test {
public:
@@ -2632,12 +2739,12 @@ class TestPrimitiveVarStdKernel : public ::testing::Test {
void AssertVarStdIs(const Array& array, const VarianceOptions& options,
double expected_var) {
- AssertVarStdIsInternal(array, options, expected_var);
+ CheckVarStd(array, options, expected_var);
}
void AssertVarStdIs(const std::shared_ptr& array,
const VarianceOptions& options, double expected_var) {
- AssertVarStdIsInternal(array, options, expected_var);
+ CheckVarStd(array, options, expected_var);
}
void AssertVarStdIs(const std::string& json, const VarianceOptions& options,
@@ -2675,17 +2782,6 @@ class TestPrimitiveVarStdKernel : public ::testing::Test {
  std::shared_ptr<DataType> type_singleton() { return Traits::type_singleton(); }
private:
- void AssertVarStdIsInternal(const Datum& array, const VarianceOptions& options,
- double expected_var) {
- ASSERT_OK_AND_ASSIGN(Datum out_var, Variance(array, options));
- ASSERT_OK_AND_ASSIGN(Datum out_std, Stddev(array, options));
-    auto var = checked_cast<const DoubleScalar*>(out_var.scalar().get());
-    auto std = checked_cast<const DoubleScalar*>(out_std.scalar().get());
- ASSERT_TRUE(var->is_valid && std->is_valid);
- ASSERT_DOUBLE_EQ(std->value * std->value, var->value);
- ASSERT_DOUBLE_EQ(var->value, expected_var); // < 4ULP
- }
-
void AssertVarStdIsInvalidInternal(const Datum& array, const VarianceOptions& options) {
ASSERT_OK_AND_ASSIGN(Datum out_var, Variance(array, options));
ASSERT_OK_AND_ASSIGN(Datum out_std, Stddev(array, options));
@@ -2935,6 +3031,18 @@ TEST_F(TestVarStdKernelIntegerLength, Basics) {
}
#endif
+TEST(TestVarStdKernel, Decimal) {
+ // Effectively treated as double, sanity check results here
+ for (const auto& ty : {decimal128(3, 2), decimal256(3, 2)}) {
+ CheckVarStd(ArrayFromJSON(ty, R"(["1.00"])"), VarianceOptions(), 0);
+ CheckVarStd(ArrayFromJSON(ty, R"([null, "1.00", "2.00", "3.00"])"), VarianceOptions(),
+ 0.6666666666666666);
+ CheckVarStd(ScalarFromJSON(ty, R"("1.00")"), VarianceOptions(), 0);
+ CheckVarStd(ArrayFromJSON(ty, R"([null, "1.00", "2.00"])"),
+ VarianceOptions(/*ddof=*/1), 0.5);
+ }
+}
+
//
// Quantile
//
@@ -3476,6 +3584,24 @@ TEST(TestQuantileKernel, AllNullsOrNaNs) {
}
}
+TEST(TestQuantileKernel, Decimal) {
+  auto check = [](const std::shared_ptr<Array>& input, QuantileOptions options,
+                  const std::shared_ptr<Array>& expected) {
+ ASSERT_OK_AND_ASSIGN(Datum out, Quantile(input, options));
+ auto out_array = out.make_array();
+ ValidateOutput(*out_array);
+ AssertArraysEqual(*expected, *out_array, /*verbose=*/true);
+ };
+ for (const auto& ty : {decimal128(3, 2), decimal256(3, 2)}) {
+ check(ArrayFromJSON(ty, R"(["1.00", "5.00", null])"),
+ QuantileOptions(0.5, QuantileOptions::LINEAR),
+ ArrayFromJSON(float64(), R"([3.00])"));
+ check(ArrayFromJSON(ty, R"(["1.00", "2.00", "5.00"])"),
+ QuantileOptions(0.5, QuantileOptions::NEAREST),
+ ArrayFromJSON(ty, R"(["2.00"])"));
+ }
+}
+
TEST(TestQuantileKernel, Scalar) {
for (const auto& ty : {float64(), int64(), uint64()}) {
QuantileOptions options(std::vector{0.0, 0.5, 1.0});
@@ -3543,6 +3669,17 @@ TEST(TestTDigestKernel, AllNullsOrNaNs) {
}
}
+TEST(TestTDigestKernel, Decimal) {
+ for (const auto& ty : {decimal128(3, 2), decimal256(3, 2)}) {
+ ASSERT_OK_AND_ASSIGN(auto decimal_array,
+ TDigest(ArrayFromJSON(ty, R"(["1.00", "2.00", "3.25"])")));
+ ASSERT_OK_AND_ASSIGN(auto float_array,
+ TDigest(ArrayFromJSON(float64(), "[1, 2, 3.25]")));
+ AssertArraysApproxEqual(*float_array.make_array(), *decimal_array.make_array(),
+ /*verbose=*/true);
+ }
+}
+
TEST(TestTDigestKernel, Scalar) {
for (const auto& ty : {float64(), int64(), uint64()}) {
TDigestOptions options(std::vector{0.0, 0.5, 1.0});
diff --git a/cpp/src/arrow/compute/kernels/aggregate_var_std.cc b/cpp/src/arrow/compute/kernels/aggregate_var_std.cc
index d0d3c514fae2e..feb98718aee3c 100644
--- a/cpp/src/arrow/compute/kernels/aggregate_var_std.cc
+++ b/cpp/src/arrow/compute/kernels/aggregate_var_std.cc
@@ -36,12 +36,21 @@ using arrow::internal::VisitSetBitRunsVoid;
 template <typename ArrowType, SimdLevel::type SimdLevel>
 struct VarStdState {
   using ArrayType = typename TypeTraits<ArrowType>::ArrayType;
-  using CType = typename ArrowType::c_type;
+  using CType = typename TypeTraits<ArrowType>::CType;
   using ThisType = VarStdState<ArrowType, SimdLevel>;
- explicit VarStdState(VarianceOptions options) : options(options) {}
+ explicit VarStdState(int32_t decimal_scale, VarianceOptions options)
+ : decimal_scale(decimal_scale), options(options) {}
- // float/double/int64: calculate `m2` (sum((X-mean)^2)) with `two pass algorithm`
+  template <typename T>
+  double ToDouble(T value) const {
+    return static_cast<double>(value);
+ }
+ double ToDouble(const Decimal128& value) const { return value.ToDouble(decimal_scale); }
+ double ToDouble(const Decimal256& value) const { return value.ToDouble(decimal_scale); }
+
+ // float/double/int64/decimal: calculate `m2` (sum((X-mean)^2)) with `two pass
+ // algorithm`
// https://en.wikipedia.org/wiki/Algorithms_for_calculating_variance#Two-pass_algorithm
   template <typename T = ArrowType>
   enable_if_t<is_floating_type<T>::value || (sizeof(CType) > 4)> Consume(
@@ -52,14 +61,13 @@ struct VarStdState {
return;
}
-    using SumType =
-        typename std::conditional<is_floating_type<ArrowType>::value, double, int128_t>::type;
-    SumType sum = SumArray<CType, SumType, SimdLevel>(*array.data());
+    using SumType = typename internal::GetSumType<ArrowType>::SumType;
+    SumType sum = internal::SumArray<CType, SumType, SimdLevel>(*array.data());
-    const double mean = static_cast<double>(sum) / count;
-    const double m2 =
-        SumArray<CType, double, SimdLevel>(*array.data(), [mean](CType value) {
-          const double v = static_cast<double>(value);
+    const double mean = ToDouble(sum) / count;
+    const double m2 = internal::SumArray<CType, double, SimdLevel>(
+ *array.data(), [this, mean](CType value) {
+ const double v = ToDouble(value);
return (v - mean) * (v - mean);
});
@@ -102,7 +110,7 @@ struct VarStdState {
});
// merge variance
- ThisType state(options);
+ ThisType state(decimal_scale, options);
state.count = var_std.count;
state.mean = var_std.mean();
state.m2 = var_std.m2();
@@ -116,7 +124,7 @@ struct VarStdState {
this->m2 = 0;
if (scalar.is_valid) {
this->count = count;
-      this->mean = static_cast<double>(UnboxScalar<ArrowType>::Unbox(scalar));
+      this->mean = ToDouble(UnboxScalar<ArrowType>::Unbox(scalar));
} else {
this->count = 0;
this->mean = 0;
@@ -141,6 +149,7 @@ struct VarStdState {
&this->mean, &this->m2);
}
+ const int32_t decimal_scale;
const VarianceOptions options;
int64_t count = 0;
double mean = 0;
@@ -153,9 +162,9 @@ struct VarStdImpl : public ScalarAggregator {
   using ThisType = VarStdImpl<ArrowType, SimdLevel>;
   using ArrayType = typename TypeTraits<ArrowType>::ArrayType;
-  explicit VarStdImpl(const std::shared_ptr<DataType>& out_type,
+  explicit VarStdImpl(int32_t decimal_scale, const std::shared_ptr<DataType>& out_type,
const VarianceOptions& options, VarOrStd return_type)
- : out_type(out_type), state(options), return_type(return_type) {}
+ : out_type(out_type), state(decimal_scale, options), return_type(return_type) {}
Status Consume(KernelContext*, const ExecBatch& batch) override {
if (batch[0].is_array()) {
@@ -216,8 +225,16 @@ struct VarStdInitState {
}
   template <typename Type>
-  enable_if_t<is_number_type<Type>::value, Status> Visit(const Type&) {
-    state.reset(new VarStdImpl<Type, SimdLevel>(out_type, options, return_type));
+  enable_if_number<Type, Status> Visit(const Type&) {
+    state.reset(
+        new VarStdImpl<Type, SimdLevel>(/*decimal_scale=*/0, out_type, options, return_type));
+    return Status::OK();
+  }
+
+  template <typename Type>
+  enable_if_decimal<Type, Status> Visit(const Type&) {
+    state.reset(new VarStdImpl<Type, SimdLevel>(checked_cast<const DecimalType&>(in_type).scale(),
+ out_type, options, return_type));
return Status::OK();
}
@@ -247,7 +264,7 @@ void AddVarStdKernels(KernelInit init,
                       const std::vector<std::shared_ptr<DataType>>& types,
ScalarAggregateFunction* func) {
for (const auto& ty : types) {
- auto sig = KernelSignature::Make({InputType(ty)}, float64());
+ auto sig = KernelSignature::Make({InputType(ty->id())}, float64());
AddAggKernel(std::move(sig), init, func);
}
}
@@ -275,6 +292,7 @@ std::shared_ptr<ScalarAggregateFunction> AddStddevAggKernels() {
   auto func = std::make_shared<ScalarAggregateFunction>(
"stddev", Arity::Unary(), &stddev_doc, &default_std_options);
AddVarStdKernels(StddevInit, NumericTypes(), func.get());
+ AddVarStdKernels(StddevInit, {decimal128(1, 1), decimal256(1, 1)}, func.get());
return func;
}
@@ -283,6 +301,7 @@ std::shared_ptr<ScalarAggregateFunction> AddVarianceAggKernels() {
   auto func = std::make_shared<ScalarAggregateFunction>(
"variance", Arity::Unary(), &variance_doc, &default_var_options);
AddVarStdKernels(VarianceInit, NumericTypes(), func.get());
+ AddVarStdKernels(VarianceInit, {decimal128(1, 1), decimal256(1, 1)}, func.get());
return func;
}
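The Consume path above is the two-pass algorithm the comment cites: one pass for the mean (accumulated in GetSumType's wide type), a second pass for m2 = sum((x - mean)^2); decimals participate after ToDouble. A standalone sketch of the numerical recipe, with ddof standing in for VarianceOptions::ddof:

#include <cstddef>
#include <vector>

// Two-pass variance: compute the mean, then the sum of squared deviations (m2).
// ddof = 0 gives the population variance, ddof = 1 the sample variance.
double TwoPassVariance(const std::vector<double>& x, size_t ddof = 0) {
  if (x.size() <= ddof) return 0.0;  // the kernels emit null here; 0 keeps the sketch simple
  double sum = 0.0;
  for (double v : x) sum += v;
  const double mean = sum / x.size();
  double m2 = 0.0;
  for (double v : x) m2 += (v - mean) * (v - mean);
  return m2 / (x.size() - ddof);
}

For the decimal test values 1.00, 2.00, 3.00 this gives 2/3, the expected variance asserted in TestVarStdKernel::Decimal.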
diff --git a/cpp/src/arrow/compute/kernels/hash_aggregate.cc b/cpp/src/arrow/compute/kernels/hash_aggregate.cc
index 73c8f9d26c0e0..9f53267535511 100644
--- a/cpp/src/arrow/compute/kernels/hash_aggregate.cc
+++ b/cpp/src/arrow/compute/kernels/hash_aggregate.cc
@@ -519,7 +519,8 @@ struct GrouperFastImpl : Grouper {
/// Implementations should be default constructible and perform initialization in
/// Init().
struct GroupedAggregator : KernelState {
- virtual Status Init(ExecContext*, const FunctionOptions*) = 0;
+  virtual Status Init(ExecContext*, const std::vector<ValueDescr>& inputs,
+ const FunctionOptions*) = 0;
virtual Status Resize(int64_t new_num_groups) = 0;
@@ -536,7 +537,7 @@ template <typename Impl>
 Result<std::unique_ptr<KernelState>> HashAggregateInit(KernelContext* ctx,
                                                        const KernelInitArgs& args) {
   auto impl = ::arrow::internal::make_unique<Impl>();
- RETURN_NOT_OK(impl->Init(ctx->exec_context(), args.options));
+ RETURN_NOT_OK(impl->Init(ctx->exec_context(), args.inputs, args.options));
return std::move(impl);
}
@@ -636,7 +637,8 @@ void VisitGroupedValuesNonNull(const ExecBatch& batch, ConsumeValue&& valid_func
// Count implementation
struct GroupedCountImpl : public GroupedAggregator {
- Status Init(ExecContext* ctx, const FunctionOptions* options) override {
+  Status Init(ExecContext* ctx, const std::vector<ValueDescr>&,
+ const FunctionOptions* options) override {
options_ = checked_cast(*options);
counts_ = BufferBuilder(ctx->memory_pool());
return Status::OK();
@@ -725,13 +727,14 @@ struct GroupedReducingAggregator : public GroupedAggregator {
using CType = typename TypeTraits::CType;
using InputCType = typename TypeTraits::CType;
- Status Init(ExecContext* ctx, const FunctionOptions* options) override {
+  Status Init(ExecContext* ctx, const std::vector<ValueDescr>& inputs,
+ const FunctionOptions* options) override {
pool_ = ctx->memory_pool();
     options_ = checked_cast<const ScalarAggregateOptions&>(*options);
     reduced_ = TypedBufferBuilder<CType>(pool_);
     counts_ = TypedBufferBuilder<int64_t>(pool_);
     no_nulls_ = TypedBufferBuilder<bool>(pool_);
- // out_type_ initialized by SumInit
+ out_type_ = GetOutType(inputs[0].type);
return Status::OK();
}
@@ -829,6 +832,18 @@ struct GroupedReducingAggregator : public GroupedAggregator {
  std::shared_ptr<DataType> out_type() const override { return out_type_; }
+ template
+  static enable_if_t<!is_decimal_type<T>::value, std::shared_ptr<DataType>> GetOutType(
+      const std::shared_ptr<DataType>& in_type) {
+ return TypeTraits::type_singleton();
+ }
+
+ template
+  static enable_if_decimal<T, std::shared_ptr<DataType>> GetOutType(
+      const std::shared_ptr<DataType>& in_type) {
+ return in_type;
+ }
+
int64_t num_groups_ = 0;
ScalarAggregateOptions options_;
+  TypedBufferBuilder<CType> reduced_;
@@ -838,76 +853,35 @@ struct GroupedReducingAggregator : public GroupedAggregator {
MemoryPool* pool_;
};
-// ----------------------------------------------------------------------
-// Sum implementation
-
-template
-struct GroupedSumImpl : public GroupedReducingAggregator> {
- using Base = GroupedReducingAggregator>;
- using CType = typename Base::CType;
- using InputCType = typename Base::InputCType;
-
- // Default value for a group
- static CType NullValue(const DataType&) { return CType(0); }
-
- template
- static enable_if_number Reduce(const DataType&, const CType u,
- const InputCType v) {
- return static_cast(to_unsigned(u) + to_unsigned(static_cast(v)));
- }
-
- static CType Reduce(const DataType&, const CType u, const CType v) {
- return static_cast(to_unsigned(u) + to_unsigned(v));
- }
-
- using Base::Finish;
-};
-
-template class Impl, typename T>
-Result> SumInit(KernelContext* ctx,
- const KernelInitArgs& args) {
- ARROW_ASSIGN_OR_RAISE(auto impl, HashAggregateInit>(ctx, args));
- static_cast*>(impl.get())->out_type_ =
- TypeTraits::AccType>::type_singleton();
- return std::move(impl);
-}
-
-template
-Result> DecimalSumInit(KernelContext* ctx,
- const KernelInitArgs& args) {
- ARROW_ASSIGN_OR_RAISE(auto impl, HashAggregateInit(ctx, args));
- static_cast(impl.get())->out_type_ = args.inputs[0].type;
- return std::move(impl);
-}
-
-struct GroupedSumFactory {
+template <template <typename> class Impl, const char* kFriendlyName>
+struct GroupedReducingFactory {
template ::Type>
Status Visit(const T&) {
- kernel = MakeKernel(std::move(argument_type), SumInit);
+ kernel = MakeKernel(std::move(argument_type), HashAggregateInit>);
return Status::OK();
}
Status Visit(const Decimal128Type&) {
- kernel = MakeKernel(std::move(argument_type),
- DecimalSumInit>);
+ kernel =
+        MakeKernel(std::move(argument_type), HashAggregateInit<Impl<Decimal128Type>>);
return Status::OK();
}
Status Visit(const Decimal256Type&) {
- kernel = MakeKernel(std::move(argument_type),
- DecimalSumInit>);
+ kernel =
+        MakeKernel(std::move(argument_type), HashAggregateInit<Impl<Decimal256Type>>);
return Status::OK();
}
Status Visit(const HalfFloatType& type) {
- return Status::NotImplemented("Summing data of type ", type);
+ return Status::NotImplemented("Computing ", kFriendlyName, " of type ", type);
}
Status Visit(const DataType& type) {
- return Status::NotImplemented("Summing data of type ", type);
+ return Status::NotImplemented("Computing ", kFriendlyName, " of type ", type);
}
  static Result<HashAggregateKernel> Make(const std::shared_ptr<DataType>& type) {
- GroupedSumFactory factory;
+ GroupedReducingFactory factory;
factory.argument_type = InputType::Array(type->id());
RETURN_NOT_OK(VisitTypeInline(*type, &factory));
return std::move(factory.kernel);
@@ -917,6 +891,34 @@ struct GroupedSumFactory {
InputType argument_type;
};
+// ----------------------------------------------------------------------
+// Sum implementation
+
+template <typename Type>
+struct GroupedSumImpl : public GroupedReducingAggregator<GroupedSumImpl<Type>> {
+  using Base = GroupedReducingAggregator<GroupedSumImpl<Type>>;
+ using CType = typename Base::CType;
+ using InputCType = typename Base::InputCType;
+
+ // Default value for a group
+ static CType NullValue(const DataType&) { return CType(0); }
+
+  template <typename T = Type>
+  static enable_if_number<T, CType> Reduce(const DataType&, const CType u,
+                                           const InputCType v) {
+    return static_cast<CType>(to_unsigned(u) + to_unsigned(static_cast<CType>(v)));
+ }
+
+ static CType Reduce(const DataType&, const CType u, const CType v) {
+    return static_cast<CType>(to_unsigned(u) + to_unsigned(v));
+ }
+
+ using Base::Finish;
+};
+
+static constexpr const char kSumName[] = "sum";
+using GroupedSumFactory = GroupedReducingFactory<GroupedSumImpl, kSumName>;
+
// ----------------------------------------------------------------------
// Product implementation
@@ -945,43 +947,8 @@ struct GroupedProductImpl final
using Base::Finish;
};
-struct GroupedProductFactory {
- template ::Type>
- Status Visit(const T&) {
- kernel = MakeKernel(std::move(argument_type), SumInit);
- return Status::OK();
- }
-
- Status Visit(const Decimal128Type&) {
- kernel = MakeKernel(std::move(argument_type),
- DecimalSumInit>);
- return Status::OK();
- }
-
- Status Visit(const Decimal256Type&) {
- kernel = MakeKernel(std::move(argument_type),
- DecimalSumInit>);
- return Status::OK();
- }
-
- Status Visit(const HalfFloatType& type) {
- return Status::NotImplemented("Taking product of data of type ", type);
- }
-
- Status Visit(const DataType& type) {
- return Status::NotImplemented("Taking product of data of type ", type);
- }
-
- static Result Make(const std::shared_ptr& type) {
- GroupedProductFactory factory;
- factory.argument_type = InputType::Array(type->id());
- RETURN_NOT_OK(VisitTypeInline(*type, &factory));
- return std::move(factory.kernel);
- }
-
- HashAggregateKernel kernel;
- InputType argument_type;
-};
+static constexpr const char kProductName[] = "product";
+using GroupedProductFactory = GroupedReducingFactory<GroupedProductImpl, kProductName>;
// ----------------------------------------------------------------------
// Mean implementation
@@ -1040,43 +1007,8 @@ struct GroupedMeanImpl : public GroupedReducingAggregator::Type>
- Status Visit(const T&) {
- kernel = MakeKernel(std::move(argument_type), SumInit);
- return Status::OK();
- }
-
- Status Visit(const Decimal128Type&) {
- kernel = MakeKernel(std::move(argument_type),
- DecimalSumInit>);
- return Status::OK();
- }
-
- Status Visit(const Decimal256Type&) {
- kernel = MakeKernel(std::move(argument_type),
- DecimalSumInit>);
- return Status::OK();
- }
-
- Status Visit(const HalfFloatType& type) {
- return Status::NotImplemented("Computing mean of type ", type);
- }
-
- Status Visit(const DataType& type) {
- return Status::NotImplemented("Computing mean of type ", type);
- }
-
- static Result Make(const std::shared_ptr& type) {
- GroupedMeanFactory factory;
- factory.argument_type = InputType::Array(type->id());
- RETURN_NOT_OK(VisitTypeInline(*type, &factory));
- return std::move(factory.kernel);
- }
-
- HashAggregateKernel kernel;
- InputType argument_type;
-};
+static constexpr const char kMeanName[] = "mean";
+using GroupedMeanFactory = GroupedReducingFactory<GroupedMeanImpl, kMeanName>;
// Variance/Stdev implementation
@@ -1084,10 +1016,22 @@ using arrow::internal::int128_t;
 template <typename Type>
 struct GroupedVarStdImpl : public GroupedAggregator {
-  using CType = typename Type::c_type;
+  using CType = typename TypeTraits<Type>::CType;
+
+  Status Init(ExecContext* ctx, const std::vector<ValueDescr>& inputs,
+              const FunctionOptions* options) override {
+    options_ = *checked_cast<const VarianceOptions*>(options);
+    if (is_decimal_type<Type>::value) {
+      const int32_t scale = checked_cast<const DecimalType&>(*inputs[0].type).scale();
+ return InitInternal(ctx, scale, options);
+ }
+ return InitInternal(ctx, 0, options);
+ }
- Status Init(ExecContext* ctx, const FunctionOptions* options) override {
+ Status InitInternal(ExecContext* ctx, int32_t decimal_scale,
+ const FunctionOptions* options) {
    options_ = *checked_cast<const VarianceOptions*>(options);
+ decimal_scale_ = decimal_scale;
ctx_ = ctx;
pool_ = ctx->memory_pool();
    counts_ = TypedBufferBuilder<int64_t>(pool_);
@@ -1107,18 +1051,28 @@ struct GroupedVarStdImpl : public GroupedAggregator {
return Status::OK();
}
+  template <typename T>
+  double ToDouble(T value) const {
+    return static_cast<double>(value);
+ }
+ double ToDouble(const Decimal128& value) const {
+ return value.ToDouble(decimal_scale_);
+ }
+ double ToDouble(const Decimal256& value) const {
+ return value.ToDouble(decimal_scale_);
+ }
+
Status Consume(const ExecBatch& batch) override { return ConsumeImpl(batch); }
- // float/double/int64: calculate `m2` (sum((X-mean)^2)) with `two pass algorithm`
- // (see aggregate_var_std.cc)
+ // float/double/int64/decimal: calculate `m2` (sum((X-mean)^2)) with
+ // `two pass algorithm` (see aggregate_var_std.cc)
   template <typename T = Type>
   enable_if_t<is_floating_type<T>::value || (sizeof(CType) > 4), Status> ConsumeImpl(
       const ExecBatch& batch) {
-    using SumType =
-        typename std::conditional<is_floating_type<Type>::value, double, int128_t>::type;
+    using SumType = typename internal::GetSumType<Type>::SumType;
     GroupedVarStdImpl<Type> state;
- RETURN_NOT_OK(state.Init(ctx_, &options_));
+ RETURN_NOT_OK(state.InitInternal(ctx_, decimal_scale_, &options_));
RETURN_NOT_OK(state.Resize(num_groups_));
int64_t* counts = state.counts_.mutable_data();
double* means = state.means_.mutable_data();
@@ -1137,12 +1091,12 @@ struct GroupedVarStdImpl : public GroupedAggregator {
[&](uint32_t g) { BitUtil::ClearBit(no_nulls, g); });
for (int64_t i = 0; i < num_groups_; i++) {
- means[i] = static_cast