From 35ba81ae53066b8b97743053634070b2d5f12569 Mon Sep 17 00:00:00 2001 From: Jin Shang Date: Fri, 28 Jul 2023 00:31:16 +0800 Subject: [PATCH 1/5] GH-36931: [C++] Add cumulative_mean function --- cpp/src/arrow/compute/api_vector.cc | 5 + cpp/src/arrow/compute/api_vector.h | 10 + .../compute/kernels/vector_cumulative_ops.cc | 216 ++++++++++++++---- .../kernels/vector_cumulative_ops_test.cc | 100 +++++++- docs/source/cpp/compute.rst | 34 +-- 5 files changed, 293 insertions(+), 72 deletions(-) diff --git a/cpp/src/arrow/compute/api_vector.cc b/cpp/src/arrow/compute/api_vector.cc index f73b10e11edd7..d47ee42ebf239 100644 --- a/cpp/src/arrow/compute/api_vector.cc +++ b/cpp/src/arrow/compute/api_vector.cc @@ -417,5 +417,10 @@ Result CumulativeMin(const Datum& values, const CumulativeOptions& option return CallFunction("cumulative_min", {Datum(values)}, &options, ctx); } +Result CumulativeMean(const Datum& values, const CumulativeOptions& options, + ExecContext* ctx) { + return CallFunction("cumulative_mean", {Datum(values)}, &options, ctx); +} + } // namespace compute } // namespace arrow diff --git a/cpp/src/arrow/compute/api_vector.h b/cpp/src/arrow/compute/api_vector.h index 4f226ac00788a..8bf33bb0f273c 100644 --- a/cpp/src/arrow/compute/api_vector.h +++ b/cpp/src/arrow/compute/api_vector.h @@ -661,6 +661,16 @@ Result CumulativeMin( const Datum& values, const CumulativeOptions& options = CumulativeOptions::Defaults(), ExecContext* ctx = NULLPTR); +/// \brief Compute the cumulative mean of an array-like object +/// +/// \param[in] values array-like input +/// \param[in] options configures cumulative mean behavior +/// \param[in] ctx the function execution context, optional +ARROW_EXPORT +Result CumulativeMean( + const Datum& values, const CumulativeOptions& options = CumulativeOptions::Defaults(), + ExecContext* ctx = NULLPTR); + /// \brief Return the first order difference of an array. /// /// Computes the first order difference of an array, i.e. diff --git a/cpp/src/arrow/compute/kernels/vector_cumulative_ops.cc b/cpp/src/arrow/compute/kernels/vector_cumulative_ops.cc index 82caa3bff59aa..2ba30ade0fe0d 100644 --- a/cpp/src/arrow/compute/kernels/vector_cumulative_ops.cc +++ b/cpp/src/arrow/compute/kernels/vector_cumulative_ops.cc @@ -25,12 +25,11 @@ #include "arrow/compute/kernels/codegen_internal.h" #include "arrow/compute/kernels/common_internal.h" #include "arrow/result.h" +#include "arrow/type_traits.h" #include "arrow/util/bit_util.h" #include "arrow/visit_type_inline.h" -namespace arrow { -namespace compute { -namespace internal { +namespace arrow::compute::internal { namespace { @@ -63,19 +62,62 @@ struct CumulativeOptionsWrapper : public OptionsWrapper { } }; -// The driver kernel for all cumulative compute functions. Op is a compute kernel -// representing any binary associative operation with an identity element (add, product, -// min, max, etc.), i.e. ones that form a monoid, and OptionsType the options type -// corresponding to Op. ArgType and OutType are the input and output types, which will +// The cumulative value is computed based on a simple arithmetic binary op +// such as Add, Mul, Min, Max, etc. +template +struct CumulativeBinaryOp { + using OutType = ArgType; + using OutValue = typename GetOutputType::T; + using ArgValue = typename GetViewType::T; + + OutValue current_value; + + explicit CumulativeBinaryOp() { + current_value = Identity::template value; + } + + explicit CumulativeBinaryOp(const std::shared_ptr start) { + current_value = UnboxScalar::Unbox(*start); + } + + OutValue Call(KernelContext* ctx, ArgValue arg, Status* st) { + current_value = + Op::template Call(ctx, arg, current_value, st); + return current_value; + } +}; + +template +struct CumulativeMean { + using OutType = DoubleType; + using ArgValue = typename GetViewType::T; + int64_t count = 0; + double sum = 0; + + explicit CumulativeMean() = default; + + // start value is ignored for CumulativeMean + explicit CumulativeMean(const std::shared_ptr start){}; + + double Call(KernelContext* ctx, ArgValue arg, Status* st) { + sum += static_cast(arg); + ++count; + return sum / count; + } +}; + +// The driver kernel for all cumulative compute functions. +// ArgType and OutType are the input and output types, which will // normally be the same (e.g. the cumulative sum of an array of Int64Type will result in -// an array of Int64Type). -template +// an array of Int64Type) with the exception of CumulativeMean, which will always return +// a double. +template struct Accumulator { - using OutValue = typename GetOutputType::T; + using OutType = typename CumulativeState::OutType; using ArgValue = typename GetViewType::T; KernelContext* ctx; - ArgValue current_value; + CumulativeState current_state; bool skip_nulls; bool encountered_null = false; NumericBuilder builder; @@ -88,11 +130,7 @@ struct Accumulator { if (skip_nulls || (input.GetNullCount() == 0 && !encountered_null)) { VisitArrayValuesInline( input, - [&](ArgValue v) { - current_value = Op::template Call( - ctx, v, current_value, &st); - builder.UnsafeAppend(current_value); - }, + [&](ArgValue v) { builder.UnsafeAppend(current_state.Call(ctx, v, &st)); }, [&]() { builder.UnsafeAppendNull(); }); } else { int64_t nulls_start_idx = 0; @@ -100,9 +138,7 @@ struct Accumulator { input, [&](ArgValue v) { if (!encountered_null) { - current_value = Op::template Call( - ctx, v, current_value, &st); - builder.UnsafeAppend(current_value); + builder.UnsafeAppend(current_state.Call(ctx, v, &st)); ++nulls_start_idx; } }, @@ -115,16 +151,17 @@ struct Accumulator { } }; -template +template struct CumulativeKernel { + using OutType = typename CumulativeState::OutType; using OutValue = typename GetOutputType::T; static Status Exec(KernelContext* ctx, const ExecSpan& batch, ExecResult* out) { const auto& options = CumulativeOptionsWrapper::Get(ctx); - Accumulator accumulator(ctx); + Accumulator accumulator(ctx); if (options.start.has_value()) { - accumulator.current_value = UnboxScalar::Unbox(*(options.start.value())); + accumulator.current_state = CumulativeState(options.start.value()); } else { - accumulator.current_value = Identity::template value; + accumulator.current_state = CumulativeState(); } accumulator.skip_nulls = options.skip_nulls; @@ -138,16 +175,17 @@ struct CumulativeKernel { } }; -template +template struct CumulativeKernelChunked { + using OutType = typename CumulativeState::OutType; using OutValue = typename GetOutputType::T; static Status Exec(KernelContext* ctx, const ExecBatch& batch, Datum* out) { const auto& options = CumulativeOptionsWrapper::Get(ctx); - Accumulator accumulator(ctx); + Accumulator accumulator(ctx); if (options.start.has_value()) { - accumulator.current_value = UnboxScalar::Unbox(*(options.start.value())); + accumulator.current_state = CumulativeState(options.start.value()); } else { - accumulator.current_value = Identity::template value; + accumulator.current_state = CumulativeState(); } accumulator.skip_nulls = options.skip_nulls; @@ -217,11 +255,50 @@ const FunctionDoc cumulative_min_doc{ "start as the new minimum)."), {"values"}, "CumulativeOptions"}; + +const FunctionDoc cumulative_mean_doc{ + "Compute the cumulative mean over a numeric input", + ("`values` must be numeric. Return an array/chunked array which is the\n" + "cumulative mean computed over `values`. CumulativeOptions::start_value is \n" + "ignored."), + {"values"}, + "CumulativeOptions"}; } // namespace +// Kernel generator for simple arithmetic operations. +// Op is a compute kernel representing any binary associative operation with +// an identity element (add, product, min, max, etc.), i.e. ones that form a monoid. +template +struct CumulativeBinaryOpKernelGenerator { + VectorKernel kernel; + + CumulativeBinaryOpKernelGenerator() { + kernel.can_execute_chunkwise = false; + kernel.null_handling = NullHandling::type::COMPUTED_NO_PREALLOCATE; + kernel.mem_allocation = MemAllocation::type::NO_PREALLOCATE; + kernel.init = CumulativeOptionsWrapper::Init; + } + + template + enable_if_number Visit(const Type& type) { + kernel.signature = + KernelSignature::Make({type.GetSharedPtr()}, OutputType(type.GetSharedPtr())); + kernel.exec = CumulativeKernel, OptionsType>::Exec; + kernel.exec_chunked = + CumulativeKernelChunked, OptionsType>::Exec; + return arrow::Status::OK(); + } + + Status Visit(const DataType& type) { + return Status::NotImplemented("Cumulative kernel not implemented for type ", + type.ToString()); + } +}; + template -void MakeVectorCumulativeFunction(FunctionRegistry* registry, const std::string func_name, - const FunctionDoc doc) { +void MakeVectorCumulativeBinaryOpFunction(FunctionRegistry* registry, + const std::string func_name, + const FunctionDoc doc) { static const OptionsType kDefaultOptions = OptionsType::Defaults(); auto func = std::make_shared(func_name, Arity::Unary(), doc, &kDefaultOptions); @@ -229,41 +306,80 @@ void MakeVectorCumulativeFunction(FunctionRegistry* registry, const std::string std::vector> types; types.insert(types.end(), NumericTypes().begin(), NumericTypes().end()); + CumulativeBinaryOpKernelGenerator kernel_generator; for (const auto& ty : types) { - VectorKernel kernel; + DCHECK_OK(VisitTypeInline(*ty, &kernel_generator)); + DCHECK_OK(func->AddKernel(kernel_generator.kernel)); + } + + DCHECK_OK(registry->AddFunction(std::move(func))); +} + +// Kernel generator for complex stateful computations. +template