diff --git a/cpp/src/arrow/python/deserialize.cc b/cpp/src/arrow/python/deserialize.cc
index 452d8dd0ac081..f1a7eab8fcbda 100644
--- a/cpp/src/arrow/python/deserialize.cc
+++ b/cpp/src/arrow/python/deserialize.cc
@@ -361,7 +361,7 @@ Status GetSerializedFromComponents(int num_tensors, int num_ndarrays, int num_bu
     ipc::Message message(metadata, body);
 
-    RETURN_NOT_OK(ReadTensor(message, &tensor));
+    RETURN_NOT_OK(ipc::ReadTensor(message, &tensor));
     out->tensors.emplace_back(std::move(tensor));
   }
 
@@ -375,7 +375,7 @@ Status GetSerializedFromComponents(int num_tensors, int num_ndarrays, int num_bu
     ipc::Message message(metadata, body);
 
-    RETURN_NOT_OK(ReadTensor(message, &tensor));
+    RETURN_NOT_OK(ipc::ReadTensor(message, &tensor));
     out->ndarrays.emplace_back(std::move(tensor));
   }
 
@@ -389,19 +389,20 @@ Status GetSerializedFromComponents(int num_tensors, int num_ndarrays, int num_bu
   return Status::OK();
 }
 
-Status DeserializeTensor(const SerializedPyObject& object, std::shared_ptr<Tensor>* out) {
-  if (object.tensors.size() != 1) {
-    return Status::Invalid("Object is not a Tensor");
+Status DeserializeNdarray(const SerializedPyObject& object,
+                          std::shared_ptr<Tensor>* out) {
+  if (object.ndarrays.size() != 1) {
+    return Status::Invalid("Object is not an Ndarray");
   }
-  *out = object.tensors[0];
+  *out = object.ndarrays[0];
   return Status::OK();
 }
 
-Status ReadTensor(std::shared_ptr<Buffer> src, std::shared_ptr<Tensor>* out) {
+Status NdarrayFromBuffer(std::shared_ptr<Buffer> src, std::shared_ptr<Tensor>* out) {
   io::BufferReader reader(src);
   SerializedPyObject object;
   RETURN_NOT_OK(ReadSerializedObject(&reader, &object));
-  return DeserializeTensor(object, out);
+  return DeserializeNdarray(object, out);
 }
 
 }  // namespace py
diff --git a/cpp/src/arrow/python/deserialize.h b/cpp/src/arrow/python/deserialize.h
index a0286b17261e0..754765a6825fd 100644
--- a/cpp/src/arrow/python/deserialize.h
+++ b/cpp/src/arrow/python/deserialize.h
@@ -76,15 +76,15 @@ ARROW_EXPORT
 Status DeserializeObject(PyObject* context, const SerializedPyObject& object,
                          PyObject* base, PyObject** out);
 
-/// \brief Reconstruct Tensor from Arrow-serialized representation
+/// \brief Reconstruct Ndarray from Arrow-serialized representation
 /// \param[in] object Object to deserialize
 /// \param[out] out The deserialized tensor
 /// \return Status
 ARROW_EXPORT
-Status DeserializeTensor(const SerializedPyObject& object, std::shared_ptr<Tensor>* out);
+Status DeserializeNdarray(const SerializedPyObject& object, std::shared_ptr<Tensor>* out);
 
 ARROW_EXPORT
-Status ReadTensor(std::shared_ptr<Buffer> src, std::shared_ptr<Tensor>* out);
+Status NdarrayFromBuffer(std::shared_ptr<Buffer> src, std::shared_ptr<Tensor>* out);
 
 }  // namespace py
 }  // namespace arrow
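Reviewer note, not part of the patch: a minimal sketch of how the renamed entry points compose, assuming only the declarations above. SerializeNdarray fills a SerializedPyObject, WriteTo emits the wire format, and NdarrayFromBuffer (which delegates to DeserializeNdarray) reads it back. The helper name RoundTripNdarray, the BufferOutputStream capacity, and the assumption that the arrow::py bridge is usable from the calling context are illustrative only.

// Sketch only: round-trip an arrow::Tensor through the renamed ndarray APIs.
#include <memory>

#include "arrow/buffer.h"
#include "arrow/io/memory.h"
#include "arrow/memory_pool.h"
#include "arrow/python/deserialize.h"
#include "arrow/python/serialize.h"
#include "arrow/status.h"
#include "arrow/tensor.h"

arrow::Status RoundTripNdarray(const std::shared_ptr<arrow::Tensor>& in,
                               std::shared_ptr<arrow::Tensor>* out) {
  // Serialize the tensor into a SerializedPyObject and write it to a buffer.
  arrow::py::SerializedPyObject serialized;
  RETURN_NOT_OK(arrow::py::SerializeNdarray(in, &serialized));

  std::shared_ptr<arrow::io::BufferOutputStream> sink;
  RETURN_NOT_OK(arrow::io::BufferOutputStream::Create(
      1024, arrow::default_memory_pool(), &sink));
  RETURN_NOT_OK(serialized.WriteTo(sink.get()));

  std::shared_ptr<arrow::Buffer> buffer;
  RETURN_NOT_OK(sink->Finish(&buffer));

  // NdarrayFromBuffer delegates to DeserializeNdarray, which expects exactly
  // one ndarray in the serialized payload.
  return arrow::py::NdarrayFromBuffer(buffer, out);
}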
diff --git a/cpp/src/arrow/python/serialize.cc b/cpp/src/arrow/python/serialize.cc
index 2655280067f95..7911557ee73e0 100644
--- a/cpp/src/arrow/python/serialize.cc
+++ b/cpp/src/arrow/python/serialize.cc
@@ -752,23 +752,23 @@ Status SerializeObject(PyObject* context, PyObject* sequence, SerializedPyObject
   return Status::OK();
 }
 
-Status SerializeTensor(std::shared_ptr<Tensor> tensor, SerializedPyObject* out) {
+Status SerializeNdarray(std::shared_ptr<Tensor> tensor, SerializedPyObject* out) {
   std::shared_ptr<Array> array;
   SequenceBuilder builder;
-  RETURN_NOT_OK(builder.AppendTensor(static_cast<int32_t>(out->tensors.size())));
-  out->tensors.push_back(tensor);
+  RETURN_NOT_OK(builder.AppendNdarray(static_cast<int32_t>(out->ndarrays.size())));
+  out->ndarrays.push_back(tensor);
   RETURN_NOT_OK(builder.Finish(nullptr, nullptr, nullptr, nullptr, &array));
   out->batch = MakeBatch(array);
   return Status::OK();
 }
 
-Status WriteTensorHeader(std::shared_ptr<DataType> dtype,
-                         const std::vector<int64_t>& shape, int64_t tensor_num_bytes,
-                         io::OutputStream* dst) {
+Status WriteNdarrayHeader(std::shared_ptr<DataType> dtype,
+                          const std::vector<int64_t>& shape, int64_t tensor_num_bytes,
+                          io::OutputStream* dst) {
   auto empty_tensor = std::make_shared<Tensor>(
       dtype, std::make_shared<Buffer>(nullptr, tensor_num_bytes), shape);
   SerializedPyObject serialized_tensor;
-  RETURN_NOT_OK(SerializeTensor(empty_tensor, &serialized_tensor));
+  RETURN_NOT_OK(SerializeNdarray(empty_tensor, &serialized_tensor));
   return serialized_tensor.WriteTo(dst);
 }
 
diff --git a/cpp/src/arrow/python/serialize.h b/cpp/src/arrow/python/serialize.h
index f11cb5a1bf1ed..2759d0c9f1fb5 100644
--- a/cpp/src/arrow/python/serialize.h
+++ b/cpp/src/arrow/python/serialize.h
@@ -103,9 +103,9 @@ Status SerializeTensor(std::shared_ptr<Tensor> tensor, py::SerializedPyObject* o
 /// \param[in] dst The OutputStream to write the Tensor header to
 /// \return Status
 ARROW_EXPORT
-Status WriteTensorHeader(std::shared_ptr<DataType> dtype,
-                         const std::vector<int64_t>& shape, int64_t tensor_num_bytes,
-                         io::OutputStream* dst);
+Status WriteNdarrayHeader(std::shared_ptr<DataType> dtype,
+                          const std::vector<int64_t>& shape, int64_t tensor_num_bytes,
+                          io::OutputStream* dst);
 
 }  // namespace py
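Reviewer note, not part of the patch: the plasma op below uses WriteNdarrayHeader in two passes, first against a MockOutputStream purely to measure the header, then against a FixedSizeBufferWriter over the Plasma-allocated buffer. A condensed sketch of that pattern follows; the float32 dtype, the helper name WriteHeaderInto, and the requirement that the destination buffer be mutable are illustrative assumptions.

// Sketch only: size the ndarray header, then write it at the front of a
// pre-allocated (mutable) buffer so the raw payload can follow it.
#include <memory>
#include <vector>

#include "arrow/buffer.h"
#include "arrow/io/memory.h"
#include "arrow/python/serialize.h"
#include "arrow/status.h"
#include "arrow/type.h"

arrow::Status WriteHeaderInto(const std::shared_ptr<arrow::Buffer>& dest,
                              const std::vector<int64_t>& shape,
                              int64_t payload_bytes, int64_t* header_size) {
  auto dtype = arrow::float32();  // placeholder element type

  // First pass: MockOutputStream only counts bytes, yielding the header size.
  arrow::io::MockOutputStream mock;
  RETURN_NOT_OK(arrow::py::WriteNdarrayHeader(dtype, shape, 0, &mock));
  *header_size = mock.GetExtentBytesWritten();

  // Second pass: write the real header (now carrying payload_bytes) in place.
  arrow::io::FixedSizeBufferWriter writer(dest);
  return arrow::py::WriteNdarrayHeader(dtype, shape, payload_bytes, &writer);
}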
diff --git a/python/pyarrow/tensorflow/plasma_op.cc b/python/pyarrow/tensorflow/plasma_op.cc
index 15ae0dc69b764..a341d5a53988f 100644
--- a/python/pyarrow/tensorflow/plasma_op.cc
+++ b/python/pyarrow/tensorflow/plasma_op.cc
@@ -77,8 +77,7 @@ class TensorToPlasmaOp : public tf::AsyncOpKernel {
     if (!connected_) {
       VLOG(1) << "Connecting to Plasma...";
       ARROW_CHECK_OK(client_.Connect(plasma_store_socket_name_,
-                                     plasma_manager_socket_name_,
-                                     plasma::kPlasmaDefaultReleaseDelay));
+                                     plasma_manager_socket_name_, 0));
       VLOG(1) << "Connected!";
       connected_ = true;
     }
@@ -141,7 +140,7 @@ class TensorToPlasmaOp : public tf::AsyncOpKernel {
     std::vector<int64_t> shape = {total_bytes / byte_width};
 
     arrow::io::MockOutputStream mock;
-    ARROW_CHECK_OK(arrow::py::WriteTensorHeader(arrow_dtype, shape, 0, &mock));
+    ARROW_CHECK_OK(arrow::py::WriteNdarrayHeader(arrow_dtype, shape, 0, &mock));
     int64_t header_size = mock.GetExtentBytesWritten();
 
     std::shared_ptr<Buffer> data_buffer;
@@ -153,15 +152,21 @@ class TensorToPlasmaOp : public tf::AsyncOpKernel {
     int64_t offset;
     arrow::io::FixedSizeBufferWriter buf(data_buffer);
-    ARROW_CHECK_OK(arrow::py::WriteTensorHeader(arrow_dtype, shape, total_bytes, &buf));
+    ARROW_CHECK_OK(arrow::py::WriteNdarrayHeader(arrow_dtype, shape, total_bytes, &buf));
     ARROW_CHECK_OK(buf.Tell(&offset));
 
     uint8_t* data = reinterpret_cast<uint8_t*>(data_buffer->mutable_data() + offset);
 
-    auto wrapped_callback = [this, context, done, data_buffer, object_id]() {
+    auto wrapped_callback = [this, context, done, data_buffer, data, object_id]() {
       {
         tf::mutex_lock lock(mu_);
         ARROW_CHECK_OK(client_.Seal(object_id));
+        ARROW_CHECK_OK(client_.Release(object_id));
+#ifdef GOOGLE_CUDA
+        auto orig_stream = context->op_device_context()->stream();
+        auto stream_executor = orig_stream->parent();
+        CHECK(stream_executor->HostMemoryUnregister(static_cast<void*>(data)));
+#endif
       }
       context->SetStatus(tensorflow::Status::OK());
       done();
    };
@@ -244,8 +249,7 @@ class PlasmaToTensorOp : public tf::AsyncOpKernel {
     if (!connected_) {
       VLOG(1) << "Connecting to Plasma...";
       ARROW_CHECK_OK(client_.Connect(plasma_store_socket_name_,
-                                     plasma_manager_socket_name_,
-                                     plasma::kPlasmaDefaultReleaseDelay));
+                                     plasma_manager_socket_name_, 0));
       VLOG(1) << "Connected!";
       connected_ = true;
     }
@@ -284,25 +288,39 @@ class PlasmaToTensorOp : public tf::AsyncOpKernel {
                                /*timeout_ms=*/-1, &object_buffer));
     }
 
-    std::shared_ptr<arrow::Tensor> tensor;
-    ARROW_CHECK_OK(arrow::py::ReadTensor(object_buffer.data, &tensor));
+    std::shared_ptr<arrow::Tensor> ndarray;
+    ARROW_CHECK_OK(arrow::py::NdarrayFromBuffer(object_buffer.data, &ndarray));
 
-    int64_t byte_width = get_byte_width(*tensor->type());
-    const int64_t size_in_bytes = tensor->data()->size();
+    int64_t byte_width = get_byte_width(*ndarray->type());
+    const int64_t size_in_bytes = ndarray->data()->size();
 
     tf::TensorShape shape({static_cast<int64_t>(size_in_bytes / byte_width)});
 
-    const float* plasma_data = reinterpret_cast<const float*>(tensor->raw_data());
+    const float* plasma_data = reinterpret_cast<const float*>(ndarray->raw_data());
 
     tf::Tensor* output_tensor = nullptr;
     OP_REQUIRES_OK_ASYNC(context, context->allocate_output(0, shape, &output_tensor),
                          done);
 
+    auto wrapped_callback = [this, context, done, plasma_data, object_id]() {
+      {
+        tf::mutex_lock lock(mu_);
+        ARROW_CHECK_OK(client_.Release(object_id));
+#ifdef GOOGLE_CUDA
+        auto orig_stream = context->op_device_context()->stream();
+        auto stream_executor = orig_stream->parent();
+        CHECK(stream_executor->HostMemoryUnregister(
+            const_cast<void*>(static_cast<const void*>(plasma_data))));
+#endif
+      }
+      done();
+    };
+
     if (std::is_same<Device, CPUDevice>::value) {
       std::memcpy(
           reinterpret_cast<void*>(const_cast<char*>(output_tensor->tensor_data().data())),
           plasma_data, size_in_bytes);
-      done();
+      wrapped_callback();
     } else {
 #ifdef GOOGLE_CUDA
       auto orig_stream = context->op_device_context()->stream();
@@ -319,8 +337,6 @@ class PlasmaToTensorOp : public tf::AsyncOpKernel {
       }
 
       // Important. See note in T2P op.
-      // We don't check the return status since the host memory might've been
-      // already registered (e.g., the TensorToPlasmaOp might've been run).
       CHECK(stream_executor->HostMemoryRegister(
           const_cast<void*>(static_cast<const void*>(plasma_data)),
           static_cast<tf::uint64>(size_in_bytes)));
@@ -341,7 +357,7 @@ class PlasmaToTensorOp : public tf::AsyncOpKernel {
      CHECK(orig_stream->ThenWaitFor(h2d_stream).ok());
 
      context->device()->tensorflow_gpu_device_info()->event_mgr->ThenExecute(
-          h2d_stream, std::move(done));
+          h2d_stream, std::move(wrapped_callback));
 #endif
     }
   }
diff --git a/python/pyarrow/tests/test_plasma_tf_op.py b/python/pyarrow/tests/test_plasma_tf_op.py
index b7e1afa8833ae..d9bf915d663aa 100644
--- a/python/pyarrow/tests/test_plasma_tf_op.py
+++ b/python/pyarrow/tests/test_plasma_tf_op.py
@@ -70,7 +70,6 @@ def FromPlasma():
     # Try getting the data from Python
     plasma_object_id = plasma.ObjectID(object_id)
     obj = client.get(plasma_object_id)
-    obj = obj.to_numpy()
 
     # Deserialized Tensor should be 64-byte aligned.
     assert obj.ctypes.data % 64 == 0
@@ -100,3 +99,7 @@ def test_plasma_tf_op(use_gpu=False):
                   np.int8, np.int16, np.int32, np.int64]:
         run_tensorflow_test_with_dtype(tf, plasma, plasma_store_name,
                                        client, use_gpu, dtype)
+
+    # Make sure the objects have been released.
+    for _, info in client.list().items():
+        assert info['ref_count'] == 0
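Reviewer note, not part of the patch: the new ref_count == 0 assertion in the test only holds because both ops now call Release() after Seal() (and connect with a release delay of 0). A condensed sketch of that lifecycle from the C++ client's point of view follows; the socket path, object id, and helper name PutAndRelease are illustrative assumptions.

// Sketch only: create, seal and immediately release a Plasma object so the
// store-side ref_count drops back to zero once the writer is done.
#include <memory>
#include <string>

#include "arrow/buffer.h"
#include "arrow/status.h"
#include "plasma/client.h"

arrow::Status PutAndRelease(const std::string& store_socket,
                            const plasma::ObjectID& object_id,
                            int64_t data_size) {
  plasma::PlasmaClient client;
  // Empty manager socket and a release delay of 0, matching the Connect()
  // change in plasma_op.cc above.
  RETURN_NOT_OK(client.Connect(store_socket, "", 0));

  std::shared_ptr<arrow::Buffer> data;
  RETURN_NOT_OK(client.Create(object_id, data_size, /*metadata=*/nullptr,
                              /*metadata_size=*/0, &data));
  // ... fill data->mutable_data() with the payload here ...

  RETURN_NOT_OK(client.Seal(object_id));
  // Drop this client's reference so the store reports ref_count == 0,
  // as the Python test above asserts via client.list().
  RETURN_NOT_OK(client.Release(object_id));
  return client.Disconnect();
}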