From 1203346cb592aa01b22d1e4ca5351f26e6410fbf Mon Sep 17 00:00:00 2001 From: Li Jin Date: Tue, 6 Jun 2023 11:39:34 -0400 Subject: [PATCH] Fix core-dump when running with Python dev mode --- python/pyarrow/src/arrow/python/udf.cc | 36 ++++++++++++++------------ 1 file changed, 19 insertions(+), 17 deletions(-) diff --git a/python/pyarrow/src/arrow/python/udf.cc b/python/pyarrow/src/arrow/python/udf.cc index a856f0e398b07..8cc5fb659aba9 100644 --- a/python/pyarrow/src/arrow/python/udf.cc +++ b/python/pyarrow/src/arrow/python/udf.cc @@ -131,6 +131,7 @@ struct PythonUdfScalarAggregatorImpl : public ScalarUdfAggregator { std::vector> input_types, std::shared_ptr output_type) : agg_cb(agg_cb), agg_function(agg_function), output_type(output_type) { + Py_INCREF(agg_function->obj()); std::vector> fields; for (size_t i = 0; i < input_types.size(); i++) { fields.push_back(std::move(field("", input_types[i]))); @@ -183,8 +184,8 @@ struct PythonUdfScalarAggregatorImpl : public ScalarUdfAggregator { return Status::Invalid("Finalized is called with empty inputs"); } - std::unique_ptr result; RETURN_NOT_OK(SafeCallIntoPython([&] { + std::unique_ptr result; OwnedRef arg_tuple(PyTuple_New(num_args)); RETURN_NOT_OK(CheckPyError()); @@ -197,21 +198,21 @@ struct PythonUdfScalarAggregatorImpl : public ScalarUdfAggregator { result = std::make_unique( agg_cb(function->obj(), udf_context, arg_tuple.obj())); RETURN_NOT_OK(CheckPyError()); - return Status::OK(); - })); - // unwrapping the output for expected output type - if (is_scalar(result->obj())) { - ARROW_ASSIGN_OR_RAISE(std::shared_ptr val, unwrap_scalar(result->obj())); - if (*output_type != *val->type) { - return Status::TypeError("Expected output datatype ", output_type->ToString(), - ", but function returned datatype ", - val->type->ToString()); + // unwrapping the output for expected output type + if (is_scalar(result->obj())) { + ARROW_ASSIGN_OR_RAISE(std::shared_ptr val, unwrap_scalar(result->obj())); + if (*output_type != *val->type) { + return Status::TypeError("Expected output datatype ", output_type->ToString(), + ", but function returned datatype ", + val->type->ToString()); + } + out->value = std::move(val); + return Status::OK(); } - out->value = std::move(val); - return Status::OK(); - } - return Status::TypeError("Unexpected output type: ", Py_TYPE(result->obj())->tp_name, - " (expected Scalar)"); + return Status::TypeError("Unexpected output type: ", Py_TYPE(result->obj())->tp_name, + " (expected Scalar)"); + })); + return Status::OK(); } UdfWrapperCallback agg_cb; @@ -374,6 +375,9 @@ Status RegisterAggregateFunction(PyObject* agg_function, UdfWrapperCallback agg_ registry = compute::GetFunctionRegistry(); } + // Py_INCREF here so that once a function is registered + // its refcount gets increased by 1 and doesn't get gced + // if all existing refs are gone Py_INCREF(agg_function); static auto default_scalar_aggregate_options = @@ -392,8 +396,6 @@ Status RegisterAggregateFunction(PyObject* agg_function, UdfWrapperCallback agg_ compute::KernelContext* ctx, const compute::KernelInitArgs& args) -> Result> { - // Py_INCREF because OwnedRefNoGIL will call Py_XDECREF in destructor - Py_INCREF(agg_function); return std::make_unique( agg_wrapper, std::make_shared(agg_function), options.input_types, options.output_type);