diff --git a/cpp/src/arrow/compute/api_vector.cc b/cpp/src/arrow/compute/api_vector.cc index f73b10e11edd7..d47ee42ebf239 100644 --- a/cpp/src/arrow/compute/api_vector.cc +++ b/cpp/src/arrow/compute/api_vector.cc @@ -417,5 +417,10 @@ Result CumulativeMin(const Datum& values, const CumulativeOptions& option return CallFunction("cumulative_min", {Datum(values)}, &options, ctx); } +Result CumulativeMean(const Datum& values, const CumulativeOptions& options, + ExecContext* ctx) { + return CallFunction("cumulative_mean", {Datum(values)}, &options, ctx); +} + } // namespace compute } // namespace arrow diff --git a/cpp/src/arrow/compute/api_vector.h b/cpp/src/arrow/compute/api_vector.h index 4f226ac00788a..0233090ef6fb9 100644 --- a/cpp/src/arrow/compute/api_vector.h +++ b/cpp/src/arrow/compute/api_vector.h @@ -226,6 +226,7 @@ class ARROW_EXPORT CumulativeOptions : public FunctionOptions { /// - prod: 1 /// - min: maximum of the input type /// - max: minimum of the input type + /// - mean: start is ignored because it has no meaning for mean std::optional> start; /// If true, nulls in the input are ignored and produce a corresponding null output. @@ -661,6 +662,16 @@ Result CumulativeMin( const Datum& values, const CumulativeOptions& options = CumulativeOptions::Defaults(), ExecContext* ctx = NULLPTR); +/// \brief Compute the cumulative mean of an array-like object +/// +/// \param[in] values array-like input +/// \param[in] options configures cumulative mean behavior, `start` is ignored +/// \param[in] ctx the function execution context, optional +ARROW_EXPORT +Result CumulativeMean( + const Datum& values, const CumulativeOptions& options = CumulativeOptions::Defaults(), + ExecContext* ctx = NULLPTR); + /// \brief Return the first order difference of an array. /// /// Computes the first order difference of an array, i.e. diff --git a/cpp/src/arrow/compute/kernels/vector_cumulative_ops.cc b/cpp/src/arrow/compute/kernels/vector_cumulative_ops.cc index 82caa3bff59aa..86d2679486726 100644 --- a/cpp/src/arrow/compute/kernels/vector_cumulative_ops.cc +++ b/cpp/src/arrow/compute/kernels/vector_cumulative_ops.cc @@ -25,12 +25,11 @@ #include "arrow/compute/kernels/codegen_internal.h" #include "arrow/compute/kernels/common_internal.h" #include "arrow/result.h" +#include "arrow/type_traits.h" #include "arrow/util/bit_util.h" #include "arrow/visit_type_inline.h" -namespace arrow { -namespace compute { -namespace internal { +namespace arrow::compute::internal { namespace { @@ -63,19 +62,60 @@ struct CumulativeOptionsWrapper : public OptionsWrapper { } }; -// The driver kernel for all cumulative compute functions. Op is a compute kernel -// representing any binary associative operation with an identity element (add, product, -// min, max, etc.), i.e. ones that form a monoid, and OptionsType the options type -// corresponding to Op. ArgType and OutType are the input and output types, which will +// The cumulative value is computed based on a simple arithmetic binary op +// such as Add, Mul, Min, Max, etc. +template +struct CumulativeBinaryOp { + using OutType = ArgType; + using OutValue = typename GetOutputType::T; + using ArgValue = typename GetViewType::T; + + OutValue current_value; + + CumulativeBinaryOp() { current_value = Identity::template value; } + + explicit CumulativeBinaryOp(const std::shared_ptr start) { + current_value = UnboxScalar::Unbox(*start); + } + + OutValue Call(KernelContext* ctx, ArgValue arg, Status* st) { + current_value = + Op::template Call(ctx, arg, current_value, st); + return current_value; + } +}; + +template +struct CumulativeMean { + using OutType = DoubleType; + using ArgValue = typename GetViewType::T; + int64_t count = 0; + double sum = 0; + + CumulativeMean() = default; + + // start value is ignored for CumulativeMean + explicit CumulativeMean(const std::shared_ptr start) {} + + double Call(KernelContext* ctx, ArgValue arg, Status* st) { + sum += static_cast(arg); + ++count; + return sum / count; + } +}; + +// The driver kernel for all cumulative compute functions. +// ArgType and OutType are the input and output types, which will // normally be the same (e.g. the cumulative sum of an array of Int64Type will result in -// an array of Int64Type). -template +// an array of Int64Type) with the exception of CumulativeMean, which will always return +// a double. +template struct Accumulator { - using OutValue = typename GetOutputType::T; + using OutType = typename CumulativeState::OutType; using ArgValue = typename GetViewType::T; KernelContext* ctx; - ArgValue current_value; + CumulativeState current_state; bool skip_nulls; bool encountered_null = false; NumericBuilder builder; @@ -88,11 +128,7 @@ struct Accumulator { if (skip_nulls || (input.GetNullCount() == 0 && !encountered_null)) { VisitArrayValuesInline( input, - [&](ArgValue v) { - current_value = Op::template Call( - ctx, v, current_value, &st); - builder.UnsafeAppend(current_value); - }, + [&](ArgValue v) { builder.UnsafeAppend(current_state.Call(ctx, v, &st)); }, [&]() { builder.UnsafeAppendNull(); }); } else { int64_t nulls_start_idx = 0; @@ -100,9 +136,7 @@ struct Accumulator { input, [&](ArgValue v) { if (!encountered_null) { - current_value = Op::template Call( - ctx, v, current_value, &st); - builder.UnsafeAppend(current_value); + builder.UnsafeAppend(current_state.Call(ctx, v, &st)); ++nulls_start_idx; } }, @@ -115,16 +149,17 @@ struct Accumulator { } }; -template +template struct CumulativeKernel { + using OutType = typename CumulativeState::OutType; using OutValue = typename GetOutputType::T; static Status Exec(KernelContext* ctx, const ExecSpan& batch, ExecResult* out) { const auto& options = CumulativeOptionsWrapper::Get(ctx); - Accumulator accumulator(ctx); + Accumulator accumulator(ctx); if (options.start.has_value()) { - accumulator.current_value = UnboxScalar::Unbox(*(options.start.value())); + accumulator.current_state = CumulativeState(options.start.value()); } else { - accumulator.current_value = Identity::template value; + accumulator.current_state = CumulativeState(); } accumulator.skip_nulls = options.skip_nulls; @@ -138,16 +173,17 @@ struct CumulativeKernel { } }; -template +template struct CumulativeKernelChunked { + using OutType = typename CumulativeState::OutType; using OutValue = typename GetOutputType::T; static Status Exec(KernelContext* ctx, const ExecBatch& batch, Datum* out) { const auto& options = CumulativeOptionsWrapper::Get(ctx); - Accumulator accumulator(ctx); + Accumulator accumulator(ctx); if (options.start.has_value()) { - accumulator.current_value = UnboxScalar::Unbox(*(options.start.value())); + accumulator.current_state = CumulativeState(options.start.value()); } else { - accumulator.current_value = Identity::template value; + accumulator.current_state = CumulativeState(); } accumulator.skip_nulls = options.skip_nulls; @@ -217,11 +253,52 @@ const FunctionDoc cumulative_min_doc{ "start as the new minimum)."), {"values"}, "CumulativeOptions"}; -} // namespace -template -void MakeVectorCumulativeFunction(FunctionRegistry* registry, const std::string func_name, - const FunctionDoc doc) { +const FunctionDoc cumulative_mean_doc{ + "Compute the cumulative mean over a numeric input", + ("`values` must be numeric. Return an array/chunked array which is the\n" + "cumulative mean computed over `values`. CumulativeOptions::start_value is \n" + "ignored."), + {"values"}, + "CumulativeOptions"}; + +// Kernel factory for complex stateful computations. +template