Skip to content

Commit

Permalink
Fix core-dump when running with Python dev mode
Browse files (browse the repository at this point in the history)
  • Loading branch information
icexelloss committed Jun 6, 2023
1 parent dc1d734 commit 1203346
Showing 1 changed file with 19 additions and 17 deletions.
36 changes: 19 additions & 17 deletions python/pyarrow/src/arrow/python/udf.cc
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,7 @@ struct PythonUdfScalarAggregatorImpl : public ScalarUdfAggregator {
std::vector<std::shared_ptr<DataType>> input_types,
std::shared_ptr<DataType> output_type)
: agg_cb(agg_cb), agg_function(agg_function), output_type(output_type) {
Py_INCREF(agg_function->obj());
std::vector<std::shared_ptr<Field>> fields;
for (size_t i = 0; i < input_types.size(); i++) {
fields.push_back(std::move(field("", input_types[i])));
Expand Down Expand Up @@ -183,8 +184,8 @@ struct PythonUdfScalarAggregatorImpl : public ScalarUdfAggregator {
return Status::Invalid("Finalized is called with empty inputs");
}

std::unique_ptr<OwnedRef> result;
RETURN_NOT_OK(SafeCallIntoPython([&] {
std::unique_ptr<OwnedRef> result;
OwnedRef arg_tuple(PyTuple_New(num_args));
RETURN_NOT_OK(CheckPyError());

Expand All @@ -197,21 +198,21 @@ struct PythonUdfScalarAggregatorImpl : public ScalarUdfAggregator {
result = std::make_unique<OwnedRef>(
agg_cb(function->obj(), udf_context, arg_tuple.obj()));
RETURN_NOT_OK(CheckPyError());
return Status::OK();
}));
// unwrapping the output for expected output type
if (is_scalar(result->obj())) {
ARROW_ASSIGN_OR_RAISE(std::shared_ptr<Scalar> val, unwrap_scalar(result->obj()));
if (*output_type != *val->type) {
return Status::TypeError("Expected output datatype ", output_type->ToString(),
", but function returned datatype ",
val->type->ToString());
// unwrapping the output for expected output type
if (is_scalar(result->obj())) {
ARROW_ASSIGN_OR_RAISE(std::shared_ptr<Scalar> val, unwrap_scalar(result->obj()));
if (*output_type != *val->type) {
return Status::TypeError("Expected output datatype ", output_type->ToString(),
", but function returned datatype ",
val->type->ToString());
}
out->value = std::move(val);
return Status::OK();
}
out->value = std::move(val);
return Status::OK();
}
return Status::TypeError("Unexpected output type: ", Py_TYPE(result->obj())->tp_name,
" (expected Scalar)");
return Status::TypeError("Unexpected output type: ", Py_TYPE(result->obj())->tp_name,
" (expected Scalar)");
}));
return Status::OK();
}

UdfWrapperCallback agg_cb;
Expand Down Expand Up @@ -374,6 +375,9 @@ Status RegisterAggregateFunction(PyObject* agg_function, UdfWrapperCallback agg_
registry = compute::GetFunctionRegistry();
}

// Take one extra reference at registration time so that the registered
// function is not garbage-collected even if every other reference to it
// is dropped.
Py_INCREF(agg_function);

static auto default_scalar_aggregate_options =
Expand All @@ -392,8 +396,6 @@ Status RegisterAggregateFunction(PyObject* agg_function, UdfWrapperCallback agg_
compute::KernelContext* ctx,
const compute::KernelInitArgs& args)
-> Result<std::unique_ptr<compute::KernelState>> {
// Py_INCREF because OwnedRefNoGIL will call Py_XDECREF in destructor
Py_INCREF(agg_function);
return std::make_unique<PythonUdfScalarAggregatorImpl>(
agg_wrapper, std::make_shared<OwnedRefNoGIL>(agg_function), options.input_types,
options.output_type);
Expand Down

0 comments on commit 1203346

Please sign in to comment.