ARROW-13130: [C++] Add decimal support to arithmetic kernels
This adds decimal support to the following kernels (and their _checked variants where applicable): abs, acos, add/sub/mul/div, asin, atan, atan2, ceil, cos, floor, hash_stddev, hash_tdigest, hash_variance, is_finite, is_inf, is_nan, ln, log1p, log2, logb, mode, negate, power, quantile, round, round_to_multiple, sign, sin, stddev/variance, tan, tdigest, trunc.

Most kernels cast decimal inputs to double and proceed from there. Some, such as the rounding kernels, operate directly on decimals. Aggregate kernels also operate directly on decimals (casting to double inline where needed), since DispatchBest is not usable from the aggregate nodes (at least, not unless we also reimplement implicit casting there). A sketch of the cast-then-compute pattern follows below.
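As a rough illustration of that pattern (a hypothetical sketch; the helper name is invented and not Arrow's actual internal), a function's DispatchBest can rewrite decimal argument descriptors to float64 so the execution machinery inserts the implicit cast before the kernel runs:

#include <vector>

#include "arrow/compute/api.h"
#include "arrow/type_traits.h"

namespace arrow {
namespace compute {

// Hypothetical helper: promote decimal arguments to float64 during dispatch.
Status PromoteDecimalArgsToDouble(std::vector<ValueDescr>* values) {
  for (auto& descr : *values) {
    if (is_decimal(descr.type->id())) {
      // The exec layer then adds the decimal->double implicit cast upstream.
      descr.type = float64();
    }
  }
  return Status::OK();
}

}  // namespace compute
}  // namespace arrow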

Additionally, ValidateFull for decimal scalars and arrays now checks FitsInPrecision. A number of tests were adjusted to account for this.
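For instance (a minimal sketch against Arrow's public API; the concrete values are illustrative), a decimal whose digit count exceeds the declared precision now fails validation:

#include "arrow/scalar.h"
#include "arrow/status.h"
#include "arrow/type.h"
#include "arrow/util/decimal.h"

arrow::Status CheckPrecision() {
  // 12345 has five significant digits, which cannot fit in decimal128(3, 2).
  arrow::Decimal128Scalar s(arrow::Decimal128(12345), arrow::decimal128(3, 2));
  return s.ValidateFull();  // now errors because FitsInPrecision(3) is false
}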

Closes #11313 from lidavidm/arrow-13130

Authored-by: David Li <[email protected]>
Signed-off-by: Antoine Pitrou <[email protected]>
lidavidm authored and pitrou committed Nov 4, 2021
1 parent 3626a08 commit 6f4c991
Showing 16 changed files with 1,197 additions and 359 deletions.
cpp/src/arrow/compute/kernels/aggregate_basic.cc (5 changes: 2 additions & 3 deletions)

@@ -422,9 +422,8 @@ void AddMinOrMaxAggKernel(ScalarAggregateFunction* func,
   auto init = [min_max_func](
                  KernelContext* ctx,
                  const KernelInitArgs& args) -> Result<std::unique_ptr<KernelState>> {
-    std::vector<ValueDescr> inputs = args.inputs;
-    ARROW_ASSIGN_OR_RAISE(auto kernel, min_max_func->DispatchBest(&inputs));
-    KernelInitArgs new_args{kernel, inputs, args.options};
+    ARROW_ASSIGN_OR_RAISE(auto kernel, min_max_func->DispatchExact(args.inputs));
+    KernelInitArgs new_args{kernel, args.inputs, args.options};
     return kernel->init(ctx, new_args);
   };

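The hunk above swaps DispatchBest (which may rewrite the input descriptors to trigger implicit casts) for DispatchExact (which requires an exactly matching kernel). A sketch of the distinction, assuming Arrow's Function dispatch API of this era:

#include <vector>

#include "arrow/compute/api.h"

arrow::Status DispatchDemo(arrow::compute::Function* func) {
  std::vector<arrow::ValueDescr> descrs = {
      arrow::ValueDescr::Array(arrow::decimal128(5, 2))};
  // DispatchBest may rewrite `descrs` (e.g. promote decimal to float64):
  ARROW_ASSIGN_OR_RAISE(auto best, func->DispatchBest(&descrs));
  // DispatchExact succeeds only if a kernel matches the descriptors as-is:
  ARROW_ASSIGN_OR_RAISE(auto exact, func->DispatchExact(descrs));
  (void)best;
  (void)exact;
  return arrow::Status::OK();
}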
cpp/src/arrow/compute/kernels/aggregate_internal.h (21 changes: 21 additions & 0 deletions)

@@ -21,6 +21,7 @@
#include "arrow/type.h"
#include "arrow/type_traits.h"
#include "arrow/util/bit_run_reader.h"
#include "arrow/util/int128_internal.h"
#include "arrow/util/logging.h"

namespace arrow {
@@ -111,6 +112,26 @@ void AddAggKernel(std::shared_ptr<KernelSignature> sig, KernelInit init,
                   ScalarAggregateFinalize finalize, ScalarAggregateFunction* func,
                   SimdLevel::type simd_level = SimdLevel::NONE);

+using arrow::internal::VisitSetBitRunsVoid;
+
+template <typename T, typename Enable = void>
+struct GetSumType;
+
+template <typename T>
+struct GetSumType<T, enable_if_floating_point<T>> {
+  using SumType = double;
+};
+
+template <typename T>
+struct GetSumType<T, enable_if_integer<T>> {
+  using SumType = arrow::internal::int128_t;
+};
+
+template <typename T>
+struct GetSumType<T, enable_if_decimal<T>> {
+  using SumType = typename TypeTraits<T>::CType;
+};
+
 // SumArray must be parameterized with the SIMD level since it's called both from
 // translation units with and without vectorization. Normally it gets inlined but
 // if not, without the parameter, we'll have multiple definitions of the same
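For illustration, the accumulator types the new GetSumType trait selects (a sketch, assuming placement in the same namespace as the definitions above):

#include <type_traits>

#include "arrow/type_traits.h"
#include "arrow/util/decimal.h"

static_assert(std::is_same<GetSumType<DoubleType>::SumType, double>::value,
              "floating point sums accumulate as double");
static_assert(
    std::is_same<GetSumType<Int32Type>::SumType, arrow::internal::int128_t>::value,
    "integer sums accumulate as int128 to avoid overflow");
static_assert(std::is_same<GetSumType<Decimal128Type>::SumType, Decimal128>::value,
              "decimal sums accumulate in the decimal representation itself");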
cpp/src/arrow/compute/kernels/aggregate_mode.cc (95 changes: 62 additions & 33 deletions)

@@ -40,10 +40,13 @@ constexpr char kCountFieldName[] = "count";

 constexpr uint64_t kCountEOF = ~0ULL;

-template <typename InType, typename CType = typename InType::c_type>
+template <typename InType, typename CType = typename TypeTraits<InType>::CType>
 Result<std::pair<CType*, int64_t*>> PrepareOutput(int64_t n, KernelContext* ctx,
                                                   Datum* out) {
-  const auto& mode_type = TypeTraits<InType>::type_singleton();
+  DCHECK_EQ(Type::STRUCT, out->type()->id());
+  const auto& out_type = checked_cast<const StructType&>(*out->type());
+  DCHECK_EQ(2, out_type.num_fields());
+  const auto& mode_type = out_type.field(0)->type();
   const auto& count_type = int64();

   auto mode_data = ArrayData::Make(mode_type, /*length=*/n, /*null_count=*/0);
@@ -61,18 +64,15 @@ Result<std::pair<CType*, int64_t*>> PrepareOutput(int64_t n, KernelContext* ctx,
     count_buffer = count_data->template GetMutableValues<int64_t>(1);
   }

-  const auto& out_type =
-      struct_({field(kModeFieldName, mode_type), field(kCountFieldName, count_type)});
-  *out = Datum(ArrayData::Make(out_type, n, {nullptr}, {mode_data, count_data}, 0));
-
+  *out = Datum(ArrayData::Make(out->type(), n, {nullptr}, {mode_data, count_data}, 0));
   return std::make_pair(mode_buffer, count_buffer);
 }

 // find top-n value:count pairs with minimal heap
 // suboptimal for tiny or large n, possibly okay as we're not in hot path
 template <typename InType, typename Generator>
 Status Finalize(KernelContext* ctx, Datum* out, Generator&& gen) {
-  using CType = typename InType::c_type;
+  using CType = typename TypeTraits<InType>::CType;

   using ValueCountPair = std::pair<CType, uint64_t>;
   auto gt = [](const ValueCountPair& lhs, const ValueCountPair& rhs) {
@@ -203,13 +203,25 @@ struct CountModer<BooleanType> {
   }
 };

-// copy and sort approach for floating points or integers with wide value range
+// copy and sort approach for floating points, decimals, or integers with wide
+// value range
 // O(n) space, O(nlogn) time
 template <typename T>
 struct SortModer {
-  using CType = typename T::c_type;
+  using CType = typename TypeTraits<T>::CType;
   using Allocator = arrow::stl::allocator<CType>;

+  template <typename Type = T>
+  static enable_if_floating_point<Type, CType> GetNan() {
+    return static_cast<CType>(NAN);
+  }
+
+  template <typename Type = T>
+  static enable_if_t<!is_floating_type<Type>::value, CType> GetNan() {
+    DCHECK(false);
+    return static_cast<CType>(0);
+  }
+
   Status Exec(KernelContext* ctx, const ExecBatch& batch, Datum* out) {
     const Datum& datum = batch[0];
     const int64_t in_length = datum.length() - datum.null_count();
@@ -246,7 +258,7 @@ struct SortModer {
       if (ARROW_PREDICT_FALSE(it == in_buffer.cend())) {
         // handle NAN at last
         if (nan_count > 0) {
-          auto value_count = std::make_pair(static_cast<CType>(NAN), nan_count);
+          auto value_count = std::make_pair(GetNan(), nan_count);
           nan_count = 0;
           return value_count;
         }
@@ -318,13 +330,18 @@ struct Moder<InType, enable_if_t<(is_integer_type<InType>::value &&
 };

 template <typename InType>
-struct Moder<InType, enable_if_t<is_floating_type<InType>::value>> {
+struct Moder<InType, enable_if_floating_point<InType>> {
   SortModer<InType> impl;
 };

+template <typename InType>
+struct Moder<InType, enable_if_decimal<InType>> {
+  SortModer<InType> impl;
+};
+
 template <typename T>
 Status ScalarMode(KernelContext* ctx, const Scalar& scalar, Datum* out) {
-  using CType = typename T::c_type;
+  using CType = typename TypeTraits<T>::CType;

   const ModeOptions& options = ModeState::Get(ctx);
   if ((!options.skip_nulls && !scalar.is_valid) ||
@@ -366,30 +383,33 @@ struct ModeExecutor {
   }
 };

-VectorKernel NewModeKernel(const std::shared_ptr<DataType>& in_type) {
+Result<ValueDescr> ModeType(KernelContext*, const std::vector<ValueDescr>& descrs) {
+  return ValueDescr::Array(
+      struct_({field(kModeFieldName, descrs[0].type), field(kCountFieldName, int64())}));
+}
+
+VectorKernel NewModeKernel(const std::shared_ptr<DataType>& in_type,
+                           ArrayKernelExec exec) {
   VectorKernel kernel;
   kernel.init = ModeState::Init;
   kernel.can_execute_chunkwise = false;
   kernel.output_chunked = false;
-  auto out_type =
-      struct_({field(kModeFieldName, in_type), field(kCountFieldName, int64())});
-  kernel.signature =
-      KernelSignature::Make({InputType(in_type)}, ValueDescr::Array(out_type));
-  return kernel;
-}
-
-void AddBooleanModeKernel(VectorFunction* func) {
-  VectorKernel kernel = NewModeKernel(boolean());
-  kernel.exec = ModeExecutor<StructType, BooleanType>::Exec;
-  DCHECK_OK(func->AddKernel(kernel));
-}
-
-void AddNumericModeKernels(VectorFunction* func) {
-  for (const auto& type : NumericTypes()) {
-    VectorKernel kernel = NewModeKernel(type);
-    kernel.exec = GenerateNumeric<ModeExecutor, StructType>(*type);
-    DCHECK_OK(func->AddKernel(kernel));
-  }
-}
+  switch (in_type->id()) {
+    case Type::DECIMAL128:
+    case Type::DECIMAL256:
+      kernel.signature =
+          KernelSignature::Make({InputType(in_type->id())}, OutputType(ModeType));
+      break;
+    default: {
+      auto out_type =
+          struct_({field(kModeFieldName, in_type), field(kCountFieldName, int64())});
+      kernel.signature = KernelSignature::Make({InputType(in_type->id())},
+                                               ValueDescr::Array(std::move(out_type)));
+      break;
+    }
+  }
+  kernel.exec = std::move(exec);
+  return kernel;
+}

 const FunctionDoc mode_doc{
@@ -409,8 +429,17 @@ void RegisterScalarAggregateMode(FunctionRegistry* registry) {
   static auto default_options = ModeOptions::Defaults();
   auto func = std::make_shared<VectorFunction>("mode", Arity::Unary(), &mode_doc,
                                                &default_options);
-  AddBooleanModeKernel(func.get());
-  AddNumericModeKernels(func.get());
+  DCHECK_OK(func->AddKernel(
+      NewModeKernel(boolean(), ModeExecutor<StructType, BooleanType>::Exec)));
+  for (const auto& type : NumericTypes()) {
+    DCHECK_OK(func->AddKernel(
+        NewModeKernel(type, GenerateNumeric<ModeExecutor, StructType>(*type))));
+  }
+  // Type parameters are ignored
+  DCHECK_OK(func->AddKernel(
+      NewModeKernel(decimal128(1, 0), ModeExecutor<StructType, Decimal128Type>::Exec)));
+  DCHECK_OK(func->AddKernel(
+      NewModeKernel(decimal256(1, 0), ModeExecutor<StructType, Decimal256Type>::Exec)));
   DCHECK_OK(registry->AddFunction(std::move(func)));
 }

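A usage sketch of the resulting shape (public compute API; the decimal type parameters are illustrative): calling "mode" on a decimal array now yields a struct array whose mode field preserves the input type, resolved by the ModeType output resolver above rather than by a fixed type singleton:

#include <memory>

#include "arrow/api.h"
#include "arrow/compute/api.h"

arrow::Status ModeOfDecimals(const std::shared_ptr<arrow::Array>& decimals) {
  // Suppose decimals has type decimal128(5, 2).
  ARROW_ASSIGN_OR_RAISE(arrow::Datum out,
                        arrow::compute::CallFunction("mode", {decimals}));
  // out is an array of struct<mode: decimal128(5, 2), count: int64>.
  return arrow::Status::OK();
}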
cpp/src/arrow/compute/kernels/aggregate_quantile.cc (50 changes: 39 additions & 11 deletions)

@@ -71,10 +71,21 @@ uint64_t QuantileToDataPoint(size_t length, double q,
   return datapoint_index;
 }

+template <typename T>
+double DataPointToDouble(T value, const DataType&) {
+  return static_cast<double>(value);
+}
+double DataPointToDouble(const Decimal128& value, const DataType& ty) {
+  return value.ToDouble(checked_cast<const DecimalType&>(ty).scale());
+}
+double DataPointToDouble(const Decimal256& value, const DataType& ty) {
+  return value.ToDouble(checked_cast<const DecimalType&>(ty).scale());
+}
+
 // copy and nth_element approach, large memory footprint
 template <typename InType>
 struct SortQuantiler {
-  using CType = typename InType::c_type;
+  using CType = typename TypeTraits<InType>::CType;
   using Allocator = arrow::stl::allocator<CType>;

   Status Exec(KernelContext* ctx, const ExecBatch& batch, Datum* out) {
@@ -106,8 +117,7 @@ struct SortQuantiler {
     // prepare out array
     // out type depends on options
     const bool is_datapoint = IsDataPoint(options);
-    const std::shared_ptr<DataType> out_type =
-        is_datapoint ? TypeTraits<InType>::type_singleton() : float64();
+    const std::shared_ptr<DataType> out_type = is_datapoint ? datum.type() : float64();
     int64_t out_length = options.q.size();
     if (in_buffer.empty()) {
       return MakeArrayOfNull(out_type, out_length, ctx->memory_pool()).Value(out);
@@ -142,8 +152,9 @@ struct SortQuantiler {
         double* out_buffer = out_data->template GetMutableValues<double>(1);
         for (int64_t i = 0; i < out_length; ++i) {
           const int64_t q_index = q_indices[i];
-          out_buffer[q_index] = GetQuantileByInterp(
-              in_buffer, &last_index, options.q[q_index], options.interpolation);
+          out_buffer[q_index] =
+              GetQuantileByInterp(in_buffer, &last_index, options.q[q_index],
+                                  options.interpolation, *datum.type());
         }
       }
     }
@@ -170,8 +181,8 @@

   // return quantile interpolated from adjacent input data points
   double GetQuantileByInterp(std::vector<CType, Allocator>& in, uint64_t* last_index,
-                             double q,
-                             enum QuantileOptions::Interpolation interpolation) {
+                             double q, enum QuantileOptions::Interpolation interpolation,
+                             const DataType& in_type) {
     const double index = (in.size() - 1) * q;
     const uint64_t lower_index = static_cast<uint64_t>(index);
     const double fraction = index - lower_index;
@@ -181,7 +192,7 @@
       std::nth_element(in.begin(), in.begin() + lower_index, in.begin() + *last_index);
     }

-    const double lower_value = static_cast<double>(in[lower_index]);
+    const double lower_value = DataPointToDouble(in[lower_index], in_type);
     if (fraction == 0) {
       *last_index = lower_index;
       return lower_value;
@@ -197,7 +208,7 @@
     }
     *last_index = lower_index;

-    const double higher_value = static_cast<double>(in[higher_index]);
+    const double higher_value = DataPointToDouble(in[higher_index], in_type);

     if (interpolation == QuantileOptions::LINEAR) {
       // more stable than naive linear interpolation
@@ -399,10 +410,15 @@ struct ExactQuantiler<InType, enable_if_t<is_floating_type<InType>::value>> {
   SortQuantiler<InType> impl;
 };

+template <typename InType>
+struct ExactQuantiler<InType, enable_if_t<is_decimal_type<InType>::value>> {
+  SortQuantiler<InType> impl;
+};
+
 template <typename T>
 Status ScalarQuantile(KernelContext* ctx, const QuantileOptions& options,
                       const Scalar& scalar, Datum* out) {
-  using CType = typename T::c_type;
+  using CType = typename TypeTraits<T>::CType;
   ArrayData* output = out->mutable_array();
   output->length = options.q.size();
   auto out_type = IsDataPoint(options) ? scalar.type : float64();
@@ -433,7 +449,7 @@ Status ScalarQuantile(KernelContext* ctx, const QuantileOptions& options,
   } else {
     double* out_buffer = output->template GetMutableValues<double>(1);
     for (int64_t i = 0; i < output->length; i++) {
-      out_buffer[i] = static_cast<double>(UnboxScalar<T>::Unbox(scalar));
+      out_buffer[i] = DataPointToDouble(UnboxScalar<T>::Unbox(scalar), *scalar.type);
     }
   }
   return Status::OK();
@@ -486,6 +502,18 @@ void AddQuantileKernels(VectorFunction* func) {
     base.exec = GenerateNumeric<QuantileExecutor, NullType>(*ty);
     DCHECK_OK(func->AddKernel(base));
   }
+  {
+    base.signature =
+        KernelSignature::Make({InputType(Type::DECIMAL128)}, OutputType(ResolveOutput));
+    base.exec = QuantileExecutor<NullType, Decimal128Type>::Exec;
+    DCHECK_OK(func->AddKernel(base));
+  }
+  {
+    base.signature =
+        KernelSignature::Make({InputType(Type::DECIMAL256)}, OutputType(ResolveOutput));
+    base.exec = QuantileExecutor<NullType, Decimal256Type>::Exec;
+    DCHECK_OK(func->AddKernel(base));
+  }
 }

 const FunctionDoc quantile_doc{
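The DataPointToDouble overloads above exist because a decimal stores unscaled digits; lowering to double must apply the type's scale. A small sketch with the public Decimal128 API:

#include <cassert>
#include <cmath>

#include "arrow/util/decimal.h"

void ScaleExample() {
  arrow::Decimal128 unscaled(12345);        // raw stored digits
  double as_double = unscaled.ToDouble(2);  // apply scale 2: 12345 * 10^-2
  assert(std::fabs(as_double - 123.45) < 1e-9);
}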
(Diff for the remaining 12 changed files not shown.)
