Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

GH-36931: [C++] Add cumulative_mean function #36932

Merged
merged 5 commits into from
Aug 9, 2023
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions cpp/src/arrow/compute/api_vector.cc
Original file line number Diff line number Diff line change
Expand Up @@ -417,5 +417,10 @@ Result<Datum> CumulativeMin(const Datum& values, const CumulativeOptions& option
return CallFunction("cumulative_min", {Datum(values)}, &options, ctx);
}

Result<Datum> CumulativeMean(const Datum& values, const CumulativeOptions& options,
ExecContext* ctx) {
return CallFunction("cumulative_mean", {Datum(values)}, &options, ctx);
}

} // namespace compute
} // namespace arrow
10 changes: 10 additions & 0 deletions cpp/src/arrow/compute/api_vector.h
Original file line number Diff line number Diff line change
Expand Up @@ -661,6 +661,16 @@ Result<Datum> CumulativeMin(
const Datum& values, const CumulativeOptions& options = CumulativeOptions::Defaults(),
ExecContext* ctx = NULLPTR);

/// \brief Compute the cumulative mean of an array-like object
///
pitrou marked this conversation as resolved.
Show resolved Hide resolved
/// \param[in] values array-like input
/// \param[in] options configures cumulative mean behavior
/// \param[in] ctx the function execution context, optional
ARROW_EXPORT
Result<Datum> CumulativeMean(
const Datum& values, const CumulativeOptions& options = CumulativeOptions::Defaults(),
ExecContext* ctx = NULLPTR);

/// \brief Return the first order difference of an array.
///
/// Computes the first order difference of an array, i.e.
Expand Down
223 changes: 174 additions & 49 deletions cpp/src/arrow/compute/kernels/vector_cumulative_ops.cc
Original file line number Diff line number Diff line change
Expand Up @@ -25,12 +25,11 @@
#include "arrow/compute/kernels/codegen_internal.h"
#include "arrow/compute/kernels/common_internal.h"
#include "arrow/result.h"
#include "arrow/type_traits.h"
#include "arrow/util/bit_util.h"
#include "arrow/visit_type_inline.h"

namespace arrow {
namespace compute {
namespace internal {
namespace arrow::compute::internal {

namespace {

Expand Down Expand Up @@ -63,19 +62,60 @@ struct CumulativeOptionsWrapper : public OptionsWrapper<OptionsType> {
}
};

// The driver kernel for all cumulative compute functions. Op is a compute kernel
// representing any binary associative operation with an identity element (add, product,
// min, max, etc.), i.e. ones that form a monoid, and OptionsType the options type
// corresponding to Op. ArgType and OutType are the input and output types, which will
// The cumulative value is computed based on a simple arithmetic binary op
// such as Add, Mul, Min, Max, etc.
template <typename Op, typename ArgType>
struct CumulativeBinaryOp {
using OutType = ArgType;
using OutValue = typename GetOutputType<OutType>::T;
using ArgValue = typename GetViewType<ArgType>::T;

OutValue current_value;

CumulativeBinaryOp() { current_value = Identity<Op>::template value<OutValue>; }

explicit CumulativeBinaryOp(const std::shared_ptr<Scalar> start) {
current_value = UnboxScalar<OutType>::Unbox(*start);
}

OutValue Call(KernelContext* ctx, ArgValue arg, Status* st) {
current_value =
Op::template Call<OutValue, ArgValue, ArgValue>(ctx, arg, current_value, st);
return current_value;
}
};

template <typename ArgType>
struct CumulativeMean {
using OutType = DoubleType;
using ArgValue = typename GetViewType<ArgType>::T;
int64_t count = 0;
double sum = 0;

CumulativeMean() = default;

// start value is ignored for CumulativeMean
explicit CumulativeMean(const std::shared_ptr<Scalar> start) {}

double Call(KernelContext* ctx, ArgValue arg, Status* st) {
sum += static_cast<double>(arg);
++count;
return sum / count;
}
};

// The driver kernel for all cumulative compute functions.
// ArgType and OutType are the input and output types, which will
// normally be the same (e.g. the cumulative sum of an array of Int64Type will result in
// an array of Int64Type).
template <typename OutType, typename ArgType, typename Op, typename OptionsType>
// an array of Int64Type) with the exception of CumulativeMean, which will always return
// a double.
template <typename ArgType, typename CumulativeState>
struct Accumulator {
using OutValue = typename GetOutputType<OutType>::T;
using OutType = typename CumulativeState::OutType;
using ArgValue = typename GetViewType<ArgType>::T;

KernelContext* ctx;
ArgValue current_value;
CumulativeState current_state;
bool skip_nulls;
bool encountered_null = false;
NumericBuilder<OutType> builder;
Expand All @@ -88,21 +128,15 @@ struct Accumulator {
if (skip_nulls || (input.GetNullCount() == 0 && !encountered_null)) {
VisitArrayValuesInline<ArgType>(
input,
[&](ArgValue v) {
current_value = Op::template Call<OutValue, ArgValue, ArgValue>(
ctx, v, current_value, &st);
builder.UnsafeAppend(current_value);
},
[&](ArgValue v) { builder.UnsafeAppend(current_state.Call(ctx, v, &st)); },
[&]() { builder.UnsafeAppendNull(); });
} else {
int64_t nulls_start_idx = 0;
VisitArrayValuesInline<ArgType>(
input,
[&](ArgValue v) {
if (!encountered_null) {
current_value = Op::template Call<OutValue, ArgValue, ArgValue>(
ctx, v, current_value, &st);
builder.UnsafeAppend(current_value);
builder.UnsafeAppend(current_state.Call(ctx, v, &st));
++nulls_start_idx;
}
},
Expand All @@ -115,16 +149,17 @@ struct Accumulator {
}
};

template <typename OutType, typename ArgType, typename Op, typename OptionsType>
template <typename ArgType, typename CumulativeState, typename OptionsType>
struct CumulativeKernel {
using OutType = typename CumulativeState::OutType;
using OutValue = typename GetOutputType<OutType>::T;
static Status Exec(KernelContext* ctx, const ExecSpan& batch, ExecResult* out) {
const auto& options = CumulativeOptionsWrapper<OptionsType>::Get(ctx);
Accumulator<OutType, ArgType, Op, OptionsType> accumulator(ctx);
Accumulator<ArgType, CumulativeState> accumulator(ctx);
if (options.start.has_value()) {
accumulator.current_value = UnboxScalar<OutType>::Unbox(*(options.start.value()));
accumulator.current_state = CumulativeState(options.start.value());
} else {
accumulator.current_value = Identity<Op>::template value<OutValue>;
accumulator.current_state = CumulativeState();
}
accumulator.skip_nulls = options.skip_nulls;

Expand All @@ -138,16 +173,17 @@ struct CumulativeKernel {
}
};

template <typename OutType, typename ArgType, typename Op, typename OptionsType>
template <typename ArgType, typename CumulativeState, typename OptionsType>
struct CumulativeKernelChunked {
using OutType = typename CumulativeState::OutType;
using OutValue = typename GetOutputType<OutType>::T;
static Status Exec(KernelContext* ctx, const ExecBatch& batch, Datum* out) {
const auto& options = CumulativeOptionsWrapper<OptionsType>::Get(ctx);
Accumulator<OutType, ArgType, Op, OptionsType> accumulator(ctx);
Accumulator<ArgType, CumulativeState> accumulator(ctx);
if (options.start.has_value()) {
accumulator.current_value = UnboxScalar<OutType>::Unbox(*(options.start.value()));
accumulator.current_state = CumulativeState(options.start.value());
} else {
accumulator.current_value = Identity<Op>::template value<OutValue>;
accumulator.current_state = CumulativeState();
}
accumulator.skip_nulls = options.skip_nulls;

Expand Down Expand Up @@ -217,53 +253,142 @@ const FunctionDoc cumulative_min_doc{
"start as the new minimum)."),
{"values"},
"CumulativeOptions"};

const FunctionDoc cumulative_mean_doc{
"Compute the cumulative mean over a numeric input",
("`values` must be numeric. Return an array/chunked array which is the\n"
"cumulative mean computed over `values`. CumulativeOptions::start_value is \n"
"ignored."),
{"values"},
"CumulativeOptions"};
} // namespace
pitrou marked this conversation as resolved.
Show resolved Hide resolved

// Kernel factory for simple arithmetic operations.
// Op is a compute kernel representing any binary associative operation with
// an identity element (add, product, min, max, etc.), i.e. ones that form a monoid.
template <typename Op, typename OptionsType>
struct CumulativeBinaryOpKernelFactory {
pitrou marked this conversation as resolved.
Show resolved Hide resolved
VectorKernel kernel;

CumulativeBinaryOpKernelFactory() {
kernel.can_execute_chunkwise = false;
kernel.null_handling = NullHandling::type::COMPUTED_NO_PREALLOCATE;
kernel.mem_allocation = MemAllocation::type::NO_PREALLOCATE;
kernel.init = CumulativeOptionsWrapper<OptionsType>::Init;
}

template <typename Type>
enable_if_number<Type, Status> Visit(const Type& type) {
kernel.signature =
KernelSignature::Make({type.GetSharedPtr()}, OutputType(type.GetSharedPtr()));
kernel.exec = CumulativeKernel<Type, CumulativeBinaryOp<Op, Type>, OptionsType>::Exec;
kernel.exec_chunked =
CumulativeKernelChunked<Type, CumulativeBinaryOp<Op, Type>, OptionsType>::Exec;
return arrow::Status::OK();
}

Status Visit(const DataType& type) {
return Status::NotImplemented("Cumulative kernel not implemented for type ",
type.ToString());
}

Result<VectorKernel> Make(const DataType& type) {
RETURN_NOT_OK(VisitTypeInline(type, this));
return kernel;
}
};

template <typename Op, typename OptionsType>
void MakeVectorCumulativeFunction(FunctionRegistry* registry, const std::string func_name,
const FunctionDoc doc) {
void MakeVectorCumulativeBinaryOpFunction(FunctionRegistry* registry,
const std::string func_name,
const FunctionDoc doc) {
static const OptionsType kDefaultOptions = OptionsType::Defaults();
auto func =
std::make_shared<VectorFunction>(func_name, Arity::Unary(), doc, &kDefaultOptions);

std::vector<std::shared_ptr<DataType>> types;
types.insert(types.end(), NumericTypes().begin(), NumericTypes().end());

CumulativeBinaryOpKernelFactory<Op, OptionsType> kernel_factory;
for (const auto& ty : types) {
VectorKernel kernel;
auto kernel = kernel_factory.Make(*ty).ValueOrDie();
DCHECK_OK(func->AddKernel(std::move(kernel)));
}

DCHECK_OK(registry->AddFunction(std::move(func)));
}

// Kernel factory for complex stateful computations.
template <template <typename ArgType> typename State, typename OptionsType>
struct CumulativeStatefulKernelFactory {
VectorKernel kernel;

CumulativeStatefulKernelFactory() {
kernel.can_execute_chunkwise = false;
kernel.null_handling = NullHandling::type::COMPUTED_NO_PREALLOCATE;
kernel.mem_allocation = MemAllocation::type::NO_PREALLOCATE;
kernel.signature = KernelSignature::Make({ty}, OutputType(ty));
kernel.exec =
ArithmeticExecFromOp<CumulativeKernel, Op, ArrayKernelExec, OptionsType>(ty);
kernel.exec_chunked =
ArithmeticExecFromOp<CumulativeKernelChunked, Op, VectorKernel::ChunkedExec,
OptionsType>(ty);
kernel.init = CumulativeOptionsWrapper<OptionsType>::Init;
}

template <typename Type>
enable_if_number<Type, Status> Visit(const Type& type) {
kernel.signature = KernelSignature::Make(
{type.GetSharedPtr()},
OutputType(TypeTraits<typename State<Type>::OutType>::type_singleton()));
kernel.exec = CumulativeKernel<Type, State<Type>, OptionsType>::Exec;
kernel.exec_chunked = CumulativeKernelChunked<Type, State<Type>, OptionsType>::Exec;
return arrow::Status::OK();
}

Status Visit(const DataType& type) {
return Status::NotImplemented("Cumulative kernel not implemented for type ",
type.ToString());
}

Result<VectorKernel> Make(const DataType& type) {
RETURN_NOT_OK(VisitTypeInline(type, this));
return kernel;
}
};

template <template <typename ArgType> typename State, typename OptionsType>
void MakeVectorCumulativeStatefulFunction(FunctionRegistry* registry,
const std::string func_name,
const FunctionDoc doc) {
static const OptionsType kDefaultOptions = OptionsType::Defaults();
auto func =
std::make_shared<VectorFunction>(func_name, Arity::Unary(), doc, &kDefaultOptions);

std::vector<std::shared_ptr<DataType>> types;
types.insert(types.end(), NumericTypes().begin(), NumericTypes().end());

CumulativeStatefulKernelFactory<State, OptionsType> kernel_factory;
for (const auto& ty : types) {
auto kernel = kernel_factory.Make(*ty).ValueOrDie();
DCHECK_OK(func->AddKernel(std::move(kernel)));
}

DCHECK_OK(registry->AddFunction(std::move(func)));
}

void RegisterVectorCumulativeSum(FunctionRegistry* registry) {
MakeVectorCumulativeFunction<Add, CumulativeOptions>(registry, "cumulative_sum",
cumulative_sum_doc);
MakeVectorCumulativeFunction<AddChecked, CumulativeOptions>(
MakeVectorCumulativeBinaryOpFunction<Add, CumulativeOptions>(registry, "cumulative_sum",
cumulative_sum_doc);
MakeVectorCumulativeBinaryOpFunction<AddChecked, CumulativeOptions>(
registry, "cumulative_sum_checked", cumulative_sum_checked_doc);

MakeVectorCumulativeFunction<Multiply, CumulativeOptions>(registry, "cumulative_prod",
cumulative_prod_doc);
MakeVectorCumulativeFunction<MultiplyChecked, CumulativeOptions>(
MakeVectorCumulativeBinaryOpFunction<Multiply, CumulativeOptions>(
registry, "cumulative_prod", cumulative_prod_doc);
MakeVectorCumulativeBinaryOpFunction<MultiplyChecked, CumulativeOptions>(
registry, "cumulative_prod_checked", cumulative_prod_checked_doc);

MakeVectorCumulativeFunction<Min, CumulativeOptions>(registry, "cumulative_min",
cumulative_min_doc);
MakeVectorCumulativeFunction<Max, CumulativeOptions>(registry, "cumulative_max",
cumulative_max_doc);
MakeVectorCumulativeBinaryOpFunction<Min, CumulativeOptions>(registry, "cumulative_min",
cumulative_min_doc);
MakeVectorCumulativeBinaryOpFunction<Max, CumulativeOptions>(registry, "cumulative_max",
cumulative_max_doc);

MakeVectorCumulativeStatefulFunction<CumulativeMean, CumulativeOptions>(
registry, "cumulative_mean", cumulative_max_doc);
}

} // namespace internal
} // namespace compute
} // namespace arrow
} // namespace arrow::compute::internal
Loading