Skip to content

Commit

Permalink
GH-36931: [C++] Add cumulative_mean function (#36932)
Browse files Browse the repository at this point in the history
### Rationale for this change

Add `cumulative_mean` function

### What changes are included in this PR?

Implement the `cumulative_mean` function. The current cumulative_* kernel generator can only be based on a simple binary arithmetic op, and its state can only be a single value. I refactored it to use a generic state so that it can handle more complex operations such as `mean`, `median`, `var`, etc.

### Are these changes tested?

Yes

### Are there any user-facing changes?
No

* Closes: #36931

Lead-authored-by: Jin Shang <[email protected]>
Co-authored-by: Antoine Pitrou <[email protected]>
Signed-off-by: Antoine Pitrou <[email protected]>
  • Loading branch information
js8544 and pitrou authored Aug 9, 2023
1 parent 9f183fc commit 6e6e6f0
Show file tree
Hide file tree
Showing 5 changed files with 268 additions and 77 deletions.
5 changes: 5 additions & 0 deletions cpp/src/arrow/compute/api_vector.cc
Original file line number Diff line number Diff line change
Expand Up @@ -417,5 +417,10 @@ Result<Datum> CumulativeMin(const Datum& values, const CumulativeOptions& option
return CallFunction("cumulative_min", {Datum(values)}, &options, ctx);
}

// Convenience wrapper around the "cumulative_mean" compute function: forwards
// `values` as the single argument, together with `options` and `ctx`, to
// CallFunction and returns its result unchanged.
Result<Datum> CumulativeMean(const Datum& values, const CumulativeOptions& options,
ExecContext* ctx) {
return CallFunction("cumulative_mean", {Datum(values)}, &options, ctx);
}

} // namespace compute
} // namespace arrow
11 changes: 11 additions & 0 deletions cpp/src/arrow/compute/api_vector.h
Original file line number Diff line number Diff line change
Expand Up @@ -226,6 +226,7 @@ class ARROW_EXPORT CumulativeOptions : public FunctionOptions {
/// - prod: 1
/// - min: maximum of the input type
/// - max: minimum of the input type
/// - mean: start is ignored because it has no meaning for mean
std::optional<std::shared_ptr<Scalar>> start;

/// If true, nulls in the input are ignored and produce a corresponding null output.
Expand Down Expand Up @@ -661,6 +662,16 @@ Result<Datum> CumulativeMin(
const Datum& values, const CumulativeOptions& options = CumulativeOptions::Defaults(),
ExecContext* ctx = NULLPTR);

/// \brief Compute the cumulative mean of an array-like object
///
/// \param[in] values array-like input
/// \param[in] options configures cumulative mean behavior, `start` is ignored
/// \param[in] ctx the function execution context, optional
/// \return the resulting datum
ARROW_EXPORT
Result<Datum> CumulativeMean(
const Datum& values, const CumulativeOptions& options = CumulativeOptions::Defaults(),
ExecContext* ctx = NULLPTR);

/// \brief Return the first order difference of an array.
///
/// Computes the first order difference of an array, i.e.
Expand Down
195 changes: 140 additions & 55 deletions cpp/src/arrow/compute/kernels/vector_cumulative_ops.cc
Original file line number Diff line number Diff line change
Expand Up @@ -25,12 +25,11 @@
#include "arrow/compute/kernels/codegen_internal.h"
#include "arrow/compute/kernels/common_internal.h"
#include "arrow/result.h"
#include "arrow/type_traits.h"
#include "arrow/util/bit_util.h"
#include "arrow/visit_type_inline.h"

namespace arrow {
namespace compute {
namespace internal {
namespace arrow::compute::internal {

namespace {

Expand Down Expand Up @@ -63,19 +62,60 @@ struct CumulativeOptionsWrapper : public OptionsWrapper<OptionsType> {
}
};

// The driver kernel for all cumulative compute functions. Op is a compute kernel
// representing any binary associative operation with an identity element (add, product,
// min, max, etc.), i.e. ones that form a monoid, and OptionsType the options type
// corresponding to Op. ArgType and OutType are the input and output types, which will
// The cumulative value is computed based on a simple arithmetic binary op
// such as Add, Mul, Min, Max, etc.
template <typename Op, typename ArgType>
struct CumulativeBinaryOp {
using OutType = ArgType;
using OutValue = typename GetOutputType<OutType>::T;
using ArgValue = typename GetViewType<ArgType>::T;

OutValue current_value;

CumulativeBinaryOp() { current_value = Identity<Op>::template value<OutValue>; }

explicit CumulativeBinaryOp(const std::shared_ptr<Scalar> start) {
current_value = UnboxScalar<OutType>::Unbox(*start);
}

OutValue Call(KernelContext* ctx, ArgValue arg, Status* st) {
current_value =
Op::template Call<OutValue, ArgValue, ArgValue>(ctx, arg, current_value, st);
return current_value;
}
};

// Cumulative state for the running arithmetic mean. Inputs are accumulated as
// double and the output type is always DoubleType, whatever ArgType is.
template <typename ArgType>
struct CumulativeMean {
  using OutType = DoubleType;
  using ArgValue = typename GetViewType<ArgType>::T;

  // Number of values folded into `sum` so far.
  int64_t count = 0;
  // Running sum of all values seen so far, in double precision.
  double sum = 0;

  CumulativeMean() = default;

  // The start value is ignored for CumulativeMean. This constructor only exists
  // so that the state can be built the same way as the other cumulative states;
  // the pointer is taken by const reference to avoid a pointless shared_ptr copy.
  explicit CumulativeMean(const std::shared_ptr<Scalar>& /*start*/) {}

  // Fold `arg` into the running sum and return the mean of all values seen so
  // far. `ctx` and `st` are unused: this computation does not report errors.
  double Call(KernelContext* ctx, ArgValue arg, Status* st) {
    sum += static_cast<double>(arg);
    ++count;
    return sum / static_cast<double>(count);
  }
};

// The driver kernel for all cumulative compute functions.
// ArgType and OutType are the input and output types, which will
// normally be the same (e.g. the cumulative sum of an array of Int64Type will result in
// an array of Int64Type).
template <typename OutType, typename ArgType, typename Op, typename OptionsType>
// an array of Int64Type) with the exception of CumulativeMean, which will always return
// a double.
template <typename ArgType, typename CumulativeState>
struct Accumulator {
using OutValue = typename GetOutputType<OutType>::T;
using OutType = typename CumulativeState::OutType;
using ArgValue = typename GetViewType<ArgType>::T;

KernelContext* ctx;
ArgValue current_value;
CumulativeState current_state;
bool skip_nulls;
bool encountered_null = false;
NumericBuilder<OutType> builder;
Expand All @@ -88,21 +128,15 @@ struct Accumulator {
if (skip_nulls || (input.GetNullCount() == 0 && !encountered_null)) {
VisitArrayValuesInline<ArgType>(
input,
[&](ArgValue v) {
current_value = Op::template Call<OutValue, ArgValue, ArgValue>(
ctx, v, current_value, &st);
builder.UnsafeAppend(current_value);
},
[&](ArgValue v) { builder.UnsafeAppend(current_state.Call(ctx, v, &st)); },
[&]() { builder.UnsafeAppendNull(); });
} else {
int64_t nulls_start_idx = 0;
VisitArrayValuesInline<ArgType>(
input,
[&](ArgValue v) {
if (!encountered_null) {
current_value = Op::template Call<OutValue, ArgValue, ArgValue>(
ctx, v, current_value, &st);
builder.UnsafeAppend(current_value);
builder.UnsafeAppend(current_state.Call(ctx, v, &st));
++nulls_start_idx;
}
},
Expand All @@ -115,16 +149,17 @@ struct Accumulator {
}
};

template <typename OutType, typename ArgType, typename Op, typename OptionsType>
template <typename ArgType, typename CumulativeState, typename OptionsType>
struct CumulativeKernel {
using OutType = typename CumulativeState::OutType;
using OutValue = typename GetOutputType<OutType>::T;
static Status Exec(KernelContext* ctx, const ExecSpan& batch, ExecResult* out) {
const auto& options = CumulativeOptionsWrapper<OptionsType>::Get(ctx);
Accumulator<OutType, ArgType, Op, OptionsType> accumulator(ctx);
Accumulator<ArgType, CumulativeState> accumulator(ctx);
if (options.start.has_value()) {
accumulator.current_value = UnboxScalar<OutType>::Unbox(*(options.start.value()));
accumulator.current_state = CumulativeState(options.start.value());
} else {
accumulator.current_value = Identity<Op>::template value<OutValue>;
accumulator.current_state = CumulativeState();
}
accumulator.skip_nulls = options.skip_nulls;

Expand All @@ -138,16 +173,17 @@ struct CumulativeKernel {
}
};

template <typename OutType, typename ArgType, typename Op, typename OptionsType>
template <typename ArgType, typename CumulativeState, typename OptionsType>
struct CumulativeKernelChunked {
using OutType = typename CumulativeState::OutType;
using OutValue = typename GetOutputType<OutType>::T;
static Status Exec(KernelContext* ctx, const ExecBatch& batch, Datum* out) {
const auto& options = CumulativeOptionsWrapper<OptionsType>::Get(ctx);
Accumulator<OutType, ArgType, Op, OptionsType> accumulator(ctx);
Accumulator<ArgType, CumulativeState> accumulator(ctx);
if (options.start.has_value()) {
accumulator.current_value = UnboxScalar<OutType>::Unbox(*(options.start.value()));
accumulator.current_state = CumulativeState(options.start.value());
} else {
accumulator.current_value = Identity<Op>::template value<OutValue>;
accumulator.current_state = CumulativeState();
}
accumulator.skip_nulls = options.skip_nulls;

Expand Down Expand Up @@ -217,53 +253,102 @@ const FunctionDoc cumulative_min_doc{
"start as the new minimum)."),
{"values"},
"CumulativeOptions"};
} // namespace

template <typename Op, typename OptionsType>
void MakeVectorCumulativeFunction(FunctionRegistry* registry, const std::string func_name,
const FunctionDoc doc) {
// Documentation for the "cumulative_mean" function. The option field is named
// `start` (not `start_value`); it is ignored by this function.
const FunctionDoc cumulative_mean_doc{
    "Compute the cumulative mean over a numeric input",
    ("`values` must be numeric. Return an array/chunked array which is the\n"
     "cumulative mean computed over `values`. CumulativeOptions::start is\n"
     "ignored."),
    {"values"},
    "CumulativeOptions"};

// Kernel factory for complex stateful computations.
//
// `State<T>` is the cumulative state used for input type T; its nested `OutType`
// decides the output type of the generated kernel. The factory is used as a type
// visitor: Make() dispatches on the runtime DataType and the matching Visit()
// overload fills in the type-dependent parts of `kernel`.
template <template <typename ArgType> typename State, typename OptionsType>
struct CumulativeStatefulKernelFactory {
  VectorKernel kernel;

  // Settings shared by every generated kernel, independent of the input type.
  CumulativeStatefulKernelFactory() {
    kernel.init = CumulativeOptionsWrapper<OptionsType>::Init;
    kernel.mem_allocation = MemAllocation::type::NO_PREALLOCATE;
    kernel.null_handling = NullHandling::type::COMPUTED_NO_PREALLOCATE;
    kernel.can_execute_chunkwise = false;
  }

  // Numeric input types: install the signature and both exec entry points.
  template <typename Type>
  enable_if_number<Type, Status> Visit(const Type& type) {
    using StateType = State<Type>;
    using ResultType = typename StateType::OutType;

    kernel.exec_chunked = CumulativeKernelChunked<Type, StateType, OptionsType>::Exec;
    kernel.exec = CumulativeKernel<Type, StateType, OptionsType>::Exec;
    kernel.signature = KernelSignature::Make(
        {type.GetSharedPtr()}, OutputType(TypeTraits<ResultType>::type_singleton()));
    return Status::OK();
  }

  // Fallback for every other type.
  Status Visit(const DataType& type) {
    return Status::NotImplemented("Cumulative kernel not implemented for type ",
                                  type.ToString());
  }

  // Returns a copy of the kernel configured for `type`, or the visitor's error.
  Result<VectorKernel> Make(const DataType& type) {
    RETURN_NOT_OK(VisitTypeInline(type, this));
    return kernel;
  }
};

template <template <typename ArgType> typename State, typename OptionsType>
void MakeVectorCumulativeStatefulFunction(FunctionRegistry* registry,
const std::string func_name,
const FunctionDoc doc) {
static const OptionsType kDefaultOptions = OptionsType::Defaults();
auto func =
std::make_shared<VectorFunction>(func_name, Arity::Unary(), doc, &kDefaultOptions);

std::vector<std::shared_ptr<DataType>> types;
types.insert(types.end(), NumericTypes().begin(), NumericTypes().end());

CumulativeStatefulKernelFactory<State, OptionsType> kernel_factory;
for (const auto& ty : types) {
VectorKernel kernel;
kernel.can_execute_chunkwise = false;
kernel.null_handling = NullHandling::type::COMPUTED_NO_PREALLOCATE;
kernel.mem_allocation = MemAllocation::type::NO_PREALLOCATE;
kernel.signature = KernelSignature::Make({ty}, OutputType(ty));
kernel.exec =
ArithmeticExecFromOp<CumulativeKernel, Op, ArrayKernelExec, OptionsType>(ty);
kernel.exec_chunked =
ArithmeticExecFromOp<CumulativeKernelChunked, Op, VectorKernel::ChunkedExec,
OptionsType>(ty);
kernel.init = CumulativeOptionsWrapper<OptionsType>::Init;
auto kernel = kernel_factory.Make(*ty).ValueOrDie();
DCHECK_OK(func->AddKernel(std::move(kernel)));
}

DCHECK_OK(registry->AddFunction(std::move(func)));
}

// A kernel factory that forwards to CumulativeBinaryOp<Op, ...> for the given type.
// Need to use a struct because template-using declarations cannot appear in
// function scope.
template <typename Op, typename OptionsType>
struct MakeVectorCumulativeBinaryOpFunction {
// Binds Op so that State takes a single type parameter, which is the shape
// MakeVectorCumulativeStatefulFunction expects for its `State` argument.
template <typename ArgType>
using State = CumulativeBinaryOp<Op, ArgType>;

// Registers `func_name` with documentation `doc` in `registry`, using
// CumulativeBinaryOp<Op, ...> as the cumulative state. The name and doc are
// moved through to MakeVectorCumulativeStatefulFunction.
static void Call(FunctionRegistry* registry, std::string func_name, FunctionDoc doc) {
MakeVectorCumulativeStatefulFunction<State, OptionsType>(
registry, std::move(func_name), std::move(doc));
}
};

} // namespace

// Registers every cumulative vector function (sum, prod, min, max and mean; not
// only the sums, despite the name) in `registry`.
void RegisterVectorCumulativeSum(FunctionRegistry* registry) {
  MakeVectorCumulativeBinaryOpFunction<Add, CumulativeOptions>::Call(
      registry, "cumulative_sum", cumulative_sum_doc);
  MakeVectorCumulativeBinaryOpFunction<AddChecked, CumulativeOptions>::Call(
      registry, "cumulative_sum_checked", cumulative_sum_checked_doc);

  MakeVectorCumulativeBinaryOpFunction<Multiply, CumulativeOptions>::Call(
      registry, "cumulative_prod", cumulative_prod_doc);
  MakeVectorCumulativeBinaryOpFunction<MultiplyChecked, CumulativeOptions>::Call(
      registry, "cumulative_prod_checked", cumulative_prod_checked_doc);

  MakeVectorCumulativeBinaryOpFunction<Min, CumulativeOptions>::Call(
      registry, "cumulative_min", cumulative_min_doc);
  MakeVectorCumulativeBinaryOpFunction<Max, CumulativeOptions>::Call(
      registry, "cumulative_max", cumulative_max_doc);

  // The mean needs more state than a single running value, so it uses its own
  // state type, and its own documentation rather than that of cumulative_max.
  MakeVectorCumulativeStatefulFunction<CumulativeMean, CumulativeOptions>(
      registry, "cumulative_mean", cumulative_mean_doc);
}

} // namespace internal
} // namespace compute
} // namespace arrow
} // namespace arrow::compute::internal
Loading

0 comments on commit 6e6e6f0

Please sign in to comment.