Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ARROW-13130: [C++] Add decimal support to arithmetic kernels #11313

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 2 additions & 3 deletions cpp/src/arrow/compute/kernels/aggregate_basic.cc
Original file line number Diff line number Diff line change
Expand Up @@ -422,9 +422,8 @@ void AddMinOrMaxAggKernel(ScalarAggregateFunction* func,
auto init = [min_max_func](
KernelContext* ctx,
const KernelInitArgs& args) -> Result<std::unique_ptr<KernelState>> {
std::vector<ValueDescr> inputs = args.inputs;
ARROW_ASSIGN_OR_RAISE(auto kernel, min_max_func->DispatchBest(&inputs));
KernelInitArgs new_args{kernel, inputs, args.options};
ARROW_ASSIGN_OR_RAISE(auto kernel, min_max_func->DispatchExact(args.inputs));
KernelInitArgs new_args{kernel, args.inputs, args.options};
return kernel->init(ctx, new_args);
};

Expand Down
21 changes: 21 additions & 0 deletions cpp/src/arrow/compute/kernels/aggregate_internal.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
#include "arrow/type.h"
#include "arrow/type_traits.h"
#include "arrow/util/bit_run_reader.h"
#include "arrow/util/int128_internal.h"
#include "arrow/util/logging.h"

namespace arrow {
Expand Down Expand Up @@ -111,6 +112,26 @@ void AddAggKernel(std::shared_ptr<KernelSignature> sig, KernelInit init,
ScalarAggregateFinalize finalize, ScalarAggregateFunction* func,
SimdLevel::type simd_level = SimdLevel::NONE);

using arrow::internal::VisitSetBitRunsVoid;

template <typename T, typename Enable = void>
struct GetSumType;

template <typename T>
struct GetSumType<T, enable_if_floating_point<T>> {
using SumType = double;
};

template <typename T>
struct GetSumType<T, enable_if_integer<T>> {
using SumType = arrow::internal::int128_t;
};

template <typename T>
struct GetSumType<T, enable_if_decimal<T>> {
using SumType = typename TypeTraits<T>::CType;
};

// SumArray must be parameterized with the SIMD level since it's called both from
// translation units with and without vectorization. Normally it gets inlined but
// if not, without the parameter, we'll have multiple definitions of the same
Expand Down
95 changes: 62 additions & 33 deletions cpp/src/arrow/compute/kernels/aggregate_mode.cc
Original file line number Diff line number Diff line change
Expand Up @@ -40,10 +40,13 @@ constexpr char kCountFieldName[] = "count";

constexpr uint64_t kCountEOF = ~0ULL;

template <typename InType, typename CType = typename InType::c_type>
template <typename InType, typename CType = typename TypeTraits<InType>::CType>
Result<std::pair<CType*, int64_t*>> PrepareOutput(int64_t n, KernelContext* ctx,
Datum* out) {
const auto& mode_type = TypeTraits<InType>::type_singleton();
DCHECK_EQ(Type::STRUCT, out->type()->id());
const auto& out_type = checked_cast<const StructType&>(*out->type());
DCHECK_EQ(2, out_type.num_fields());
const auto& mode_type = out_type.field(0)->type();
const auto& count_type = int64();

auto mode_data = ArrayData::Make(mode_type, /*length=*/n, /*null_count=*/0);
Expand All @@ -61,18 +64,15 @@ Result<std::pair<CType*, int64_t*>> PrepareOutput(int64_t n, KernelContext* ctx,
count_buffer = count_data->template GetMutableValues<int64_t>(1);
}

const auto& out_type =
struct_({field(kModeFieldName, mode_type), field(kCountFieldName, count_type)});
*out = Datum(ArrayData::Make(out_type, n, {nullptr}, {mode_data, count_data}, 0));

*out = Datum(ArrayData::Make(out->type(), n, {nullptr}, {mode_data, count_data}, 0));
return std::make_pair(mode_buffer, count_buffer);
}

// find top-n value:count pairs with minimal heap
// suboptimal for tiny or large n, possibly okay as we're not in hot path
template <typename InType, typename Generator>
Status Finalize(KernelContext* ctx, Datum* out, Generator&& gen) {
using CType = typename InType::c_type;
using CType = typename TypeTraits<InType>::CType;

using ValueCountPair = std::pair<CType, uint64_t>;
auto gt = [](const ValueCountPair& lhs, const ValueCountPair& rhs) {
Expand Down Expand Up @@ -203,13 +203,25 @@ struct CountModer<BooleanType> {
}
};

// copy and sort approach for floating points or integers with wide value range
// copy and sort approach for floating points, decimals, or integers with wide
// value range
// O(n) space, O(nlogn) time
template <typename T>
struct SortModer {
using CType = typename T::c_type;
using CType = typename TypeTraits<T>::CType;
using Allocator = arrow::stl::allocator<CType>;

template <typename Type = T>
static enable_if_floating_point<Type, CType> GetNan() {
return static_cast<CType>(NAN);
}

template <typename Type = T>
static enable_if_t<!is_floating_type<Type>::value, CType> GetNan() {
DCHECK(false);
return static_cast<CType>(0);
}

Status Exec(KernelContext* ctx, const ExecBatch& batch, Datum* out) {
const Datum& datum = batch[0];
const int64_t in_length = datum.length() - datum.null_count();
Expand Down Expand Up @@ -246,7 +258,7 @@ struct SortModer {
if (ARROW_PREDICT_FALSE(it == in_buffer.cend())) {
// handle NAN at last
if (nan_count > 0) {
auto value_count = std::make_pair(static_cast<CType>(NAN), nan_count);
auto value_count = std::make_pair(GetNan(), nan_count);
nan_count = 0;
return value_count;
}
Expand Down Expand Up @@ -318,13 +330,18 @@ struct Moder<InType, enable_if_t<(is_integer_type<InType>::value &&
};

template <typename InType>
struct Moder<InType, enable_if_t<is_floating_type<InType>::value>> {
struct Moder<InType, enable_if_floating_point<InType>> {
SortModer<InType> impl;
};

template <typename InType>
struct Moder<InType, enable_if_decimal<InType>> {
SortModer<InType> impl;
};

template <typename T>
Status ScalarMode(KernelContext* ctx, const Scalar& scalar, Datum* out) {
using CType = typename T::c_type;
using CType = typename TypeTraits<T>::CType;

const ModeOptions& options = ModeState::Get(ctx);
if ((!options.skip_nulls && !scalar.is_valid) ||
Expand Down Expand Up @@ -366,30 +383,33 @@ struct ModeExecutor {
}
};

VectorKernel NewModeKernel(const std::shared_ptr<DataType>& in_type) {
Result<ValueDescr> ModeType(KernelContext*, const std::vector<ValueDescr>& descrs) {
return ValueDescr::Array(
struct_({field(kModeFieldName, descrs[0].type), field(kCountFieldName, int64())}));
}

VectorKernel NewModeKernel(const std::shared_ptr<DataType>& in_type,
ArrayKernelExec exec) {
VectorKernel kernel;
kernel.init = ModeState::Init;
kernel.can_execute_chunkwise = false;
kernel.output_chunked = false;
auto out_type =
struct_({field(kModeFieldName, in_type), field(kCountFieldName, int64())});
kernel.signature =
KernelSignature::Make({InputType(in_type)}, ValueDescr::Array(out_type));
return kernel;
}

void AddBooleanModeKernel(VectorFunction* func) {
VectorKernel kernel = NewModeKernel(boolean());
kernel.exec = ModeExecutor<StructType, BooleanType>::Exec;
DCHECK_OK(func->AddKernel(kernel));
}

void AddNumericModeKernels(VectorFunction* func) {
for (const auto& type : NumericTypes()) {
VectorKernel kernel = NewModeKernel(type);
kernel.exec = GenerateNumeric<ModeExecutor, StructType>(*type);
DCHECK_OK(func->AddKernel(kernel));
switch (in_type->id()) {
case Type::DECIMAL128:
case Type::DECIMAL256:
kernel.signature =
KernelSignature::Make({InputType(in_type->id())}, OutputType(ModeType));
break;
default: {
auto out_type =
struct_({field(kModeFieldName, in_type), field(kCountFieldName, int64())});
kernel.signature = KernelSignature::Make({InputType(in_type->id())},
ValueDescr::Array(std::move(out_type)));
break;
}
}
kernel.exec = std::move(exec);
return kernel;
}

const FunctionDoc mode_doc{
Expand All @@ -409,8 +429,17 @@ void RegisterScalarAggregateMode(FunctionRegistry* registry) {
static auto default_options = ModeOptions::Defaults();
auto func = std::make_shared<VectorFunction>("mode", Arity::Unary(), &mode_doc,
&default_options);
AddBooleanModeKernel(func.get());
AddNumericModeKernels(func.get());
DCHECK_OK(func->AddKernel(
NewModeKernel(boolean(), ModeExecutor<StructType, BooleanType>::Exec)));
for (const auto& type : NumericTypes()) {
DCHECK_OK(func->AddKernel(
NewModeKernel(type, GenerateNumeric<ModeExecutor, StructType>(*type))));
}
// Type parameters are ignored
DCHECK_OK(func->AddKernel(
NewModeKernel(decimal128(1, 0), ModeExecutor<StructType, Decimal128Type>::Exec)));
DCHECK_OK(func->AddKernel(
NewModeKernel(decimal256(1, 0), ModeExecutor<StructType, Decimal256Type>::Exec)));
DCHECK_OK(registry->AddFunction(std::move(func)));
}

Expand Down
50 changes: 39 additions & 11 deletions cpp/src/arrow/compute/kernels/aggregate_quantile.cc
Original file line number Diff line number Diff line change
Expand Up @@ -71,10 +71,21 @@ uint64_t QuantileToDataPoint(size_t length, double q,
return datapoint_index;
}

template <typename T>
double DataPointToDouble(T value, const DataType&) {
return static_cast<double>(value);
}
double DataPointToDouble(const Decimal128& value, const DataType& ty) {
return value.ToDouble(checked_cast<const DecimalType&>(ty).scale());
}
double DataPointToDouble(const Decimal256& value, const DataType& ty) {
return value.ToDouble(checked_cast<const DecimalType&>(ty).scale());
}

// copy and nth_element approach, large memory footprint
template <typename InType>
struct SortQuantiler {
using CType = typename InType::c_type;
using CType = typename TypeTraits<InType>::CType;
using Allocator = arrow::stl::allocator<CType>;

Status Exec(KernelContext* ctx, const ExecBatch& batch, Datum* out) {
Expand Down Expand Up @@ -106,8 +117,7 @@ struct SortQuantiler {
// prepare out array
// out type depends on options
const bool is_datapoint = IsDataPoint(options);
const std::shared_ptr<DataType> out_type =
is_datapoint ? TypeTraits<InType>::type_singleton() : float64();
const std::shared_ptr<DataType> out_type = is_datapoint ? datum.type() : float64();
int64_t out_length = options.q.size();
if (in_buffer.empty()) {
return MakeArrayOfNull(out_type, out_length, ctx->memory_pool()).Value(out);
Expand Down Expand Up @@ -142,8 +152,9 @@ struct SortQuantiler {
double* out_buffer = out_data->template GetMutableValues<double>(1);
for (int64_t i = 0; i < out_length; ++i) {
const int64_t q_index = q_indices[i];
out_buffer[q_index] = GetQuantileByInterp(
in_buffer, &last_index, options.q[q_index], options.interpolation);
out_buffer[q_index] =
GetQuantileByInterp(in_buffer, &last_index, options.q[q_index],
options.interpolation, *datum.type());
}
}
}
Expand All @@ -170,8 +181,8 @@ struct SortQuantiler {

// return quantile interpolated from adjacent input data points
double GetQuantileByInterp(std::vector<CType, Allocator>& in, uint64_t* last_index,
double q,
enum QuantileOptions::Interpolation interpolation) {
double q, enum QuantileOptions::Interpolation interpolation,
const DataType& in_type) {
const double index = (in.size() - 1) * q;
const uint64_t lower_index = static_cast<uint64_t>(index);
const double fraction = index - lower_index;
Expand All @@ -181,7 +192,7 @@ struct SortQuantiler {
std::nth_element(in.begin(), in.begin() + lower_index, in.begin() + *last_index);
}

const double lower_value = static_cast<double>(in[lower_index]);
const double lower_value = DataPointToDouble(in[lower_index], in_type);
if (fraction == 0) {
*last_index = lower_index;
return lower_value;
Expand All @@ -197,7 +208,7 @@ struct SortQuantiler {
}
*last_index = lower_index;

const double higher_value = static_cast<double>(in[higher_index]);
const double higher_value = DataPointToDouble(in[higher_index], in_type);

if (interpolation == QuantileOptions::LINEAR) {
// more stable than naive linear interpolation
Expand Down Expand Up @@ -399,10 +410,15 @@ struct ExactQuantiler<InType, enable_if_t<is_floating_type<InType>::value>> {
SortQuantiler<InType> impl;
};

template <typename InType>
struct ExactQuantiler<InType, enable_if_t<is_decimal_type<InType>::value>> {
SortQuantiler<InType> impl;
};

template <typename T>
Status ScalarQuantile(KernelContext* ctx, const QuantileOptions& options,
const Scalar& scalar, Datum* out) {
using CType = typename T::c_type;
using CType = typename TypeTraits<T>::CType;
ArrayData* output = out->mutable_array();
output->length = options.q.size();
auto out_type = IsDataPoint(options) ? scalar.type : float64();
Expand Down Expand Up @@ -433,7 +449,7 @@ Status ScalarQuantile(KernelContext* ctx, const QuantileOptions& options,
} else {
double* out_buffer = output->template GetMutableValues<double>(1);
for (int64_t i = 0; i < output->length; i++) {
out_buffer[i] = static_cast<double>(UnboxScalar<T>::Unbox(scalar));
out_buffer[i] = DataPointToDouble(UnboxScalar<T>::Unbox(scalar), *scalar.type);
}
}
return Status::OK();
Expand Down Expand Up @@ -486,6 +502,18 @@ void AddQuantileKernels(VectorFunction* func) {
base.exec = GenerateNumeric<QuantileExecutor, NullType>(*ty);
DCHECK_OK(func->AddKernel(base));
}
{
base.signature =
KernelSignature::Make({InputType(Type::DECIMAL128)}, OutputType(ResolveOutput));
base.exec = QuantileExecutor<NullType, Decimal128Type>::Exec;
DCHECK_OK(func->AddKernel(base));
}
{
base.signature =
KernelSignature::Make({InputType(Type::DECIMAL256)}, OutputType(ResolveOutput));
base.exec = QuantileExecutor<NullType, Decimal256Type>::Exec;
DCHECK_OK(func->AddKernel(base));
}
}

const FunctionDoc quantile_doc{
Expand Down
Loading