From ee286fe7a20c832d309a08f064a779f39591144a Mon Sep 17 00:00:00 2001 From: Micah Kornfield Date: Wed, 3 Jul 2019 12:14:41 +0200 Subject: [PATCH] ARROW-4036: [C++] Pluggable Status message, by exposing an abstract delegate class. This provides less "pluggability" but I think still offers a clean model for extension (subsystems can wrap the constructor for there purposes, and provide external static methods to check for particular types of errors). Author: Micah Kornfield Author: Antoine Pitrou Closes #4484 from emkornfield/status_code_proposal and squashes the following commits: 4d1ab8d1d don't import plasma errors directly into top level pyarrow module a66f999f8 make format 040216d48 fixes for comments outside python 729bba1ff Fix Py2 issues (hopefully) ea56d1e6a Fix PythonErrorDetail to store Python error state (and restore it in check_status()) 21e1b95ac fix compilation 9c905b094 fix lint 74d563cd7 fixes 85786efb1 change messages 3626a9016 try removing message a4e6a1ff2 add logging for debug 4586fd1e2 fix typo 8f011b329 fix status propagation 317ea9c66 fix complie 9f5916070 don't make_shared inline 484b3a232 style fix 14e3467b5 dont rely on rtti cd22df64d format dec458506 not-quite pluggable error codes --- c_glib/arrow-glib/error.cpp | 13 +- c_glib/arrow-glib/error.h | 12 +- .../test/plasma/test-plasma-created-object.rb | 2 +- cpp/src/arrow/compute/kernels/cast.cc | 2 +- cpp/src/arrow/csv/column-builder.cc | 2 +- cpp/src/arrow/python/common.cc | 180 +++++++++++++----- cpp/src/arrow/python/common.h | 25 ++- cpp/src/arrow/python/python-test.cc | 106 ++++++++--- cpp/src/arrow/python/serialize.cc | 2 +- cpp/src/arrow/status-test.cc | 29 +++ cpp/src/arrow/status.cc | 32 ++-- cpp/src/arrow/status.h | 78 +++----- cpp/src/plasma/client.cc | 9 +- cpp/src/plasma/common.cc | 81 ++++++++ cpp/src/plasma/common.h | 17 ++ cpp/src/plasma/protocol.cc | 9 +- cpp/src/plasma/test/client_tests.cc | 4 +- cpp/src/plasma/test/serialization_tests.cc | 4 +- python/pyarrow/__init__.py | 3 +- python/pyarrow/_plasma.pyx | 94 ++++++--- python/pyarrow/error.pxi | 29 +-- python/pyarrow/includes/common.pxd | 5 +- python/pyarrow/includes/libarrow.pxd | 7 + python/pyarrow/includes/libplasma.pxd | 25 +++ python/pyarrow/plasma.py | 4 +- python/pyarrow/tests/test_array.py | 2 +- python/pyarrow/tests/test_convert_builtin.py | 73 +++++-- python/pyarrow/tests/test_plasma.py | 4 +- 28 files changed, 586 insertions(+), 267 deletions(-) create mode 100644 python/pyarrow/includes/libplasma.pxd diff --git a/c_glib/arrow-glib/error.cpp b/c_glib/arrow-glib/error.cpp index a56b6ec3d1382..4c1461543f8b3 100644 --- a/c_glib/arrow-glib/error.cpp +++ b/c_glib/arrow-glib/error.cpp @@ -65,22 +65,15 @@ garrow_error_code(const arrow::Status &status) return GARROW_ERROR_NOT_IMPLEMENTED; case arrow::StatusCode::SerializationError: return GARROW_ERROR_SERIALIZATION; - case arrow::StatusCode::PythonError: - return GARROW_ERROR_PYTHON; - case arrow::StatusCode::PlasmaObjectExists: - return GARROW_ERROR_PLASMA_OBJECT_EXISTS; - case arrow::StatusCode::PlasmaObjectNonexistent: - return GARROW_ERROR_PLASMA_OBJECT_NONEXISTENT; - case arrow::StatusCode::PlasmaStoreFull: - return GARROW_ERROR_PLASMA_STORE_FULL; - case arrow::StatusCode::PlasmaObjectAlreadySealed: - return GARROW_ERROR_PLASMA_OBJECT_ALREADY_SEALED; case arrow::StatusCode::CodeGenError: return GARROW_ERROR_CODE_GENERATION; case arrow::StatusCode::ExpressionValidationError: return GARROW_ERROR_EXPRESSION_VALIDATION; case arrow::StatusCode::ExecutionError: return GARROW_ERROR_EXECUTION; + case arrow::StatusCode::AlreadyExists: + return GARROW_ERROR_ALREADY_EXISTS; + default: return GARROW_ERROR_UNKNOWN; } diff --git a/c_glib/arrow-glib/error.h b/c_glib/arrow-glib/error.h index 3dea9fc2e105b..2fac5ad0d3e77 100644 --- a/c_glib/arrow-glib/error.h +++ b/c_glib/arrow-glib/error.h @@ -35,15 +35,11 @@ G_BEGIN_DECLS * @GARROW_ERROR_UNKNOWN: Unknown error. * @GARROW_ERROR_NOT_IMPLEMENTED: The feature is not implemented. * @GARROW_ERROR_SERIALIZATION: Serialization error. - * @GARROW_ERROR_PYTHON: Python error. - * @GARROW_ERROR_PLASMA_OBJECT_EXISTS: Object already exists on Plasma. - * @GARROW_ERROR_PLASMA_OBJECT_NONEXISTENT: Object doesn't exist on Plasma. - * @GARROW_ERROR_PLASMA_STORE_FULL: Store full error on Plasma. - * @GARROW_ERROR_PLASMA_OBJECT_ALREADY_SEALED: Object already sealed on Plasma. * @GARROW_ERROR_CODE_GENERATION: Error generating code for expression evaluation * in Gandiva. * @GARROW_ERROR_EXPRESSION_VALIDATION: Validation errors in expression given for code generation. * @GARROW_ERROR_EXECUTION: Execution error while evaluating the expression against a record batch. + * @GARROW_ALREADY_EXISTS: Item already exists error. * * The error codes are used by all arrow-glib functions. * @@ -60,14 +56,10 @@ typedef enum { GARROW_ERROR_UNKNOWN = 9, GARROW_ERROR_NOT_IMPLEMENTED, GARROW_ERROR_SERIALIZATION, - GARROW_ERROR_PYTHON, - GARROW_ERROR_PLASMA_OBJECT_EXISTS = 20, - GARROW_ERROR_PLASMA_OBJECT_NONEXISTENT, - GARROW_ERROR_PLASMA_STORE_FULL, - GARROW_ERROR_PLASMA_OBJECT_ALREADY_SEALED, GARROW_ERROR_CODE_GENERATION = 40, GARROW_ERROR_EXPRESSION_VALIDATION = 41, GARROW_ERROR_EXECUTION = 42, + GARROW_ERROR_ALREADY_EXISTS = 45, } GArrowError; #define GARROW_ERROR garrow_error_quark() diff --git a/c_glib/test/plasma/test-plasma-created-object.rb b/c_glib/test/plasma/test-plasma-created-object.rb index 9025ff4ac22d9..857322d20e14f 100644 --- a/c_glib/test/plasma/test-plasma-created-object.rb +++ b/c_glib/test/plasma/test-plasma-created-object.rb @@ -45,7 +45,7 @@ def teardown test("#abort") do @object.data.set_data(0, @data) - assert_raise(Arrow::Error::PlasmaObjectExists) do + assert_raise(Arrow::Error::AlreadyExists) do @client.create(@id, @data.bytesize, @options) end @object.abort diff --git a/cpp/src/arrow/compute/kernels/cast.cc b/cpp/src/arrow/compute/kernels/cast.cc index 299ca80402c3e..93feb656dd582 100644 --- a/cpp/src/arrow/compute/kernels/cast.cc +++ b/cpp/src/arrow/compute/kernels/cast.cc @@ -52,7 +52,7 @@ if (ARROW_PREDICT_FALSE(!_s.ok())) { \ std::stringstream ss; \ ss << __FILE__ << ":" << __LINE__ << " code: " << #s << "\n" << _s.message(); \ - ctx->SetStatus(Status(_s.code(), ss.str())); \ + ctx->SetStatus(Status(_s.code(), ss.str(), s.detail())); \ return; \ } \ } while (0) diff --git a/cpp/src/arrow/csv/column-builder.cc b/cpp/src/arrow/csv/column-builder.cc index 657aa6f4e967e..4099507016d3b 100644 --- a/cpp/src/arrow/csv/column-builder.cc +++ b/cpp/src/arrow/csv/column-builder.cc @@ -76,7 +76,7 @@ class TypedColumnBuilder : public ColumnBuilder { } else { std::stringstream ss; ss << "In column #" << col_index_ << ": " << st.message(); - return Status(st.code(), ss.str()); + return Status(st.code(), ss.str(), st.detail()); } } diff --git a/cpp/src/arrow/python/common.cc b/cpp/src/arrow/python/common.cc index aa44ec07e6566..3cebc03cd22d6 100644 --- a/cpp/src/arrow/python/common.cc +++ b/cpp/src/arrow/python/common.cc @@ -23,11 +23,15 @@ #include "arrow/memory_pool.h" #include "arrow/status.h" +#include "arrow/util/checked_cast.h" #include "arrow/util/logging.h" #include "arrow/python/helpers.h" namespace arrow { + +using internal::checked_cast; + namespace py { static std::mutex memory_pool_mutex; @@ -47,6 +51,129 @@ MemoryPool* get_memory_pool() { } } +// ---------------------------------------------------------------------- +// PythonErrorDetail + +namespace { + +const char kErrorDetailTypeId[] = "arrow::py::PythonErrorDetail"; + +// Try to match the Python exception type with an appropriate Status code +StatusCode MapPyError(PyObject* exc_type) { + StatusCode code; + + if (PyErr_GivenExceptionMatches(exc_type, PyExc_MemoryError)) { + code = StatusCode::OutOfMemory; + } else if (PyErr_GivenExceptionMatches(exc_type, PyExc_IndexError)) { + code = StatusCode::IndexError; + } else if (PyErr_GivenExceptionMatches(exc_type, PyExc_KeyError)) { + code = StatusCode::KeyError; + } else if (PyErr_GivenExceptionMatches(exc_type, PyExc_TypeError)) { + code = StatusCode::TypeError; + } else if (PyErr_GivenExceptionMatches(exc_type, PyExc_ValueError) || + PyErr_GivenExceptionMatches(exc_type, PyExc_OverflowError)) { + code = StatusCode::Invalid; + } else if (PyErr_GivenExceptionMatches(exc_type, PyExc_EnvironmentError)) { + code = StatusCode::IOError; + } else if (PyErr_GivenExceptionMatches(exc_type, PyExc_NotImplementedError)) { + code = StatusCode::NotImplemented; + } else { + code = StatusCode::UnknownError; + } + return code; +} + +// PythonErrorDetail indicates a Python exception was raised. +class PythonErrorDetail : public StatusDetail { + public: + const char* type_id() const override { return kErrorDetailTypeId; } + + std::string ToString() const override { + // This is simple enough not to need the GIL + const auto ty = reinterpret_cast(exc_type_.obj()); + // XXX Should we also print traceback? + return std::string("Python exception: ") + ty->tp_name; + } + + void RestorePyError() const { + Py_INCREF(exc_type_.obj()); + Py_INCREF(exc_value_.obj()); + Py_INCREF(exc_traceback_.obj()); + PyErr_Restore(exc_type_.obj(), exc_value_.obj(), exc_traceback_.obj()); + } + + PyObject* exc_type() const { return exc_type_.obj(); } + + PyObject* exc_value() const { return exc_value_.obj(); } + + static std::shared_ptr FromPyError() { + PyObject* exc_type = nullptr; + PyObject* exc_value = nullptr; + PyObject* exc_traceback = nullptr; + + PyErr_Fetch(&exc_type, &exc_value, &exc_traceback); + PyErr_NormalizeException(&exc_type, &exc_value, &exc_traceback); + ARROW_CHECK(exc_type) + << "PythonErrorDetail::FromPyError called without a Python error set"; + DCHECK(PyType_Check(exc_type)); + DCHECK(exc_value); // Ensured by PyErr_NormalizeException, double-check + if (exc_traceback == nullptr) { + // Needed by PyErr_Restore() + Py_INCREF(Py_None); + exc_traceback = Py_None; + } + + std::shared_ptr detail(new PythonErrorDetail); + detail->exc_type_.reset(exc_type); + detail->exc_value_.reset(exc_value); + detail->exc_traceback_.reset(exc_traceback); + return detail; + } + + protected: + PythonErrorDetail() = default; + + OwnedRefNoGIL exc_type_, exc_value_, exc_traceback_; +}; + +} // namespace + +// ---------------------------------------------------------------------- +// Python exception <-> Status + +Status ConvertPyError(StatusCode code) { + auto detail = PythonErrorDetail::FromPyError(); + if (code == StatusCode::UnknownError) { + code = MapPyError(detail->exc_type()); + } + + std::string message; + RETURN_NOT_OK(internal::PyObject_StdStringStr(detail->exc_value(), &message)); + return Status(code, message, detail); +} + +Status PassPyError() { + if (PyErr_Occurred()) { + return ConvertPyError(); + } + return Status::OK(); +} + +bool IsPyError(const Status& status) { + if (status.ok()) { + return false; + } + auto detail = status.detail(); + bool result = detail != nullptr && detail->type_id() == kErrorDetailTypeId; + return result; +} + +void RestorePyError(const Status& status) { + ARROW_CHECK(IsPyError(status)); + const auto& detail = checked_cast(*status.detail()); + detail.RestorePyError(); +} + // ---------------------------------------------------------------------- // PyBuffer @@ -64,7 +191,7 @@ Status PyBuffer::Init(PyObject* obj) { } return Status::OK(); } else { - return Status(StatusCode::PythonError, ""); + return ConvertPyError(StatusCode::Invalid); } } @@ -83,56 +210,5 @@ PyBuffer::~PyBuffer() { } } -// ---------------------------------------------------------------------- -// Python exception -> Status - -Status ConvertPyError(StatusCode code) { - PyObject* exc_type = nullptr; - PyObject* exc_value = nullptr; - PyObject* traceback = nullptr; - - PyErr_Fetch(&exc_type, &exc_value, &traceback); - PyErr_NormalizeException(&exc_type, &exc_value, &traceback); - - DCHECK_NE(exc_type, nullptr) << "ConvertPyError called without an exception set"; - - OwnedRef exc_type_ref(exc_type); - OwnedRef exc_value_ref(exc_value); - OwnedRef traceback_ref(traceback); - - std::string message; - RETURN_NOT_OK(internal::PyObject_StdStringStr(exc_value, &message)); - - if (code == StatusCode::UnknownError) { - // Try to match the Python exception type with an appropriate Status code - if (PyErr_GivenExceptionMatches(exc_type, PyExc_MemoryError)) { - code = StatusCode::OutOfMemory; - } else if (PyErr_GivenExceptionMatches(exc_type, PyExc_IndexError)) { - code = StatusCode::IndexError; - } else if (PyErr_GivenExceptionMatches(exc_type, PyExc_KeyError)) { - code = StatusCode::KeyError; - } else if (PyErr_GivenExceptionMatches(exc_type, PyExc_TypeError)) { - code = StatusCode::TypeError; - } else if (PyErr_GivenExceptionMatches(exc_type, PyExc_ValueError) || - PyErr_GivenExceptionMatches(exc_type, PyExc_OverflowError)) { - code = StatusCode::Invalid; - } else if (PyErr_GivenExceptionMatches(exc_type, PyExc_EnvironmentError)) { - code = StatusCode::IOError; - } else if (PyErr_GivenExceptionMatches(exc_type, PyExc_NotImplementedError)) { - code = StatusCode::NotImplemented; - } - } - return Status(code, message); -} - -Status PassPyError() { - if (PyErr_Occurred()) { - // Do not call PyErr_Clear, the assumption is that someone further - // up the call stack will want to deal with the Python error. - return Status(StatusCode::PythonError, ""); - } - return Status::OK(); -} - } // namespace py } // namespace arrow diff --git a/cpp/src/arrow/python/common.h b/cpp/src/arrow/python/common.h index 766b76418ded7..9d3dc0c05ef34 100644 --- a/cpp/src/arrow/python/common.h +++ b/cpp/src/arrow/python/common.h @@ -36,7 +36,15 @@ class Result; namespace py { +// Convert current Python error to a Status. The Python error state is cleared +// and can be restored with RestorePyError(). ARROW_PYTHON_EXPORT Status ConvertPyError(StatusCode code = StatusCode::UnknownError); +// Same as ConvertPyError(), but returns Status::OK() if no Python error is set. +ARROW_PYTHON_EXPORT Status PassPyError(); +// Query whether the given Status is a Python error (as wrapped by ConvertPyError()). +ARROW_PYTHON_EXPORT bool IsPyError(const Status& status); +// Restore a Python error wrapped in a Status. +ARROW_PYTHON_EXPORT void RestorePyError(const Status& status); // Catch a pending Python exception and return the corresponding Status. // If no exception is pending, Status::OK() is returned. @@ -48,9 +56,6 @@ inline Status CheckPyError(StatusCode code = StatusCode::UnknownError) { } } -ARROW_PYTHON_EXPORT Status PassPyError(); - -// TODO(wesm): We can just let errors pass through. To be explored later #define RETURN_IF_PYERROR() ARROW_RETURN_NOT_OK(CheckPyError()); #define PY_RETURN_IF_ERROR(CODE) ARROW_RETURN_NOT_OK(CheckPyError(CODE)); @@ -97,6 +102,18 @@ class ARROW_PYTHON_EXPORT PyAcquireGIL { ARROW_DISALLOW_COPY_AND_ASSIGN(PyAcquireGIL); }; +// A RAII-style helper that releases the GIL until the end of a lexical block +class ARROW_PYTHON_EXPORT PyReleaseGIL { + public: + PyReleaseGIL() { saved_state_ = PyEval_SaveThread(); } + + ~PyReleaseGIL() { PyEval_RestoreThread(saved_state_); } + + private: + PyThreadState* saved_state_; + ARROW_DISALLOW_COPY_AND_ASSIGN(PyReleaseGIL); +}; + // A helper to call safely into the Python interpreter from arbitrary C++ code. // The GIL is acquired, and the current thread's error status is preserved. template @@ -109,7 +126,7 @@ Status SafeCallIntoPython(Function&& func) { Status st = std::forward(func)(); // If the return Status is a "Python error", the current Python error status // describes the error and shouldn't be clobbered. - if (!st.IsPythonError() && exc_type != NULLPTR) { + if (!IsPyError(st) && exc_type != NULLPTR) { PyErr_Restore(exc_type, exc_value, exc_traceback); } return st; diff --git a/cpp/src/arrow/python/python-test.cc b/cpp/src/arrow/python/python-test.cc index 5de613f0e50ba..5027d3fe3f631 100644 --- a/cpp/src/arrow/python/python-test.cc +++ b/cpp/src/arrow/python/python-test.cc @@ -40,21 +40,12 @@ using internal::checked_cast; namespace py { -TEST(PyBuffer, InvalidInputObject) { - std::shared_ptr res; - PyObject* input = Py_None; - auto old_refcnt = Py_REFCNT(input); - ASSERT_RAISES(PythonError, PyBuffer::FromPyObject(input, &res)); - PyErr_Clear(); - ASSERT_EQ(old_refcnt, Py_REFCNT(input)); -} - TEST(OwnedRef, TestMoves) { - PyAcquireGIL lock; std::vector vec; PyObject *u, *v; u = PyList_New(0); v = PyList_New(0); + { OwnedRef ref(u); vec.push_back(std::move(ref)); @@ -66,31 +57,42 @@ TEST(OwnedRef, TestMoves) { } TEST(OwnedRefNoGIL, TestMoves) { - std::vector vec; - PyObject *u, *v; - { - PyAcquireGIL lock; - u = PyList_New(0); - v = PyList_New(0); - } + PyAcquireGIL lock; + lock.release(); + { - OwnedRefNoGIL ref(u); - vec.push_back(std::move(ref)); - ASSERT_EQ(ref.obj(), nullptr); + std::vector vec; + PyObject *u, *v; + { + lock.acquire(); + u = PyList_New(0); + v = PyList_New(0); + lock.release(); + } + { + OwnedRefNoGIL ref(u); + vec.push_back(std::move(ref)); + ASSERT_EQ(ref.obj(), nullptr); + } + vec.emplace_back(v); + ASSERT_EQ(Py_REFCNT(u), 1); + ASSERT_EQ(Py_REFCNT(v), 1); } - vec.emplace_back(v); - ASSERT_EQ(Py_REFCNT(u), 1); - ASSERT_EQ(Py_REFCNT(v), 1); } TEST(CheckPyError, TestStatus) { - PyAcquireGIL lock; Status st; - auto check_error = [](Status& st, const char* expected_message = "some error") { + auto check_error = [](Status& st, const char* expected_message = "some error", + const char* expected_detail = nullptr) { st = CheckPyError(); ASSERT_EQ(st.message(), expected_message); ASSERT_FALSE(PyErr_Occurred()); + if (expected_detail) { + auto detail = st.detail(); + ASSERT_NE(detail, nullptr); + ASSERT_EQ(detail->ToString(), expected_detail); + } }; for (PyObject* exc_type : {PyExc_Exception, PyExc_SyntaxError}) { @@ -100,7 +102,7 @@ TEST(CheckPyError, TestStatus) { } PyErr_SetString(PyExc_TypeError, "some error"); - check_error(st); + check_error(st, "some error", "Python exception: TypeError"); ASSERT_TRUE(st.IsTypeError()); PyErr_SetString(PyExc_ValueError, "some error"); @@ -118,7 +120,7 @@ TEST(CheckPyError, TestStatus) { } PyErr_SetString(PyExc_NotImplementedError, "some error"); - check_error(st); + check_error(st, "some error", "Python exception: NotImplementedError"); ASSERT_TRUE(st.IsNotImplemented()); // No override if a specific status code is given @@ -129,6 +131,52 @@ TEST(CheckPyError, TestStatus) { ASSERT_FALSE(PyErr_Occurred()); } +TEST(CheckPyError, TestStatusNoGIL) { + PyAcquireGIL lock; + { + Status st; + PyErr_SetString(PyExc_ZeroDivisionError, "zzzt"); + st = ConvertPyError(); + ASSERT_FALSE(PyErr_Occurred()); + lock.release(); + ASSERT_TRUE(st.IsUnknownError()); + ASSERT_EQ(st.message(), "zzzt"); + ASSERT_EQ(st.detail()->ToString(), "Python exception: ZeroDivisionError"); + } +} + +TEST(RestorePyError, Basics) { + PyErr_SetString(PyExc_ZeroDivisionError, "zzzt"); + auto st = ConvertPyError(); + ASSERT_FALSE(PyErr_Occurred()); + ASSERT_TRUE(st.IsUnknownError()); + ASSERT_EQ(st.message(), "zzzt"); + ASSERT_EQ(st.detail()->ToString(), "Python exception: ZeroDivisionError"); + + RestorePyError(st); + ASSERT_TRUE(PyErr_Occurred()); + PyObject* exc_type; + PyObject* exc_value; + PyObject* exc_traceback; + PyErr_Fetch(&exc_type, &exc_value, &exc_traceback); + ASSERT_TRUE(PyErr_GivenExceptionMatches(exc_type, PyExc_ZeroDivisionError)); + std::string py_message; + ASSERT_OK(internal::PyObject_StdStringStr(exc_value, &py_message)); + ASSERT_EQ(py_message, "zzzt"); +} + +TEST(PyBuffer, InvalidInputObject) { + std::shared_ptr res; + PyObject* input = Py_None; + auto old_refcnt = Py_REFCNT(input); + { + Status st = PyBuffer::FromPyObject(input, &res); + ASSERT_TRUE(IsPyError(st)) << st.ToString(); + ASSERT_FALSE(PyErr_Occurred()); + } + ASSERT_EQ(old_refcnt, Py_REFCNT(input)); +} + class DecimalTest : public ::testing::Test { public: DecimalTest() : lock_(), decimal_constructor_() { @@ -253,8 +301,6 @@ TEST(PandasConversionTest, TestObjectBlockWriteFails) { } TEST(BuiltinConversionTest, TestMixedTypeFails) { - PyAcquireGIL lock; - OwnedRef list_ref(PyList_New(3)); PyObject* list = list_ref.obj(); @@ -405,8 +451,6 @@ TEST_F(DecimalTest, TestMixedPrecisionAndScale) { } TEST_F(DecimalTest, TestMixedPrecisionAndScaleSequenceConvert) { - PyAcquireGIL lock; - PyObject* value1 = this->CreatePythonDecimal("0.01").detach(); ASSERT_NE(value1, nullptr); diff --git a/cpp/src/arrow/python/serialize.cc b/cpp/src/arrow/python/serialize.cc index d93e3954e41dc..5784394377526 100644 --- a/cpp/src/arrow/python/serialize.cc +++ b/cpp/src/arrow/python/serialize.cc @@ -332,8 +332,8 @@ Status SequenceBuilder::AppendDict(PyObject* context, PyObject* dict, Status CallCustomCallback(PyObject* context, PyObject* method_name, PyObject* elem, PyObject** result) { - *result = NULL; if (context == Py_None) { + *result = NULL; return Status::SerializationError("error while calling callback on ", internal::PyObject_StdStringRepr(elem), ": handler not registered"); diff --git a/cpp/src/arrow/status-test.cc b/cpp/src/arrow/status-test.cc index b7fc61f480155..b151e462b2803 100644 --- a/cpp/src/arrow/status-test.cc +++ b/cpp/src/arrow/status-test.cc @@ -23,6 +23,16 @@ namespace arrow { +namespace { + +class TestStatusDetail : public StatusDetail { + public: + const char* type_id() const override { return "type_id"; } + std::string ToString() const override { return "a specific detail message"; } +}; + +} // namespace + TEST(StatusTest, TestCodeAndMessage) { Status ok = Status::OK(); ASSERT_EQ(StatusCode::OK, ok.code()); @@ -40,6 +50,25 @@ TEST(StatusTest, TestToString) { ASSERT_EQ(file_error.ToString(), ss.str()); } +TEST(StatusTest, TestToStringWithDetail) { + Status status(StatusCode::IOError, "summary", std::make_shared()); + ASSERT_EQ("IOError: summary. Detail: a specific detail message", status.ToString()); + + std::stringstream ss; + ss << status; + ASSERT_EQ(status.ToString(), ss.str()); +} + +TEST(StatusTest, TestWithDetail) { + Status status(StatusCode::IOError, "summary"); + auto detail = std::make_shared(); + Status new_status = status.WithDetail(detail); + + ASSERT_EQ(new_status.code(), status.code()); + ASSERT_EQ(new_status.message(), status.message()); + ASSERT_EQ(new_status.detail(), detail); +} + TEST(StatusTest, AndStatus) { Status a = Status::OK(); Status b = Status::OK(); diff --git a/cpp/src/arrow/status.cc b/cpp/src/arrow/status.cc index cbb29119be63a..785db45975227 100644 --- a/cpp/src/arrow/status.cc +++ b/cpp/src/arrow/status.cc @@ -21,11 +21,17 @@ namespace arrow { -Status::Status(StatusCode code, const std::string& msg) { +Status::Status(StatusCode code, const std::string& msg) + : Status::Status(code, msg, nullptr) {} + +Status::Status(StatusCode code, std::string msg, std::shared_ptr detail) { ARROW_CHECK_NE(code, StatusCode::OK) << "Cannot construct ok status with message"; state_ = new State; state_->code = code; - state_->msg = msg; + state_->msg = std::move(msg); + if (detail != nullptr) { + state_->detail = std::move(detail); + } } void Status::CopyFrom(const Status& s) { @@ -77,21 +83,6 @@ std::string Status::CodeAsString() const { case StatusCode::SerializationError: type = "Serialization error"; break; - case StatusCode::PythonError: - type = "Python error"; - break; - case StatusCode::PlasmaObjectExists: - type = "Plasma object exists"; - break; - case StatusCode::PlasmaObjectNonexistent: - type = "Plasma object is nonexistent"; - break; - case StatusCode::PlasmaStoreFull: - type = "Plasma store is full"; - break; - case StatusCode::PlasmaObjectAlreadySealed: - type = "Plasma object is already sealed"; - break; case StatusCode::CodeGenError: type = "CodeGenError in Gandiva"; break; @@ -110,11 +101,16 @@ std::string Status::CodeAsString() const { std::string Status::ToString() const { std::string result(CodeAsString()); - if (state_ == NULL) { + if (state_ == nullptr) { return result; } result += ": "; result += state_->msg; + if (state_->detail != nullptr) { + result += ". Detail: "; + result += state_->detail->ToString(); + } + return result; } diff --git a/cpp/src/arrow/status.h b/cpp/src/arrow/status.h index 1ed0da65fc4bc..7cafc41902df2 100644 --- a/cpp/src/arrow/status.h +++ b/cpp/src/arrow/status.h @@ -17,6 +17,7 @@ #include #include +#include #include #include @@ -85,17 +86,13 @@ enum class StatusCode : char { UnknownError = 9, NotImplemented = 10, SerializationError = 11, - PythonError = 12, RError = 13, - PlasmaObjectExists = 20, - PlasmaObjectNonexistent = 21, - PlasmaStoreFull = 22, - PlasmaObjectAlreadySealed = 23, - StillExecuting = 24, // Gandiva range of errors CodeGenError = 40, ExpressionValidationError = 41, - ExecutionError = 42 + ExecutionError = 42, + // Continue generic codes. + AlreadyExists = 45 }; #if defined(__clang__) @@ -103,6 +100,17 @@ enum class StatusCode : char { class ARROW_MUST_USE_RESULT ARROW_EXPORT Status; #endif +/// \brief An opaque class that allows subsystems to retain +/// additional information inside the Status. +class ARROW_EXPORT StatusDetail { + public: + virtual ~StatusDetail() = default; + // Return a unique id for the type of the StatusDetail + // (effectively a poor man's substitude for RTTI). + virtual const char* type_id() const = 0; + virtual std::string ToString() const = 0; +}; + /// \brief Status outcome object (success or error) /// /// The Status object is an object holding the outcome of an operation. @@ -124,6 +132,8 @@ class ARROW_EXPORT Status { } Status(StatusCode code, const std::string& msg); + /// \brief Pluggable constructor for use by sub-systems. detail cannot be null. + Status(StatusCode code, std::string msg, std::shared_ptr detail); // Copy the specified status. inline Status(const Status& s); @@ -221,32 +231,6 @@ class ARROW_EXPORT Status { return Status(StatusCode::RError, util::StringBuilder(std::forward(args)...)); } - template - static Status PlasmaObjectExists(Args&&... args) { - return Status(StatusCode::PlasmaObjectExists, - util::StringBuilder(std::forward(args)...)); - } - - template - static Status PlasmaObjectNonexistent(Args&&... args) { - return Status(StatusCode::PlasmaObjectNonexistent, - util::StringBuilder(std::forward(args)...)); - } - - template - static Status PlasmaObjectAlreadySealed(Args&&... args) { - return Status(StatusCode::PlasmaObjectAlreadySealed, - util::StringBuilder(std::forward(args)...)); - } - - template - static Status PlasmaStoreFull(Args&&... args) { - return Status(StatusCode::PlasmaStoreFull, - util::StringBuilder(std::forward(args)...)); - } - - static Status StillExecuting() { return Status(StatusCode::StillExecuting, ""); } - template static Status CodeGenError(Args&&... args) { return Status(StatusCode::CodeGenError, @@ -290,22 +274,6 @@ class ARROW_EXPORT Status { bool IsSerializationError() const { return code() == StatusCode::SerializationError; } /// Return true iff the status indicates a R-originated error. bool IsRError() const { return code() == StatusCode::RError; } - /// Return true iff the status indicates a Python-originated error. - bool IsPythonError() const { return code() == StatusCode::PythonError; } - /// Return true iff the status indicates an already existing Plasma object. - bool IsPlasmaObjectExists() const { return code() == StatusCode::PlasmaObjectExists; } - /// Return true iff the status indicates a non-existent Plasma object. - bool IsPlasmaObjectNonexistent() const { - return code() == StatusCode::PlasmaObjectNonexistent; - } - /// Return true iff the status indicates an already sealed Plasma object. - bool IsPlasmaObjectAlreadySealed() const { - return code() == StatusCode::PlasmaObjectAlreadySealed; - } - /// Return true iff the status indicates the Plasma store reached its capacity limit. - bool IsPlasmaStoreFull() const { return code() == StatusCode::PlasmaStoreFull; } - - bool IsStillExecuting() const { return code() == StatusCode::StillExecuting; } bool IsCodeGenError() const { return code() == StatusCode::CodeGenError; } @@ -330,6 +298,17 @@ class ARROW_EXPORT Status { /// \brief Return the specific error message attached to this status. std::string message() const { return ok() ? "" : state_->msg; } + /// \brief Return the status detail attached to this message. + std::shared_ptr detail() const { + return state_ == NULLPTR ? NULLPTR : state_->detail; + } + + /// \brief Returns a new Status copying the existing status, but + /// updating with the existing detail. + Status WithDetail(std::shared_ptr new_detail) { + return Status(code(), message(), std::move(new_detail)); + } + [[noreturn]] void Abort() const; [[noreturn]] void Abort(const std::string& message) const; @@ -341,6 +320,7 @@ class ARROW_EXPORT Status { struct State { StatusCode code; std::string msg; + std::shared_ptr detail; }; // OK status has a `NULL` state_. Otherwise, `state_` points to // a `State` structure containing the error code and message(s) diff --git a/cpp/src/plasma/client.cc b/cpp/src/plasma/client.cc index ce9795d20fc13..a6cdf7f17ca97 100644 --- a/cpp/src/plasma/client.cc +++ b/cpp/src/plasma/client.cc @@ -791,11 +791,12 @@ Status PlasmaClient::Impl::Seal(const ObjectID& object_id) { auto object_entry = objects_in_use_.find(object_id); if (object_entry == objects_in_use_.end()) { - return Status::PlasmaObjectNonexistent( - "Seal() called on an object without a reference to it"); + return MakePlasmaError(PlasmaErrorCode::PlasmaObjectNonexistent, + "Seal() called on an object without a reference to it"); } if (object_entry->second->is_sealed) { - return Status::PlasmaObjectAlreadySealed("Seal() called on an already sealed object"); + return MakePlasmaError(PlasmaErrorCode::PlasmaObjectAlreadySealed, + "Seal() called on an already sealed object"); } object_entry->second->is_sealed = true; @@ -896,7 +897,7 @@ Status PlasmaClient::Impl::Hash(const ObjectID& object_id, uint8_t* digest) { RETURN_NOT_OK(Get({object_id}, 0, &object_buffers)); // If the object was not retrieved, return false. if (!object_buffers[0].data) { - return Status::PlasmaObjectNonexistent("Object not found"); + return MakePlasmaError(PlasmaErrorCode::PlasmaObjectNonexistent, "Object not found"); } // Compute the hash. uint64_t hash = ComputeObjectHash(object_buffers[0]); diff --git a/cpp/src/plasma/common.cc b/cpp/src/plasma/common.cc index 0f1a0d1b505bf..bbcd2c9c3f13c 100644 --- a/cpp/src/plasma/common.cc +++ b/cpp/src/plasma/common.cc @@ -18,6 +18,7 @@ #include "plasma/common.h" #include +#include #include "arrow/util/ubsan.h" @@ -27,8 +28,88 @@ namespace fb = plasma::flatbuf; namespace plasma { +namespace { + +const char kErrorDetailTypeId[] = "plasma::PlasmaStatusDetail"; + +class PlasmaStatusDetail : public arrow::StatusDetail { + public: + explicit PlasmaStatusDetail(PlasmaErrorCode code) : code_(code) {} + const char* type_id() const override { return kErrorDetailTypeId; } + std::string ToString() const override { + const char* type; + switch (code()) { + case PlasmaErrorCode::PlasmaObjectExists: + type = "Plasma object exists"; + break; + case PlasmaErrorCode::PlasmaObjectNonexistent: + type = "Plasma object is nonexistent"; + break; + case PlasmaErrorCode::PlasmaStoreFull: + type = "Plasma store is full"; + break; + case PlasmaErrorCode::PlasmaObjectAlreadySealed: + type = "Plasma object is already sealed"; + break; + default: + type = "Unknown plasma error"; + break; + } + return std::string(type); + } + PlasmaErrorCode code() const { return code_; } + + private: + PlasmaErrorCode code_; +}; + +bool IsPlasmaStatus(const arrow::Status& status, PlasmaErrorCode code) { + if (status.ok()) { + return false; + } + auto* detail = status.detail().get(); + return detail != nullptr && detail->type_id() == kErrorDetailTypeId && + static_cast(detail)->code() == code; +} + +} // namespace + using arrow::Status; +arrow::Status MakePlasmaError(PlasmaErrorCode code, std::string message) { + arrow::StatusCode arrow_code = arrow::StatusCode::UnknownError; + switch (code) { + case PlasmaErrorCode::PlasmaObjectExists: + arrow_code = arrow::StatusCode::AlreadyExists; + break; + case PlasmaErrorCode::PlasmaObjectNonexistent: + arrow_code = arrow::StatusCode::KeyError; + break; + case PlasmaErrorCode::PlasmaStoreFull: + arrow_code = arrow::StatusCode::CapacityError; + break; + case PlasmaErrorCode::PlasmaObjectAlreadySealed: + // Maybe a stretch? + arrow_code = arrow::StatusCode::TypeError; + break; + } + return arrow::Status(arrow_code, std::move(message), + std::make_shared(code)); +} + +bool IsPlasmaObjectExists(const arrow::Status& status) { + return IsPlasmaStatus(status, PlasmaErrorCode::PlasmaObjectExists); +} +bool IsPlasmaObjectNonexistent(const arrow::Status& status) { + return IsPlasmaStatus(status, PlasmaErrorCode::PlasmaObjectNonexistent); +} +bool IsPlasmaObjectAlreadySealed(const arrow::Status& status) { + return IsPlasmaStatus(status, PlasmaErrorCode::PlasmaObjectAlreadySealed); +} +bool IsPlasmaStoreFull(const arrow::Status& status) { + return IsPlasmaStatus(status, PlasmaErrorCode::PlasmaStoreFull); +} + UniqueID UniqueID::from_binary(const std::string& binary) { UniqueID id; std::memcpy(&id, binary.data(), sizeof(id)); diff --git a/cpp/src/plasma/common.h b/cpp/src/plasma/common.h index 6f4cef5becb62..d42840cfbd2e5 100644 --- a/cpp/src/plasma/common.h +++ b/cpp/src/plasma/common.h @@ -41,6 +41,23 @@ namespace plasma { enum class ObjectLocation : int32_t { Local, Remote, Nonexistent }; +enum class PlasmaErrorCode : int8_t { + PlasmaObjectExists = 1, + PlasmaObjectNonexistent = 2, + PlasmaStoreFull = 3, + PlasmaObjectAlreadySealed = 4, +}; + +ARROW_EXPORT arrow::Status MakePlasmaError(PlasmaErrorCode code, std::string message); +/// Return true iff the status indicates an already existing Plasma object. +ARROW_EXPORT bool IsPlasmaObjectExists(const arrow::Status& status); +/// Return true iff the status indicates a non-existent Plasma object. +ARROW_EXPORT bool IsPlasmaObjectNonexistent(const arrow::Status& status); +/// Return true iff the status indicates an already sealed Plasma object. +ARROW_EXPORT bool IsPlasmaObjectAlreadySealed(const arrow::Status& status); +/// Return true iff the status indicates the Plasma store reached its capacity limit. +ARROW_EXPORT bool IsPlasmaStoreFull(const arrow::Status& status); + constexpr int64_t kUniqueIDSize = 20; class ARROW_EXPORT UniqueID { diff --git a/cpp/src/plasma/protocol.cc b/cpp/src/plasma/protocol.cc index b87656bd097c6..c22d77d60190d 100644 --- a/cpp/src/plasma/protocol.cc +++ b/cpp/src/plasma/protocol.cc @@ -86,11 +86,14 @@ Status PlasmaErrorStatus(fb::PlasmaError plasma_error) { case fb::PlasmaError::OK: return Status::OK(); case fb::PlasmaError::ObjectExists: - return Status::PlasmaObjectExists("object already exists in the plasma store"); + return MakePlasmaError(PlasmaErrorCode::PlasmaObjectExists, + "object already exists in the plasma store"); case fb::PlasmaError::ObjectNonexistent: - return Status::PlasmaObjectNonexistent("object does not exist in the plasma store"); + return MakePlasmaError(PlasmaErrorCode::PlasmaObjectNonexistent, + "object does not exist in the plasma store"); case fb::PlasmaError::OutOfMemory: - return Status::PlasmaStoreFull("object does not fit in the plasma store"); + return MakePlasmaError(PlasmaErrorCode::PlasmaStoreFull, + "object does not fit in the plasma store"); default: ARROW_LOG(FATAL) << "unknown plasma error code " << static_cast(plasma_error); } diff --git a/cpp/src/plasma/test/client_tests.cc b/cpp/src/plasma/test/client_tests.cc index 435b687a69e1a..deffde57976a9 100644 --- a/cpp/src/plasma/test/client_tests.cc +++ b/cpp/src/plasma/test/client_tests.cc @@ -157,7 +157,7 @@ TEST_F(TestPlasmaStore, SealErrorsTest) { ObjectID object_id = random_object_id(); Status result = client_.Seal(object_id); - ASSERT_TRUE(result.IsPlasmaObjectNonexistent()); + ASSERT_TRUE(IsPlasmaObjectNonexistent(result)); // Create object. std::vector data(100, 0); @@ -165,7 +165,7 @@ TEST_F(TestPlasmaStore, SealErrorsTest) { // Trying to seal it again. result = client_.Seal(object_id); - ASSERT_TRUE(result.IsPlasmaObjectAlreadySealed()); + ASSERT_TRUE(IsPlasmaObjectAlreadySealed(result)); ARROW_CHECK_OK(client_.Release(object_id)); } diff --git a/cpp/src/plasma/test/serialization_tests.cc b/cpp/src/plasma/test/serialization_tests.cc index 7e2bc887ed3f6..f3cff4285824c 100644 --- a/cpp/src/plasma/test/serialization_tests.cc +++ b/cpp/src/plasma/test/serialization_tests.cc @@ -156,7 +156,7 @@ TEST_F(TestPlasmaSerialization, SealReply) { ObjectID object_id2; Status s = ReadSealReply(data.data(), data.size(), &object_id2); ASSERT_EQ(object_id1, object_id2); - ASSERT_TRUE(s.IsPlasmaObjectExists()); + ASSERT_TRUE(IsPlasmaObjectExists(s)); close(fd); } @@ -234,7 +234,7 @@ TEST_F(TestPlasmaSerialization, ReleaseReply) { ObjectID object_id2; Status s = ReadReleaseReply(data.data(), data.size(), &object_id2); ASSERT_EQ(object_id1, object_id2); - ASSERT_TRUE(s.IsPlasmaObjectExists()); + ASSERT_TRUE(IsPlasmaObjectExists(s)); close(fd); } diff --git a/python/pyarrow/__init__.py b/python/pyarrow/__init__.py index bbbd91a950895..1d508ed7d11f8 100644 --- a/python/pyarrow/__init__.py +++ b/python/pyarrow/__init__.py @@ -122,8 +122,7 @@ def parse_git(root, **kwargs): ArrowMemoryError, ArrowNotImplementedError, ArrowTypeError, - ArrowSerializationError, - PlasmaObjectExists) + ArrowSerializationError) # Serialization from pyarrow.lib import (deserialize_from, deserialize, diff --git a/python/pyarrow/_plasma.pyx b/python/pyarrow/_plasma.pyx index e352377f14e76..7e994c3ee079d 100644 --- a/python/pyarrow/_plasma.pyx +++ b/python/pyarrow/_plasma.pyx @@ -37,8 +37,10 @@ import warnings import pyarrow from pyarrow.lib cimport Buffer, NativeFile, check_status, pyarrow_wrap_buffer +from pyarrow.lib import ArrowException from pyarrow.includes.libarrow cimport (CBuffer, CMutableBuffer, CFixedSizeBufferWriter, CStatus) +from pyarrow.includes.libplasma cimport * from pyarrow import compat @@ -255,6 +257,34 @@ cdef class PlasmaBuffer(Buffer): self.client._release(self.object_id) +class PlasmaObjectNonexistent(ArrowException): + pass + + +class PlasmaStoreFull(ArrowException): + pass + + +class PlasmaObjectExists(ArrowException): + pass + + +cdef int plasma_check_status(const CStatus& status) nogil except -1: + if status.ok(): + return 0 + + with gil: + message = compat.frombytes(status.message()) + if IsPlasmaObjectExists(status): + raise PlasmaObjectExists(message) + elif IsPlasmaObjectNonexistent(status): + raise PlasmaObjectNonexistent(message) + elif IsPlasmaStoreFull(status): + raise PlasmaStoreFull(message) + + return check_status(status) + + cdef class PlasmaClient: """ The PlasmaClient is used to interface with a plasma store and manager. @@ -283,7 +313,7 @@ cdef class PlasmaClient: for object_id in object_ids: ids.push_back(object_id.data) with nogil: - check_status(self.client.get().Get(ids, timeout_ms, result)) + plasma_check_status(self.client.get().Get(ids, timeout_ms, result)) # XXX C++ API should instead expose some kind of CreateAuto() cdef _make_mutable_plasma_buffer(self, ObjectID object_id, uint8_t* data, @@ -325,9 +355,10 @@ cdef class PlasmaClient: """ cdef shared_ptr[CBuffer] data with nogil: - check_status(self.client.get().Create(object_id.data, data_size, - (metadata.data()), - metadata.size(), &data)) + plasma_check_status( + self.client.get().Create(object_id.data, data_size, + (metadata.data()), + metadata.size(), &data)) return self._make_mutable_plasma_buffer(object_id, data.get().mutable_data(), data_size) @@ -358,8 +389,9 @@ cdef class PlasmaClient: enough objects to create room for it. """ with nogil: - check_status(self.client.get().CreateAndSeal(object_id.data, data, - metadata)) + plasma_check_status( + self.client.get().CreateAndSeal(object_id.data, data, + metadata)) def get_buffers(self, object_ids, timeout_ms=-1, with_meta=False): """ @@ -554,7 +586,7 @@ cdef class PlasmaClient: A string used to identify an object. """ with nogil: - check_status(self.client.get().Seal(object_id.data)) + plasma_check_status(self.client.get().Seal(object_id.data)) def _release(self, ObjectID object_id): """ @@ -566,7 +598,7 @@ cdef class PlasmaClient: A string used to identify an object. """ with nogil: - check_status(self.client.get().Release(object_id.data)) + plasma_check_status(self.client.get().Release(object_id.data)) def contains(self, ObjectID object_id): """ @@ -579,8 +611,8 @@ cdef class PlasmaClient: """ cdef c_bool is_contained with nogil: - check_status(self.client.get().Contains(object_id.data, - &is_contained)) + plasma_check_status(self.client.get().Contains(object_id.data, + &is_contained)) return is_contained def hash(self, ObjectID object_id): @@ -600,8 +632,8 @@ cdef class PlasmaClient: """ cdef c_vector[uint8_t] digest = c_vector[uint8_t](kDigestSize) with nogil: - check_status(self.client.get().Hash(object_id.data, - digest.data())) + plasma_check_status(self.client.get().Hash(object_id.data, + digest.data())) return bytes(digest[:]) def evict(self, int64_t num_bytes): @@ -617,13 +649,15 @@ cdef class PlasmaClient: """ cdef int64_t num_bytes_evicted = -1 with nogil: - check_status(self.client.get().Evict(num_bytes, num_bytes_evicted)) + plasma_check_status( + self.client.get().Evict(num_bytes, num_bytes_evicted)) return num_bytes_evicted def subscribe(self): """Subscribe to notifications about sealed objects.""" with nogil: - check_status(self.client.get().Subscribe(&self.notification_fd)) + plasma_check_status( + self.client.get().Subscribe(&self.notification_fd)) def get_notification_socket(self): """ @@ -650,11 +684,11 @@ cdef class PlasmaClient: cdef int64_t data_size cdef int64_t metadata_size with nogil: - check_status(self.client.get() - .DecodeNotification(buf, - &object_id, - &data_size, - &metadata_size)) + status = self.client.get().DecodeNotification(buf, + &object_id, + &data_size, + &metadata_size) + plasma_check_status(status) return ObjectID(object_id.binary()), data_size, metadata_size def get_next_notification(self): @@ -674,11 +708,11 @@ cdef class PlasmaClient: cdef int64_t data_size cdef int64_t metadata_size with nogil: - check_status(self.client.get() - .GetNotification(self.notification_fd, - &object_id.data, - &data_size, - &metadata_size)) + status = self.client.get().GetNotification(self.notification_fd, + &object_id.data, + &data_size, + &metadata_size) + plasma_check_status(status) return object_id, data_size, metadata_size def to_capsule(self): @@ -689,7 +723,7 @@ cdef class PlasmaClient: Disconnect this client from the Plasma store. """ with nogil: - check_status(self.client.get().Disconnect()) + plasma_check_status(self.client.get().Disconnect()) def delete(self, object_ids): """ @@ -705,7 +739,7 @@ cdef class PlasmaClient: for object_id in object_ids: ids.push_back(object_id.data) with nogil: - check_status(self.client.get().Delete(ids)) + plasma_check_status(self.client.get().Delete(ids)) def list(self): """ @@ -738,7 +772,7 @@ cdef class PlasmaClient: """ cdef CObjectTable objects with nogil: - check_status(self.client.get().List(&objects)) + plasma_check_status(self.client.get().List(&objects)) result = dict() cdef ObjectID object_id cdef CObjectTableEntry entry @@ -802,7 +836,7 @@ def connect(store_socket_name, manager_socket_name=None, int release_delay=0, warnings.warn("release_delay in PlasmaClient.connect is deprecated", FutureWarning) with nogil: - check_status(result.client.get() - .Connect(result.store_socket_name, b"", - release_delay, num_retries)) + plasma_check_status( + result.client.get().Connect(result.store_socket_name, b"", + release_delay, num_retries)) return result diff --git a/python/pyarrow/error.pxi b/python/pyarrow/error.pxi index 7b5e8d4337177..3cb9142d4794e 100644 --- a/python/pyarrow/error.pxi +++ b/python/pyarrow/error.pxi @@ -15,7 +15,7 @@ # specific language governing permissions and limitations # under the License. -from pyarrow.includes.libarrow cimport CStatus +from pyarrow.includes.libarrow cimport CStatus, IsPyError, RestorePyError from pyarrow.includes.common cimport c_string from pyarrow.compat import frombytes @@ -56,30 +56,21 @@ class ArrowIndexError(IndexError, ArrowException): pass -class PlasmaObjectExists(ArrowException): - pass - - -class PlasmaObjectNonexistent(ArrowException): - pass - - -class PlasmaStoreFull(ArrowException): - pass - - class ArrowSerializationError(ArrowException): pass +# This function could be written directly in C++ if we didn't +# define Arrow-specific subclasses (ArrowInvalid etc.) cdef int check_status(const CStatus& status) nogil except -1: if status.ok(): return 0 - if status.IsPythonError(): - return -1 - with gil: + if IsPyError(status): + RestorePyError(status) + return -1 + message = frombytes(status.message()) if status.IsInvalid(): raise ArrowInvalid(message) @@ -97,12 +88,6 @@ cdef int check_status(const CStatus& status) nogil except -1: raise ArrowCapacityError(message) elif status.IsIndexError(): raise ArrowIndexError(message) - elif status.IsPlasmaObjectExists(): - raise PlasmaObjectExists(message) - elif status.IsPlasmaObjectNonexistent(): - raise PlasmaObjectNonexistent(message) - elif status.IsPlasmaStoreFull(): - raise PlasmaStoreFull(message) elif status.IsSerializationError(): raise ArrowSerializationError(message) else: diff --git a/python/pyarrow/includes/common.pxd b/python/pyarrow/includes/common.pxd index 4a06fc8206572..8b116f60b6e3e 100644 --- a/python/pyarrow/includes/common.pxd +++ b/python/pyarrow/includes/common.pxd @@ -42,6 +42,7 @@ cdef extern from "numpy/halffloat.h": cdef extern from "arrow/api.h" namespace "arrow" nogil: # We can later add more of the common status factory methods as needed cdef CStatus CStatus_OK "arrow::Status::OK"() + cdef CStatus CStatus_Invalid "arrow::Status::Invalid"() cdef CStatus CStatus_NotImplemented \ "arrow::Status::NotImplemented"(const c_string& msg) @@ -64,10 +65,6 @@ cdef extern from "arrow/api.h" namespace "arrow" nogil: c_bool IsCapacityError() c_bool IsIndexError() c_bool IsSerializationError() - c_bool IsPythonError() - c_bool IsPlasmaObjectExists() - c_bool IsPlasmaObjectNonexistent() - c_bool IsPlasmaStoreFull() cdef extern from "arrow/result.h" namespace "arrow::internal" nogil: cdef cppclass CResult[T]: diff --git a/python/pyarrow/includes/libarrow.pxd b/python/pyarrow/includes/libarrow.pxd index 93a75945ce380..89199ca77fb6b 100644 --- a/python/pyarrow/includes/libarrow.pxd +++ b/python/pyarrow/includes/libarrow.pxd @@ -1217,6 +1217,8 @@ cdef extern from "arrow/python/api.h" namespace "arrow::py" nogil: CMemoryPool* pool c_bool from_pandas + # TODO Some functions below are not actually "nogil" + CStatus ConvertPySequence(object obj, object mask, const PyConversionOptions& options, shared_ptr[CChunkedArray]* out) @@ -1342,6 +1344,11 @@ cdef extern from 'arrow/python/init.h': int arrow_init_numpy() except -1 +cdef extern from 'arrow/python/common.h' namespace "arrow::py": + c_bool IsPyError(const CStatus& status) + void RestorePyError(const CStatus& status) + + cdef extern from 'arrow/python/pyarrow.h' namespace 'arrow::py': int import_pyarrow() except -1 diff --git a/python/pyarrow/includes/libplasma.pxd b/python/pyarrow/includes/libplasma.pxd new file mode 100644 index 0000000000000..1b84ab4e0a695 --- /dev/null +++ b/python/pyarrow/includes/libplasma.pxd @@ -0,0 +1,25 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +# distutils: language = c++ + +from pyarrow.includes.common cimport * + +cdef extern from "plasma/common.h" namespace "plasma" nogil: + cdef c_bool IsPlasmaObjectExists(const CStatus& status) + cdef c_bool IsPlasmaObjectNonexistent(const CStatus& status) + cdef c_bool IsPlasmaStoreFull(const CStatus& status) diff --git a/python/pyarrow/plasma.py b/python/pyarrow/plasma.py index 748de97c36395..43ca471e0b21c 100644 --- a/python/pyarrow/plasma.py +++ b/python/pyarrow/plasma.py @@ -27,7 +27,9 @@ import time from pyarrow._plasma import (ObjectID, ObjectNotAvailable, # noqa - PlasmaBuffer, PlasmaClient, connect) + PlasmaBuffer, PlasmaClient, connect, + PlasmaObjectExists, PlasmaObjectNonexistent, + PlasmaStoreFull) # The Plasma TensorFlow Operator needs to be compiled on the end user's diff --git a/python/pyarrow/tests/test_array.py b/python/pyarrow/tests/test_array.py index 9d66d96e2c23e..f961c00b7acd5 100644 --- a/python/pyarrow/tests/test_array.py +++ b/python/pyarrow/tests/test_array.py @@ -1479,7 +1479,7 @@ def test_array_masked(): def test_array_from_large_pyints(): # ARROW-5430 - with pytest.raises(pa.ArrowInvalid): + with pytest.raises(OverflowError): # too large for int64 so dtype must be explicitly provided pa.array([int(2 ** 63)]) diff --git a/python/pyarrow/tests/test_convert_builtin.py b/python/pyarrow/tests/test_convert_builtin.py index 4e040836979aa..81d5952b4b1d4 100644 --- a/python/pyarrow/tests/test_convert_builtin.py +++ b/python/pyarrow/tests/test_convert_builtin.py @@ -26,9 +26,12 @@ import datetime import decimal import itertools +import traceback +import sys + import numpy as np -import six import pytz +import six int_type_pairs = [ @@ -53,6 +56,19 @@ def __iter__(self): return self.lst.__iter__() +class MyInt: + def __init__(self, value): + self.value = value + + def __int__(self): + return self.value + + +class MyBrokenInt: + def __int__(self): + 1/0 # MARKER + + def check_struct_type(ty, expected): """ Check a struct type is as expected, but not taking order into account. @@ -191,7 +207,7 @@ def test_nested_lists(seq): @parametrize_with_iterable_types def test_list_with_non_list(seq): # List types don't accept non-sequences - with pytest.raises(pa.ArrowTypeError): + with pytest.raises(TypeError): pa.array(seq([[], [1, 2], 3]), type=pa.list_(pa.int64())) @@ -299,6 +315,24 @@ def test_sequence_numpy_integer_inferred(seq, np_scalar_pa_type): assert arr.to_pylist() == expected +@parametrize_with_iterable_types +def test_sequence_custom_integers(seq): + expected = [0, 42, 2**33 + 1, -2**63] + data = list(map(MyInt, expected)) + arr = pa.array(seq(data), type=pa.int64()) + assert arr.to_pylist() == expected + + +@parametrize_with_iterable_types +def test_broken_integers(seq): + data = [MyBrokenInt()] + with pytest.raises(ZeroDivisionError) as exc_info: + pa.array(seq(data), type=pa.int64()) + # Original traceback is kept + tb_lines = traceback.format_tb(exc_info.tb) + assert "# MARKER" in tb_lines[-1] + + def test_numpy_scalars_mixed_type(): # ARROW-4324 data = [np.int32(10), np.float32(0.5)] @@ -308,7 +342,7 @@ def test_numpy_scalars_mixed_type(): @pytest.mark.xfail(reason="Type inference for uint64 not implemented", - raises=pa.ArrowException) + raises=OverflowError) def test_uint64_max_convert(): data = [0, np.iinfo(np.uint64).max] @@ -323,20 +357,20 @@ def test_uint64_max_convert(): @pytest.mark.parametrize("bits", [8, 16, 32, 64]) def test_signed_integer_overflow(bits): ty = getattr(pa, "int%d" % bits)() - # XXX ideally would raise OverflowError - with pytest.raises((ValueError, pa.ArrowException)): + # XXX ideally would always raise OverflowError + with pytest.raises((OverflowError, pa.ArrowInvalid)): pa.array([2 ** (bits - 1)], ty) - with pytest.raises((ValueError, pa.ArrowException)): + with pytest.raises((OverflowError, pa.ArrowInvalid)): pa.array([-2 ** (bits - 1) - 1], ty) @pytest.mark.parametrize("bits", [8, 16, 32, 64]) def test_unsigned_integer_overflow(bits): ty = getattr(pa, "uint%d" % bits)() - # XXX ideally would raise OverflowError - with pytest.raises((ValueError, pa.ArrowException)): + # XXX ideally would always raise OverflowError + with pytest.raises((OverflowError, pa.ArrowInvalid)): pa.array([2 ** bits], ty) - with pytest.raises((ValueError, pa.ArrowException)): + with pytest.raises((OverflowError, pa.ArrowInvalid)): pa.array([-1], ty) @@ -661,7 +695,7 @@ def test_sequence_explicit_types(input): def test_date32_overflow(): # Overflow data3 = [2**32, None] - with pytest.raises(pa.ArrowException): + with pytest.raises((OverflowError, pa.ArrowException)): pa.array(data3, type=pa.date32()) @@ -831,12 +865,19 @@ def test_sequence_timestamp_from_int_with_unit(): assert repr(arr_ns[0]) == "Timestamp('1970-01-01 00:00:00.000000001')" assert str(arr_ns[0]) == "1970-01-01 00:00:00.000000001" - with pytest.raises(pa.ArrowException): - class CustomClass(): - pass - pa.array([1, CustomClass()], type=ns) - pa.array([1, CustomClass()], type=pa.date32()) - pa.array([1, CustomClass()], type=pa.date64()) + if sys.version_info >= (3,): + expected_exc = TypeError + else: + # Can have "AttributeError: CustomClass instance + # has no attribute '__trunc__'" + expected_exc = (TypeError, AttributeError) + + class CustomClass(): + pass + + for ty in [ns, pa.date32(), pa.date64()]: + with pytest.raises(expected_exc): + pa.array([1, CustomClass()], type=ty) def test_sequence_nesting_levels(): diff --git a/python/pyarrow/tests/test_plasma.py b/python/pyarrow/tests/test_plasma.py index 149bdd54f6cb8..49808a19ef47a 100644 --- a/python/pyarrow/tests/test_plasma.py +++ b/python/pyarrow/tests/test_plasma.py @@ -227,7 +227,7 @@ def test_create_and_seal(self): # Make sure that creating the same object twice raises an exception. object_id = random_object_id() self.plasma_client.create_and_seal(object_id, b'a', b'b') - with pytest.raises(pa.PlasmaObjectExists): + with pytest.raises(pa.plasma.PlasmaObjectExists): self.plasma_client.create_and_seal(object_id, b'a', b'b') # Make sure that these objects can be evicted. @@ -852,7 +852,7 @@ def test_use_full_memory(self): for _ in range(2): create_object(self.plasma_client2, DEFAULT_PLASMA_STORE_MEMORY, 0) # Verify that an object that is too large does not fit. - with pytest.raises(pa.lib.PlasmaStoreFull): + with pytest.raises(pa.plasma.PlasmaStoreFull): create_object(self.plasma_client2, DEFAULT_PLASMA_STORE_MEMORY + SMALL_OBJECT_SIZE, 0)