Fix lint (clang-format)
icexelloss committed Jun 5, 2023
1 parent 9d7fd9d commit dc1d734
Showing 2 changed files with 115 additions and 117 deletions.
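The fix is mechanical: rerunning the formatter over the two touched files. A minimal sketch of reproducing it, assuming clang-format is on PATH and matches the version Arrow's CI pins (the invocation below is illustrative, not the project's exact lint command):

import subprocess

# Reformat the two files touched by this commit, in place.
subprocess.run(
    ["clang-format", "-i",
     "python/pyarrow/src/arrow/python/udf.cc",
     "python/pyarrow/src/arrow/python/udf.h"],
    check=True,
)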
220 changes: 109 additions & 111 deletions python/pyarrow/src/arrow/python/udf.cc
@@ -17,16 +17,16 @@
 
 #include <iostream>
 
-#include "arrow/python/udf.h"
-#include "arrow/table.h"
 #include "arrow/compute/api_aggregate.h"
 #include "arrow/compute/function.h"
 #include "arrow/compute/kernel.h"
 #include "arrow/python/common.h"
+#include "arrow/python/udf.h"
+#include "arrow/table.h"
 #include "arrow/util/checked_cast.h"
 
 namespace arrow {
-using internal::checked_cast;
+using internal::checked_cast;
 namespace py {
 namespace {
@@ -75,7 +75,8 @@ struct ScalarUdfAggregator : public compute::KernelState {
   virtual Status Finalize(compute::KernelContext* ctx, Datum* out) = 0;
 };
 
-arrow::Status AggregateUdfConsume(compute::KernelContext* ctx, const compute::ExecSpan& batch) {
+arrow::Status AggregateUdfConsume(compute::KernelContext* ctx,
+                                  const compute::ExecSpan& batch) {
   return checked_cast<ScalarUdfAggregator*>(ctx->state())->Consume(ctx, batch);
 }
 
@@ -124,106 +125,101 @@ struct PythonTableUdfKernelInit {
   UdfWrapperCallback cb;
 };
 
-struct PythonUdfScalarAggregatorImpl : public ScalarUdfAggregator {
-
-  PythonUdfScalarAggregatorImpl(UdfWrapperCallback agg_cb,
-                                std::shared_ptr<OwnedRefNoGIL> agg_function,
-                                std::vector<std::shared_ptr<DataType>> input_types,
-                                std::shared_ptr<DataType> output_type):
-      agg_cb(agg_cb),
-      agg_function(agg_function),
-      output_type(output_type) {
-    std::vector<std::shared_ptr<Field>> fields;
-    for (size_t i = 0; i < input_types.size(); i++) {
-      fields.push_back(std::move(field("", input_types[i])));
-    }
-    input_schema = schema(std::move(fields));
-  };
-
-  ~PythonUdfScalarAggregatorImpl() {
-    if (_Py_IsFinalizing()) {
-      agg_function->detach();
-    }
-  }
-
-  Status Consume(compute::KernelContext* ctx, const compute::ExecSpan& batch) {
-    ARROW_ASSIGN_OR_RAISE(auto rb, batch.ToExecBatch().ToRecordBatch(input_schema, ctx->memory_pool()));
-    values.push_back(std::move(rb));
-    return Status::OK();
-  }
-
-  Status MergeFrom(compute::KernelContext* ctx, compute::KernelState&& src) {
-    auto& other_values = checked_cast<PythonUdfScalarAggregatorImpl&>(src).values;
-    values.insert(values.end(),
-                  std::make_move_iterator(other_values.begin()),
-                  std::make_move_iterator(other_values.end()));
-
-    other_values.erase(other_values.begin(), other_values.end());
-    return Status::OK();
-  }
-
-  Status Finalize(compute::KernelContext* ctx, Datum* out) {
-    auto state = arrow::internal::checked_cast<PythonUdfScalarAggregatorImpl*>(ctx->state());
-    std::shared_ptr<OwnedRefNoGIL>& function = state->agg_function;
-    const int num_args = input_schema->num_fields();
-
-    // Note: The way that batches are concatenated together
-    // would result in using double amount of the memory.
-    // This is OK for now because non decomposable aggregate
-    // UDF is supposed to be used with segmented aggregation
-    // where the size of the segment is more or less constant
-    // so doubling that is not a big deal. This can be also
-    // improved in the future to use more efficient way to
-    // concatenate.
-    ARROW_ASSIGN_OR_RAISE(
-      auto table,
-      arrow::Table::FromRecordBatches(input_schema, values)
-    );
-    ARROW_ASSIGN_OR_RAISE(
-      table, table->CombineChunks(ctx->memory_pool())
-    );
-    UdfContext udf_context{ctx->memory_pool(), table->num_rows()};
-
-    if (table->num_rows() == 0) {
-      return Status::Invalid("Finalized is called with empty inputs");
-    }
-
-    std::unique_ptr<OwnedRef> result;
-    RETURN_NOT_OK(SafeCallIntoPython([&] {
-      OwnedRef arg_tuple(PyTuple_New(num_args));
-      RETURN_NOT_OK(CheckPyError());
-
-      for (int arg_id = 0; arg_id < num_args; arg_id++) {
-        // Since we combined chunks there is only one chunk
-        std::shared_ptr<Array> c_data = table->column(arg_id)->chunk(0);
-        PyObject* data = wrap_array(c_data);
-        PyTuple_SetItem(arg_tuple.obj(), arg_id, data);
-      }
-      result = std::make_unique<OwnedRef>(agg_cb(function->obj(), udf_context, arg_tuple.obj()));
-      RETURN_NOT_OK(CheckPyError());
-      return Status::OK();
-    }));
-    // unwrapping the output for expected output type
-    if (is_scalar(result->obj())) {
-      ARROW_ASSIGN_OR_RAISE(std::shared_ptr<Scalar> val, unwrap_scalar(result->obj()));
-      if (*output_type != *val->type) {
-        return Status::TypeError("Expected output datatype ", output_type->ToString(),
-                                 ", but function returned datatype ",
-                                 val->type->ToString());
-      }
-      out->value = std::move(val);
-      return Status::OK();
-    }
-    return Status::TypeError("Unexpected output type: ", Py_TYPE(result->obj())->tp_name,
-                             " (expected Scalar)");
-  }
-
-  UdfWrapperCallback agg_cb;
-  std::vector<std::shared_ptr<RecordBatch>> values;
-  std::shared_ptr<OwnedRefNoGIL> agg_function;
-  std::shared_ptr<Schema> input_schema;
-  std::shared_ptr<DataType> output_type;
-};
+struct PythonUdfScalarAggregatorImpl : public ScalarUdfAggregator {
+  PythonUdfScalarAggregatorImpl(UdfWrapperCallback agg_cb,
+                                std::shared_ptr<OwnedRefNoGIL> agg_function,
+                                std::vector<std::shared_ptr<DataType>> input_types,
+                                std::shared_ptr<DataType> output_type)
+      : agg_cb(agg_cb), agg_function(agg_function), output_type(output_type) {
+    std::vector<std::shared_ptr<Field>> fields;
+    for (size_t i = 0; i < input_types.size(); i++) {
+      fields.push_back(std::move(field("", input_types[i])));
+    }
+    input_schema = schema(std::move(fields));
+  };
+
+  ~PythonUdfScalarAggregatorImpl() {
+    if (_Py_IsFinalizing()) {
+      agg_function->detach();
+    }
+  }
+
+  Status Consume(compute::KernelContext* ctx, const compute::ExecSpan& batch) {
+    ARROW_ASSIGN_OR_RAISE(
+        auto rb, batch.ToExecBatch().ToRecordBatch(input_schema, ctx->memory_pool()));
+    values.push_back(std::move(rb));
+    return Status::OK();
+  }
+
+  Status MergeFrom(compute::KernelContext* ctx, compute::KernelState&& src) {
+    auto& other_values = checked_cast<PythonUdfScalarAggregatorImpl&>(src).values;
+    values.insert(values.end(), std::make_move_iterator(other_values.begin()),
+                  std::make_move_iterator(other_values.end()));
+
+    other_values.erase(other_values.begin(), other_values.end());
+    return Status::OK();
+  }
+
+  Status Finalize(compute::KernelContext* ctx, Datum* out) {
+    auto state =
+        arrow::internal::checked_cast<PythonUdfScalarAggregatorImpl*>(ctx->state());
+    std::shared_ptr<OwnedRefNoGIL>& function = state->agg_function;
+    const int num_args = input_schema->num_fields();
+
+    // Note: The way that batches are concatenated together
+    // would result in using double amount of the memory.
+    // This is OK for now because non decomposable aggregate
+    // UDF is supposed to be used with segmented aggregation
+    // where the size of the segment is more or less constant
+    // so doubling that is not a big deal. This can be also
+    // improved in the future to use more efficient way to
+    // concatenate.
+    ARROW_ASSIGN_OR_RAISE(auto table,
+                          arrow::Table::FromRecordBatches(input_schema, values));
+    ARROW_ASSIGN_OR_RAISE(table, table->CombineChunks(ctx->memory_pool()));
+    UdfContext udf_context{ctx->memory_pool(), table->num_rows()};
+
+    if (table->num_rows() == 0) {
+      return Status::Invalid("Finalized is called with empty inputs");
+    }
+
+    std::unique_ptr<OwnedRef> result;
+    RETURN_NOT_OK(SafeCallIntoPython([&] {
+      OwnedRef arg_tuple(PyTuple_New(num_args));
+      RETURN_NOT_OK(CheckPyError());
+
+      for (int arg_id = 0; arg_id < num_args; arg_id++) {
+        // Since we combined chunks there is only one chunk
+        std::shared_ptr<Array> c_data = table->column(arg_id)->chunk(0);
+        PyObject* data = wrap_array(c_data);
+        PyTuple_SetItem(arg_tuple.obj(), arg_id, data);
+      }
+      result = std::make_unique<OwnedRef>(
+          agg_cb(function->obj(), udf_context, arg_tuple.obj()));
+      RETURN_NOT_OK(CheckPyError());
+      return Status::OK();
+    }));
+    // unwrapping the output for expected output type
+    if (is_scalar(result->obj())) {
+      ARROW_ASSIGN_OR_RAISE(std::shared_ptr<Scalar> val, unwrap_scalar(result->obj()));
+      if (*output_type != *val->type) {
+        return Status::TypeError("Expected output datatype ", output_type->ToString(),
+                                 ", but function returned datatype ",
+                                 val->type->ToString());
+      }
+      out->value = std::move(val);
+      return Status::OK();
+    }
+    return Status::TypeError("Unexpected output type: ", Py_TYPE(result->obj())->tp_name,
+                             " (expected Scalar)");
+  }
+
+  UdfWrapperCallback agg_cb;
+  std::vector<std::shared_ptr<RecordBatch>> values;
+  std::shared_ptr<OwnedRefNoGIL> agg_function;
+  std::shared_ptr<Schema> input_schema;
+  std::shared_ptr<DataType> output_type;
+};
 
 struct PythonUdf : public PythonUdfKernelState {
   PythonUdf(std::shared_ptr<OwnedRefNoGIL> function, UdfWrapperCallback cb,
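The Note inside Finalize above captures the design: a non-decomposable aggregate UDF buffers every batch in Consume, concatenates them once in Finalize, and makes a single call into Python, at the cost of roughly doubling peak memory for the buffered segment. A minimal sketch of how this surfaces on the Python side, assuming a PyArrow build that exposes pc.register_aggregate_function (Arrow 12+); the name exact_median is made up for illustration:

import statistics

import pyarrow as pa
import pyarrow.compute as pc

def exact_median(ctx, array):
    # Non-decomposable: needs all values at once. The kernel above hands
    # the UDF a single combined Array (chunks were concatenated in
    # Finalize) and expects back a pyarrow Scalar of the declared type.
    return pa.scalar(statistics.median(array.to_pylist()), type=pa.float64())

pc.register_aggregate_function(
    exact_median,
    "exact_median",
    {"summary": "exact median", "description": "Median over all input values."},
    {"values": pa.float64()},
    pa.float64(),
)

result = pc.call_function("exact_median", [pa.array([1.0, 2.0, 9.0])])
print(result)  # 2.0

Because the UDF only ever sees the fully concatenated input, even an aggregation with no incremental form (a true median, unlike sum or mean) works; that is exactly the case this kernel state exists to support.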
@@ -358,17 +354,18 @@ Status RegisterTabularFunction(PyObject* user_function, UdfWrapperCallback wrapp
                      wrapper, options, registry);
 }
 
-Status AddAggKernel(std::shared_ptr<compute::KernelSignature> sig, compute::KernelInit init,
-                    compute::ScalarAggregateFunction* func) {
-
-  compute::ScalarAggregateKernel kernel(std::move(sig), std::move(init), AggregateUdfConsume, AggregateUdfMerge, AggregateUdfFinalize, /*ordered=*/false);
+Status AddAggKernel(std::shared_ptr<compute::KernelSignature> sig,
+                    compute::KernelInit init, compute::ScalarAggregateFunction* func) {
+  compute::ScalarAggregateKernel kernel(std::move(sig), std::move(init),
+                                        AggregateUdfConsume, AggregateUdfMerge,
+                                        AggregateUdfFinalize, /*ordered=*/false);
   RETURN_NOT_OK(func->AddKernel(std::move(kernel)));
   return Status::OK();
 }
 
 Status RegisterAggregateFunction(PyObject* agg_function, UdfWrapperCallback agg_wrapper,
-                                const UdfOptions& options,
-                                compute::FunctionRegistry* registry) {
+                                 const UdfOptions& options,
+                                 compute::FunctionRegistry* registry) {
   if (!PyCallable_Check(agg_function)) {
     return Status::TypeError("Expected a callable Python object.");
   }
@@ -379,9 +376,11 @@ Status RegisterAggregateFunction(PyObject* agg_function, UdfWrapperCallback agg_
 
   Py_INCREF(agg_function);
 
-  static auto default_scalar_aggregate_options = compute::ScalarAggregateOptions::Defaults();
+  static auto default_scalar_aggregate_options =
+      compute::ScalarAggregateOptions::Defaults();
   auto aggregate_func = std::make_shared<compute::ScalarAggregateFunction>(
-      options.func_name, options.arity, options.func_doc, &default_scalar_aggregate_options);
+      options.func_name, options.arity, options.func_doc,
+      &default_scalar_aggregate_options);
 
   std::vector<compute::InputType> input_types;
   for (const auto& in_dtype : options.input_types) {
@@ -390,21 +389,20 @@ Status RegisterAggregateFunction(PyObject* agg_function, UdfWrapperCallback agg_
   compute::OutputType output_type(options.output_type);
 
   compute::KernelInit init = [agg_wrapper, agg_function, options](
-                               compute::KernelContext* ctx,
-                               const compute::KernelInitArgs& args) -> Result<std::unique_ptr<compute::KernelState>> {
+                                 compute::KernelContext* ctx,
+                                 const compute::KernelInitArgs& args)
+      -> Result<std::unique_ptr<compute::KernelState>> {
     // Py_INCREF because OwnedRefNoGIL will call Py_XDECREF in destructor
     Py_INCREF(agg_function);
     return std::make_unique<PythonUdfScalarAggregatorImpl>(
-        agg_wrapper,
-        std::make_shared<OwnedRefNoGIL>(agg_function),
-        options.input_types,
-        options.output_type);
+        agg_wrapper, std::make_shared<OwnedRefNoGIL>(agg_function), options.input_types,
+        options.output_type);
   };
 
-  RETURN_NOT_OK(
-      AddAggKernel(compute::KernelSignature::Make(
-          std::move(input_types), std::move(output_type), options.arity.is_varargs),
-          init, aggregate_func.get()));
+  RETURN_NOT_OK(AddAggKernel(
+      compute::KernelSignature::Make(std::move(input_types), std::move(output_type),
+                                     options.arity.is_varargs),
+      init, aggregate_func.get()));
 
   RETURN_NOT_OK(registry->AddFunction(std::move(aggregate_func)));
   return Status::OK();
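One consequence of the Finalize body above is that the Python callable must return something that unwraps to an Arrow Scalar of the declared output type. Continuing the previous sketch, a hedged illustration of the failure mode (bad_median is hypothetical; the error text follows the message template in udf.cc, and Status::TypeError surfaces in Python as pyarrow.lib.ArrowTypeError):

def bad_median(ctx, array):
    # Declared output type below is float64, but this returns an int64
    # scalar, so the type check in Finalize rejects it.
    return pa.scalar(1)

pc.register_aggregate_function(
    bad_median, "bad_median",
    {"summary": "bad", "description": "Returns the wrong type."},
    {"values": pa.float64()}, pa.float64(),
)

# Raises pyarrow.lib.ArrowTypeError: Expected output datatype double,
# but function returned datatype int64
pc.call_function("bad_median", [pa.array([1.0, 2.0])])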
12 changes: 6 additions & 6 deletions python/pyarrow/src/arrow/python/udf.h
@@ -54,18 +54,18 @@ using UdfWrapperCallback = std::function<PyObject*(
 
 /// \brief register a Scalar user-defined-function from Python
 Status ARROW_PYTHON_EXPORT RegisterScalarFunction(
-    PyObject* user_function, UdfWrapperCallback wrapper,
-    const UdfOptions& options, compute::FunctionRegistry* registry = NULLPTR);
+    PyObject* user_function, UdfWrapperCallback wrapper, const UdfOptions& options,
+    compute::FunctionRegistry* registry = NULLPTR);
 
 /// \brief register a Table user-defined-function from Python
 Status ARROW_PYTHON_EXPORT RegisterTabularFunction(
-    PyObject* user_function, UdfWrapperCallback wrapper,
-    const UdfOptions& options, compute::FunctionRegistry* registry = NULLPTR);
+    PyObject* user_function, UdfWrapperCallback wrapper, const UdfOptions& options,
+    compute::FunctionRegistry* registry = NULLPTR);
 
 /// \brief register a Aggregate user-defined-function from Python
 Status ARROW_PYTHON_EXPORT RegisterAggregateFunction(
-    PyObject* user_function, UdfWrapperCallback wrapper,
-    const UdfOptions& options, compute::FunctionRegistry* registry = NULLPTR);
+    PyObject* user_function, UdfWrapperCallback wrapper, const UdfOptions& options,
+    compute::FunctionRegistry* registry = NULLPTR);
 
 Result<std::shared_ptr<RecordBatchReader>> ARROW_PYTHON_EXPORT
 CallTabularFunction(const std::string& func_name, const std::vector<Datum>& args,
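The header pairs each UDF flavor (scalar, tabular, aggregate) with a registration entry point that pyarrow's Cython layer calls into. For completeness, a minimal scalar-UDF sketch, assuming pc.register_scalar_function as documented in recent PyArrow; add_one is a made-up name:

import pyarrow as pa
import pyarrow.compute as pc

def add_one(ctx, array):
    # ctx is the UDF context; allocations made inside the UDF should use
    # its memory pool.
    return pc.add(array, 1, memory_pool=ctx.memory_pool)

pc.register_scalar_function(
    add_one,
    "add_one",
    {"summary": "add one", "description": "Adds 1 to each element."},
    {"values": pa.int64()},
    pa.int64(),
)

print(pc.call_function("add_one", [pa.array([1, 2, 3])]))  # [2, 3, 4]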
