From dc1d7340187dd3beb9e630a866fb5f7ea853cd8e Mon Sep 17 00:00:00 2001
From: Li Jin
Date: Mon, 5 Jun 2023 16:25:13 -0400
Subject: [PATCH] Fix lint (clang-format)

---
 python/pyarrow/src/arrow/python/udf.cc | 220 ++++++++++++-------------
 python/pyarrow/src/arrow/python/udf.h  |  12 +-
 2 files changed, 115 insertions(+), 117 deletions(-)

diff --git a/python/pyarrow/src/arrow/python/udf.cc b/python/pyarrow/src/arrow/python/udf.cc
index 32f020587f6d5..a856f0e398b07 100644
--- a/python/pyarrow/src/arrow/python/udf.cc
+++ b/python/pyarrow/src/arrow/python/udf.cc
@@ -17,16 +17,16 @@
 #include 

-#include "arrow/python/udf.h"
-#include "arrow/table.h"
 #include "arrow/compute/api_aggregate.h"
 #include "arrow/compute/function.h"
 #include "arrow/compute/kernel.h"
 #include "arrow/python/common.h"
+#include "arrow/python/udf.h"
+#include "arrow/table.h"
 #include "arrow/util/checked_cast.h"

 namespace arrow {
-  using internal::checked_cast;
+using internal::checked_cast;
 namespace py {
 namespace {
@@ -75,7 +75,8 @@ struct ScalarUdfAggregator : public compute::KernelState {
   virtual Status Finalize(compute::KernelContext* ctx, Datum* out) = 0;
 };

-arrow::Status AggregateUdfConsume(compute::KernelContext* ctx, const compute::ExecSpan& batch) {
+arrow::Status AggregateUdfConsume(compute::KernelContext* ctx,
+                                  const compute::ExecSpan& batch) {
   return checked_cast<ScalarUdfAggregator*>(ctx->state())->Consume(ctx, batch);
 }

@@ -124,106 +125,101 @@ struct PythonTableUdfKernelInit {
   UdfWrapperCallback cb;
 };

-  struct PythonUdfScalarAggregatorImpl : public ScalarUdfAggregator {
-
-    PythonUdfScalarAggregatorImpl(UdfWrapperCallback agg_cb,
-                                  std::shared_ptr<OwnedRefNoGIL> agg_function,
-                                  std::vector<std::shared_ptr<DataType>> input_types,
-                                  std::shared_ptr<DataType> output_type):
-      agg_cb(agg_cb),
-      agg_function(agg_function),
-      output_type(output_type) {
-      std::vector<std::shared_ptr<Field>> fields;
-      for (size_t i = 0; i < input_types.size(); i++) {
-        fields.push_back(std::move(field("", input_types[i])));
-      }
-      input_schema = schema(std::move(fields));
-    };
-
-    ~PythonUdfScalarAggregatorImpl() {
-      if (_Py_IsFinalizing()) {
-        agg_function->detach();
-      }
+struct PythonUdfScalarAggregatorImpl : public ScalarUdfAggregator {
+  PythonUdfScalarAggregatorImpl(UdfWrapperCallback agg_cb,
+                                std::shared_ptr<OwnedRefNoGIL> agg_function,
+                                std::vector<std::shared_ptr<DataType>> input_types,
+                                std::shared_ptr<DataType> output_type)
+      : agg_cb(agg_cb), agg_function(agg_function), output_type(output_type) {
+    std::vector<std::shared_ptr<Field>> fields;
+    for (size_t i = 0; i < input_types.size(); i++) {
+      fields.push_back(std::move(field("", input_types[i])));
     }
+    input_schema = schema(std::move(fields));
+  };

-    Status Consume(compute::KernelContext* ctx, const compute::ExecSpan& batch) {
-      ARROW_ASSIGN_OR_RAISE(auto rb, batch.ToExecBatch().ToRecordBatch(input_schema, ctx->memory_pool()));
-      values.push_back(std::move(rb));
-      return Status::OK();
+  ~PythonUdfScalarAggregatorImpl() {
+    if (_Py_IsFinalizing()) {
+      agg_function->detach();
     }
+  }

-    Status MergeFrom(compute::KernelContext* ctx, compute::KernelState&& src) {
-      auto& other_values = checked_cast<PythonUdfScalarAggregatorImpl&>(src).values;
-      values.insert(values.end(),
-                    std::make_move_iterator(other_values.begin()),
-                    std::make_move_iterator(other_values.end()));
+  Status Consume(compute::KernelContext* ctx, const compute::ExecSpan& batch) {
+    ARROW_ASSIGN_OR_RAISE(
+        auto rb, batch.ToExecBatch().ToRecordBatch(input_schema, ctx->memory_pool()));
+    values.push_back(std::move(rb));
+    return Status::OK();
+  }

-      other_values.erase(other_values.begin(), other_values.end());
-      return Status::OK();
+  Status MergeFrom(compute::KernelContext* ctx, compute::KernelState&& src) {
+    auto& other_values = checked_cast<PythonUdfScalarAggregatorImpl&>(src).values;
+    values.insert(values.end(), std::make_move_iterator(other_values.begin()),
+                  std::make_move_iterator(other_values.end()));
+
+    other_values.erase(other_values.begin(), other_values.end());
+    return Status::OK();
+  }
+
+  Status Finalize(compute::KernelContext* ctx, Datum* out) {
+    auto state =
+        arrow::internal::checked_cast<PythonUdfScalarAggregatorImpl*>(ctx->state());
+    std::shared_ptr<OwnedRefNoGIL>& function = state->agg_function;
+    const int num_args = input_schema->num_fields();
+
+    // Note: The way that batches are concatenated together
+    // would result in using double amount of the memory.
+    // This is OK for now because non decomposable aggregate
+    // UDF is supposed to be used with segmented aggregation
+    // where the size of the segment is more or less constant
+    // so doubling that is not a big deal. This can be also
+    // improved in the future to use more efficient way to
+    // concatenate.
+    ARROW_ASSIGN_OR_RAISE(auto table,
+                          arrow::Table::FromRecordBatches(input_schema, values));
+    ARROW_ASSIGN_OR_RAISE(table, table->CombineChunks(ctx->memory_pool()));
+    UdfContext udf_context{ctx->memory_pool(), table->num_rows()};
+
+    if (table->num_rows() == 0) {
+      return Status::Invalid("Finalized is called with empty inputs");
+    }

-    Status Finalize(compute::KernelContext* ctx, Datum* out) {
-      auto state = arrow::internal::checked_cast<PythonUdfScalarAggregatorImpl*>(ctx->state());
-      std::shared_ptr<OwnedRefNoGIL>& function = state->agg_function;
-      const int num_args = input_schema->num_fields();
-
-      // Note: The way that batches are concatenated together
-      // would result in using double amount of the memory.
-      // This is OK for now because non decomposable aggregate
-      // UDF is supposed to be used with segmented aggregation
-      // where the size of the segment is more or less constant
-      // so doubling that is not a big deal. This can be also
-      // improved in the future to use more efficient way to
-      // concatenate.
-      ARROW_ASSIGN_OR_RAISE(
-        auto table,
-        arrow::Table::FromRecordBatches(input_schema, values)
-      );
-      ARROW_ASSIGN_OR_RAISE(
-        table, table->CombineChunks(ctx->memory_pool())
-      );
-      UdfContext udf_context{ctx->memory_pool(), table->num_rows()};
-
-      if (table->num_rows() == 0) {
-        return Status::Invalid("Finalized is called with empty inputs");
-      }

+    std::unique_ptr<OwnedRef> result;
+    RETURN_NOT_OK(SafeCallIntoPython([&] {
+      OwnedRef arg_tuple(PyTuple_New(num_args));
+      RETURN_NOT_OK(CheckPyError());
+
+      for (int arg_id = 0; arg_id < num_args; arg_id++) {
+        // Since we combined chunks there is only one chunk
+        std::shared_ptr<Array> c_data = table->column(arg_id)->chunk(0);
+        PyObject* data = wrap_array(c_data);
+        PyTuple_SetItem(arg_tuple.obj(), arg_id, data);
+      }
+      result = std::make_unique<OwnedRef>(
+          agg_cb(function->obj(), udf_context, arg_tuple.obj()));
+      RETURN_NOT_OK(CheckPyError());
+      return Status::OK();
+    }));
+    // unwrapping the output for expected output type
+    if (is_scalar(result->obj())) {
+      ARROW_ASSIGN_OR_RAISE(std::shared_ptr<Scalar> val, unwrap_scalar(result->obj()));
+      if (*output_type != *val->type) {
+        return Status::TypeError("Expected output datatype ", output_type->ToString(),
+                                 ", but function returned datatype ",
+                                 val->type->ToString());
+      }
+      out->value = std::move(val);
+      return Status::OK();
+    }
+    return Status::TypeError("Unexpected output type: ", Py_TYPE(result->obj())->tp_name,
+                             " (expected Scalar)");
+  }
+
+  UdfWrapperCallback agg_cb;
+  std::vector<std::shared_ptr<RecordBatch>> values;
+  std::shared_ptr<OwnedRefNoGIL> agg_function;
+  std::shared_ptr<Schema> input_schema;
+  std::shared_ptr<DataType> output_type;
+};

 struct PythonUdf : public PythonUdfKernelState {
   PythonUdf(std::shared_ptr<OwnedRefNoGIL> function, UdfWrapperCallback cb,

@@ -358,17 +354,18 @@ Status RegisterTabularFunction(PyObject* user_function, UdfWrapperCallback wrapp
                                  wrapper, options, registry);
 }

-Status AddAggKernel(std::shared_ptr<compute::KernelSignature> sig, compute::KernelInit init,
-                    compute::ScalarAggregateFunction* func) {
-
-  compute::ScalarAggregateKernel kernel(std::move(sig), std::move(init), AggregateUdfConsume, AggregateUdfMerge, AggregateUdfFinalize, /*ordered=*/false);
+Status AddAggKernel(std::shared_ptr<compute::KernelSignature> sig,
+                    compute::KernelInit init, compute::ScalarAggregateFunction* func) {
+  compute::ScalarAggregateKernel kernel(std::move(sig), std::move(init),
+                                        AggregateUdfConsume, AggregateUdfMerge,
+                                        AggregateUdfFinalize, /*ordered=*/false);
   RETURN_NOT_OK(func->AddKernel(std::move(kernel)));
   return Status::OK();
 }

 Status RegisterAggregateFunction(PyObject* agg_function, UdfWrapperCallback agg_wrapper,
-                const UdfOptions& options,
-                compute::FunctionRegistry* registry) {
+                                 const UdfOptions& options,
+                                 compute::FunctionRegistry* registry) {
   if (!PyCallable_Check(agg_function)) {
     return Status::TypeError("Expected a callable Python object.");
   }
@@ -379,9 +376,11 @@ Status RegisterAggregateFunction(PyObject* agg_function, UdfWrapperCallback agg_

   Py_INCREF(agg_function);

-  static auto default_scalar_aggregate_options = compute::ScalarAggregateOptions::Defaults();
+  static auto default_scalar_aggregate_options =
+      compute::ScalarAggregateOptions::Defaults();
   auto aggregate_func = std::make_shared<compute::ScalarAggregateFunction>(
-      options.func_name, options.arity, options.func_doc, &default_scalar_aggregate_options);
+      options.func_name, options.arity, options.func_doc,
+      &default_scalar_aggregate_options);

   std::vector<compute::InputType> input_types;
   for (const auto& in_dtype : options.input_types) {
@@ -390,21 +389,20 @@ Status RegisterAggregateFunction(PyObject* agg_function, UdfWrapperCallback agg_
   compute::OutputType output_type(options.output_type);
   compute::KernelInit init = [agg_wrapper, agg_function, options](
-                               compute::KernelContext* ctx,
-                               const compute::KernelInitArgs& args) -> Result<std::unique_ptr<compute::KernelState>> {
+                                 compute::KernelContext* ctx,
+                                 const compute::KernelInitArgs& args)
+      -> Result<std::unique_ptr<compute::KernelState>> {
     // Py_INCREF because OwnedRefNoGIL will call Py_XDECREF in destructor
     Py_INCREF(agg_function);
     return std::make_unique<PythonUdfScalarAggregatorImpl>(
-        agg_wrapper,
-        std::make_shared<OwnedRefNoGIL>(agg_function),
-        options.input_types,
-        options.output_type);
+        agg_wrapper, std::make_shared<OwnedRefNoGIL>(agg_function), options.input_types,
+        options.output_type);
   };

-  RETURN_NOT_OK(
-      AddAggKernel(compute::KernelSignature::Make(
-          std::move(input_types), std::move(output_type), options.arity.is_varargs),
-      init, aggregate_func.get()));
+  RETURN_NOT_OK(AddAggKernel(
+      compute::KernelSignature::Make(std::move(input_types), std::move(output_type),
+                                     options.arity.is_varargs),
+      init, aggregate_func.get()));

   RETURN_NOT_OK(registry->AddFunction(std::move(aggregate_func)));

   return Status::OK();
diff --git a/python/pyarrow/src/arrow/python/udf.h b/python/pyarrow/src/arrow/python/udf.h
index cc2f3ab62f57c..682cbb2ffe8d5 100644
--- a/python/pyarrow/src/arrow/python/udf.h
+++ b/python/pyarrow/src/arrow/python/udf.h
@@ -54,18 +54,18 @@ using UdfWrapperCallback = std::function<PyObject*(
 Result<std::shared_ptr<RecordBatchReader>> ARROW_PYTHON_EXPORT
 CallTabularFunction(const std::string& func_name, const std::vector<Datum>& args,
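Note (not part of the patch): the code reformatted above is the C++ backend for non-decomposable aggregate UDFs, where Consume buffers record batches and Finalize concatenates them and calls the Python callable once. Below is a minimal sketch of how such a UDF is registered and invoked from Python through pyarrow.compute.register_aggregate_function, the API this code path backs; the function name simple_mean_udf and the column name x are illustrative, not taken from the patch.

```python
import numpy as np
import pyarrow as pa
import pyarrow.compute as pc

def simple_mean(ctx, x):
    # ctx is the UDF context (it exposes a memory pool); x arrives as a
    # pyarrow array holding every buffered row for the aggregation,
    # matching the single combined chunk produced in Finalize above.
    # The return value must be a Scalar of the declared output type.
    return pa.scalar(float(np.mean(x.to_numpy(zero_copy_only=False))))

pc.register_aggregate_function(
    simple_mean,
    "simple_mean_udf",  # illustrative registry name
    {"summary": "mean via numpy", "description": "toy aggregate UDF"},
    {"x": pa.float64()},  # input name -> input type
    pa.float64(),         # output type
)

# Once registered, the kernel is callable like any built-in aggregate.
print(pc.call_function("simple_mean_udf", [pa.array([1.0, 2.0, 3.0])]))
```

As the Note comment in Finalize explains, all buffered batches are concatenated (roughly doubling memory) before the single Python call, so this style of UDF is intended for segmented aggregation where each segment stays reasonably small.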