Skip to content

Commit

Permalink
ARROW-13130: [C++] Implement arithmetic kernels for decimals
Browse files Browse the repository at this point in the history
  • Loading branch information
lidavidm committed Oct 18, 2021
1 parent 8a540a1 commit dd1151e
Show file tree
Hide file tree
Showing 16 changed files with 1,237 additions and 392 deletions.
5 changes: 2 additions & 3 deletions cpp/src/arrow/compute/kernels/aggregate_basic.cc
Original file line number Diff line number Diff line change
Expand Up @@ -422,9 +422,8 @@ void AddMinOrMaxAggKernel(ScalarAggregateFunction* func,
auto init = [min_max_func](
KernelContext* ctx,
const KernelInitArgs& args) -> Result<std::unique_ptr<KernelState>> {
std::vector<ValueDescr> inputs = args.inputs;
ARROW_ASSIGN_OR_RAISE(auto kernel, min_max_func->DispatchBest(&inputs));
KernelInitArgs new_args{kernel, inputs, args.options};
ARROW_ASSIGN_OR_RAISE(auto kernel, min_max_func->DispatchExact(args.inputs));
KernelInitArgs new_args{kernel, args.inputs, args.options};
return kernel->init(ctx, new_args);
};

Expand Down
21 changes: 21 additions & 0 deletions cpp/src/arrow/compute/kernels/aggregate_internal.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
#include "arrow/type.h"
#include "arrow/type_traits.h"
#include "arrow/util/bit_run_reader.h"
#include "arrow/util/int128_internal.h"
#include "arrow/util/logging.h"

namespace arrow {
Expand Down Expand Up @@ -111,6 +112,26 @@ void AddAggKernel(std::shared_ptr<KernelSignature> sig, KernelInit init,
ScalarAggregateFinalize finalize, ScalarAggregateFunction* func,
SimdLevel::type simd_level = SimdLevel::NONE);

using arrow::internal::VisitSetBitRunsVoid;

// Trait mapping a type T to a nested SumType alias, chosen by the category
// of T. Only the partial specializations below are defined; the primary
// template is declared but left incomplete.
// NOTE(review): presumably SumType is the accumulator type used when summing
// values of T -- confirm against the callers.
template <typename T, typename Enable = void>
struct GetSumType;

// Selected through enable_if_floating_point: SumType is double.
template <typename T>
struct GetSumType<T, enable_if_floating_point<T>> {
using SumType = double;
};

// Selected through enable_if_integer: SumType is arrow::internal::int128_t.
template <typename T>
struct GetSumType<T, enable_if_integer<T>> {
using SumType = arrow::internal::int128_t;
};

// Selected through enable_if_decimal: SumType is the C type that TypeTraits
// associates with T.
template <typename T>
struct GetSumType<T, enable_if_decimal<T>> {
using SumType = typename TypeTraits<T>::CType;
};

// SumArray must be parameterized with the SIMD level since it's called both from
// translation units with and without vectorization. Normally it gets inlined but
// if not, without the parameter, we'll have multiple definitions of the same
Expand Down
60 changes: 48 additions & 12 deletions cpp/src/arrow/compute/kernels/aggregate_mode.cc
Original file line number Diff line number Diff line change
Expand Up @@ -40,10 +40,13 @@ constexpr char kCountFieldName[] = "count";

constexpr uint64_t kCountEOF = ~0ULL;

template <typename InType, typename CType = typename InType::c_type>
template <typename InType, typename CType = typename TypeTraits<InType>::CType>
Result<std::pair<CType*, int64_t*>> PrepareOutput(int64_t n, KernelContext* ctx,
Datum* out) {
const auto& mode_type = TypeTraits<InType>::type_singleton();
DCHECK_EQ(Type::STRUCT, out->type()->id());
const auto& out_type = checked_cast<const StructType&>(*out->type());
DCHECK_EQ(2, out_type.num_fields());
const auto& mode_type = out_type.field(0)->type();
const auto& count_type = int64();

auto mode_data = ArrayData::Make(mode_type, /*length=*/n, /*null_count=*/0);
Expand All @@ -61,18 +64,15 @@ Result<std::pair<CType*, int64_t*>> PrepareOutput(int64_t n, KernelContext* ctx,
count_buffer = count_data->template GetMutableValues<int64_t>(1);
}

const auto& out_type =
struct_({field(kModeFieldName, mode_type), field(kCountFieldName, count_type)});
*out = Datum(ArrayData::Make(out_type, n, {nullptr}, {mode_data, count_data}, 0));

*out = Datum(ArrayData::Make(out->type(), n, {nullptr}, {mode_data, count_data}, 0));
return std::make_pair(mode_buffer, count_buffer);
}

// Find the top-n value:count pairs using a min-heap.
// Suboptimal for tiny or large n, but acceptable since this is not a hot path.
template <typename InType, typename Generator>
Status Finalize(KernelContext* ctx, Datum* out, Generator&& gen) {
using CType = typename InType::c_type;
using CType = typename TypeTraits<InType>::CType;

using ValueCountPair = std::pair<CType, uint64_t>;
auto gt = [](const ValueCountPair& lhs, const ValueCountPair& rhs) {
Expand Down Expand Up @@ -203,13 +203,25 @@ struct CountModer<BooleanType> {
}
};

// copy and sort approach for floating points or integers with wide value range
// copy and sort approach for floating points, decimals, or integers with wide
// value range
// O(n) space, O(nlogn) time
template <typename T>
struct SortModer {
using CType = typename T::c_type;
using CType = typename TypeTraits<T>::CType;
using Allocator = arrow::stl::allocator<CType>;

// Returns NAN converted to CType.
// This overload is enabled only through enable_if_floating_point.
template <typename Type = T>
static enable_if_floating_point<Type, CType> GetNan() {
return static_cast<CType>(NAN);
}

// Overload enabled for every type that is not a floating type: trips
// DCHECK(false) and returns a CType constructed from 0.
// NOTE(review): presumably never reached at runtime, since a non-zero NaN
// count should only occur for floating-point input -- confirm.
template <typename Type = T>
static enable_if_t<!is_floating_type<Type>::value, CType> GetNan() {
DCHECK(false);
return static_cast<CType>(0);
}

Status Exec(KernelContext* ctx, const ExecBatch& batch, Datum* out) {
const Datum& datum = batch[0];
const int64_t in_length = datum.length() - datum.null_count();
Expand Down Expand Up @@ -246,7 +258,7 @@ struct SortModer {
if (ARROW_PREDICT_FALSE(it == in_buffer.cend())) {
// handle NAN at last
if (nan_count > 0) {
auto value_count = std::make_pair(static_cast<CType>(NAN), nan_count);
auto value_count = std::make_pair(GetNan(), nan_count);
nan_count = 0;
return value_count;
}
Expand Down Expand Up @@ -318,13 +330,18 @@ struct Moder<InType, enable_if_t<(is_integer_type<InType>::value &&
};

template <typename InType>
struct Moder<InType, enable_if_t<is_floating_type<InType>::value>> {
struct Moder<InType, enable_if_floating_point<InType>> {
SortModer<InType> impl;
};

// Moder specialization enabled through enable_if_decimal: its impl member
// is a SortModer instantiated for the same input type.
template <typename InType>
struct Moder<InType, enable_if_decimal<InType>> {
SortModer<InType> impl;
};

template <typename T>
Status ScalarMode(KernelContext* ctx, const Scalar& scalar, Datum* out) {
using CType = typename T::c_type;
using CType = typename TypeTraits<T>::CType;

const ModeOptions& options = ModeState::Get(ctx);
if ((!options.skip_nulls && !scalar.is_valid) ||
Expand Down Expand Up @@ -378,6 +395,21 @@ VectorKernel NewModeKernel(const std::shared_ptr<DataType>& in_type) {
return kernel;
}

// Output type resolver for the mode kernel.
// Produces an array-shaped struct descriptor with two fields: the first is
// named kModeFieldName and carries the type of the first input descriptor,
// the second is named kCountFieldName and is int64.
Result<ValueDescr> ModeType(KernelContext*, const std::vector<ValueDescr>& descrs) {
  auto mode_field = field(kModeFieldName, descrs[0].type);
  auto count_field = field(kCountFieldName, int64());
  auto struct_type = struct_({mode_field, count_field});
  return ValueDescr::Array(struct_type);
}

// Builds a mode VectorKernel that matches inputs by type id.
// The output type is resolved by ModeType, state is initialised with
// ModeState::Init, and both chunk-wise execution and chunked output are
// switched off. `exec` is moved into the returned kernel.
VectorKernel NewModeKernel(Type::type type_id, ArrayKernelExec exec) {
  VectorKernel mode_kernel;
  mode_kernel.signature =
      KernelSignature::Make({InputType(type_id)}, OutputType(ModeType));
  mode_kernel.exec = std::move(exec);
  mode_kernel.init = ModeState::Init;
  mode_kernel.output_chunked = false;
  mode_kernel.can_execute_chunkwise = false;
  return mode_kernel;
}

void AddBooleanModeKernel(VectorFunction* func) {
VectorKernel kernel = NewModeKernel(boolean());
kernel.exec = ModeExecutor<StructType, BooleanType>::Exec;
Expand Down Expand Up @@ -411,6 +443,10 @@ void RegisterScalarAggregateMode(FunctionRegistry* registry) {
&default_options);
AddBooleanModeKernel(func.get());
AddNumericModeKernels(func.get());
DCHECK_OK(func->AddKernel(
NewModeKernel(Type::DECIMAL128, ModeExecutor<StructType, Decimal128Type>::Exec)));
DCHECK_OK(func->AddKernel(
NewModeKernel(Type::DECIMAL256, ModeExecutor<StructType, Decimal256Type>::Exec)));
DCHECK_OK(registry->AddFunction(std::move(func)));
}

Expand Down
50 changes: 39 additions & 11 deletions cpp/src/arrow/compute/kernels/aggregate_quantile.cc
Original file line number Diff line number Diff line change
Expand Up @@ -71,10 +71,21 @@ uint64_t QuantileToDataPoint(size_t length, double q,
return datapoint_index;
}

// Convert a data point to double.
// Generic overload: direct cast; the DataType argument is ignored.
template <typename T>
double DataPointToDouble(T value, const DataType&) {
  return static_cast<double>(value);
}

// Decimal overloads: `ty` is cast to DecimalType and its scale is passed to
// the value's ToDouble().
double DataPointToDouble(const Decimal128& value, const DataType& ty) {
  const auto& decimal_type = checked_cast<const DecimalType&>(ty);
  return value.ToDouble(decimal_type.scale());
}
double DataPointToDouble(const Decimal256& value, const DataType& ty) {
  const auto& decimal_type = checked_cast<const DecimalType&>(ty);
  return value.ToDouble(decimal_type.scale());
}

// copy and nth_element approach, large memory footprint
template <typename InType>
struct SortQuantiler {
using CType = typename InType::c_type;
using CType = typename TypeTraits<InType>::CType;
using Allocator = arrow::stl::allocator<CType>;

Status Exec(KernelContext* ctx, const ExecBatch& batch, Datum* out) {
Expand Down Expand Up @@ -106,8 +117,7 @@ struct SortQuantiler {
// prepare out array
// out type depends on options
const bool is_datapoint = IsDataPoint(options);
const std::shared_ptr<DataType> out_type =
is_datapoint ? TypeTraits<InType>::type_singleton() : float64();
const std::shared_ptr<DataType> out_type = is_datapoint ? datum.type() : float64();
int64_t out_length = options.q.size();
if (in_buffer.empty()) {
return MakeArrayOfNull(out_type, out_length, ctx->memory_pool()).Value(out);
Expand Down Expand Up @@ -142,8 +152,9 @@ struct SortQuantiler {
double* out_buffer = out_data->template GetMutableValues<double>(1);
for (int64_t i = 0; i < out_length; ++i) {
const int64_t q_index = q_indices[i];
out_buffer[q_index] = GetQuantileByInterp(
in_buffer, &last_index, options.q[q_index], options.interpolation);
out_buffer[q_index] =
GetQuantileByInterp(in_buffer, &last_index, options.q[q_index],
options.interpolation, *datum.type());
}
}
}
Expand All @@ -170,8 +181,8 @@ struct SortQuantiler {

// return quantile interpolated from adjacent input data points
double GetQuantileByInterp(std::vector<CType, Allocator>& in, uint64_t* last_index,
double q,
enum QuantileOptions::Interpolation interpolation) {
double q, enum QuantileOptions::Interpolation interpolation,
const DataType& in_type) {
const double index = (in.size() - 1) * q;
const uint64_t lower_index = static_cast<uint64_t>(index);
const double fraction = index - lower_index;
Expand All @@ -181,7 +192,7 @@ struct SortQuantiler {
std::nth_element(in.begin(), in.begin() + lower_index, in.begin() + *last_index);
}

const double lower_value = static_cast<double>(in[lower_index]);
const double lower_value = DataPointToDouble(in[lower_index], in_type);
if (fraction == 0) {
*last_index = lower_index;
return lower_value;
Expand All @@ -197,7 +208,7 @@ struct SortQuantiler {
}
*last_index = lower_index;

const double higher_value = static_cast<double>(in[higher_index]);
const double higher_value = DataPointToDouble(in[higher_index], in_type);

if (interpolation == QuantileOptions::LINEAR) {
// more stable than naive linear interpolation
Expand Down Expand Up @@ -399,10 +410,15 @@ struct ExactQuantiler<InType, enable_if_t<is_floating_type<InType>::value>> {
SortQuantiler<InType> impl;
};

template <typename InType>
struct ExactQuantiler<InType, enable_if_t<is_decimal_type<InType>::value>> {
SortQuantiler<InType> impl;
};

template <typename T>
Status ScalarQuantile(KernelContext* ctx, const QuantileOptions& options,
const Scalar& scalar, Datum* out) {
using CType = typename T::c_type;
using CType = typename TypeTraits<T>::CType;
ArrayData* output = out->mutable_array();
output->length = options.q.size();
auto out_type = IsDataPoint(options) ? scalar.type : float64();
Expand Down Expand Up @@ -433,7 +449,7 @@ Status ScalarQuantile(KernelContext* ctx, const QuantileOptions& options,
} else {
double* out_buffer = output->template GetMutableValues<double>(1);
for (int64_t i = 0; i < output->length; i++) {
out_buffer[i] = static_cast<double>(UnboxScalar<T>::Unbox(scalar));
out_buffer[i] = DataPointToDouble(UnboxScalar<T>::Unbox(scalar), *scalar.type);
}
}
return Status::OK();
Expand Down Expand Up @@ -486,6 +502,18 @@ void AddQuantileKernels(VectorFunction* func) {
base.exec = GenerateNumeric<QuantileExecutor, NullType>(*ty);
DCHECK_OK(func->AddKernel(base));
}
{
base.signature =
KernelSignature::Make({InputType(Type::DECIMAL128)}, OutputType(ResolveOutput));
base.exec = QuantileExecutor<NullType, Decimal128Type>::Exec;
DCHECK_OK(func->AddKernel(base));
}
{
base.signature =
KernelSignature::Make({InputType(Type::DECIMAL256)}, OutputType(ResolveOutput));
base.exec = QuantileExecutor<NullType, Decimal256Type>::Exec;
DCHECK_OK(func->AddKernel(base));
}
}

const FunctionDoc quantile_doc{
Expand Down
36 changes: 28 additions & 8 deletions cpp/src/arrow/compute/kernels/aggregate_tdigest.cc
Original file line number Diff line number Diff line change
Expand Up @@ -34,13 +34,25 @@ template <typename ArrowType>
struct TDigestImpl : public ScalarAggregator {
using ThisType = TDigestImpl<ArrowType>;
using ArrayType = typename TypeTraits<ArrowType>::ArrayType;
using CType = typename ArrowType::c_type;
using CType = typename TypeTraits<ArrowType>::CType;

explicit TDigestImpl(const TDigestOptions& options)
TDigestImpl(const TDigestOptions& options, const DataType& in_type)
: options{options},
tdigest{options.delta, options.buffer_size},
count{0},
all_valid{true} {}
decimal_scale{0},
all_valid{true} {
if (is_decimal_type<ArrowType>::value) {
decimal_scale = checked_cast<const DecimalType&>(in_type).scale();
}
}

template <typename T>
double ToDouble(T value) const {
return static_cast<double>(value);
}
double ToDouble(const Decimal128& value) const { return value.ToDouble(decimal_scale); }
double ToDouble(const Decimal256& value) const { return value.ToDouble(decimal_scale); }

Status Consume(KernelContext*, const ExecBatch& batch) override {
if (!this->all_valid) return Status::OK();
Expand All @@ -57,7 +69,7 @@ struct TDigestImpl : public ScalarAggregator {
VisitSetBitRunsVoid(data.buffers[0], data.offset, data.length,
[&](int64_t pos, int64_t len) {
for (int64_t i = 0; i < len; ++i) {
this->tdigest.NanAdd(values[pos + i]);
this->tdigest.NanAdd(ToDouble(values[pos + i]));
}
});
}
Expand All @@ -66,7 +78,7 @@ struct TDigestImpl : public ScalarAggregator {
if (batch[0].scalar()->is_valid) {
this->count += 1;
for (int64_t i = 0; i < batch.length; i++) {
this->tdigest.NanAdd(value);
this->tdigest.NanAdd(ToDouble(value));
}
}
}
Expand Down Expand Up @@ -110,6 +122,7 @@ struct TDigestImpl : public ScalarAggregator {
const TDigestOptions options;
TDigest tdigest;
int64_t count;
int32_t decimal_scale;
bool all_valid;
};

Expand All @@ -132,8 +145,14 @@ struct TDigestInitState {
}

template <typename Type>
enable_if_t<is_number_type<Type>::value, Status> Visit(const Type&) {
state.reset(new TDigestImpl<Type>(options));
enable_if_number<Type, Status> Visit(const Type&) {
state.reset(new TDigestImpl<Type>(options, in_type));
return Status::OK();
}

// Visitor overload enabled through enable_if_decimal: replaces `state` with
// a new TDigestImpl<Type> constructed from `options` and `in_type`, then
// returns Status::OK().
template <typename Type>
enable_if_decimal<Type, Status> Visit(const Type&) {
state.reset(new TDigestImpl<Type>(options, in_type));
return Status::OK();
}

Expand All @@ -154,7 +173,7 @@ void AddTDigestKernels(KernelInit init,
const std::vector<std::shared_ptr<DataType>>& types,
ScalarAggregateFunction* func) {
for (const auto& ty : types) {
auto sig = KernelSignature::Make({InputType(ty)}, float64());
auto sig = KernelSignature::Make({InputType(ty->id())}, float64());
AddAggKernel(std::move(sig), init, func);
}
}
Expand All @@ -179,6 +198,7 @@ std::shared_ptr<ScalarAggregateFunction> AddTDigestAggKernels() {
auto func = std::make_shared<ScalarAggregateFunction>(
"tdigest", Arity::Unary(), &tdigest_doc, &default_tdigest_options);
AddTDigestKernels(TDigestInit, NumericTypes(), func.get());
AddTDigestKernels(TDigestInit, {decimal128(1, 1), decimal256(1, 1)}, func.get());
return func;
}

Expand Down
Loading

0 comments on commit dd1151e

Please sign in to comment.