Skip to content

Commit

Permalink
ARROW-3920: [plasma] Fix reference counting in custom tensorflow plas…
Browse files Browse the repository at this point in the history
…ma operator.

There is an issue here where `Release` was never being called in the plasma TF operator.

Note that I also changed the release delay in the plasma operator to 0.

Author: Robert Nishihara <[email protected]>
Author: Philipp Moritz <[email protected]>

Closes apache#3061 from robertnishihara/extrareleaseinplasmaop and squashes the following commits:

c109566 <Philipp Moritz> add include guards
f89d5df <Philipp Moritz> lint
4836342 <Philipp Moritz> unregister memory
e3b3864 <Robert Nishihara> Linting
b948ce0 <Robert Nishihara> Add test.
75f2bd9 <Robert Nishihara> Remove logging statement.
f04a7d2 <Robert Nishihara> Fix
574c035 <Robert Nishihara> Fix ndarray/tensor confusion in plasma op.
06985cd <Robert Nishihara> Have plasma op deserialize as numpy array.
a2a9c36 <Robert Nishihara> Add release call into wrapped_callback.
0db9154 <Robert Nishihara> Change release delay to 0.
f434094 <Robert Nishihara> Add Release call in plasma op.
  • Loading branch information
robertnishihara authored and pcmoritz committed Dec 1, 2018
1 parent 2bc4d95 commit a667fca
Show file tree
Hide file tree
Showing 6 changed files with 58 additions and 38 deletions.
17 changes: 9 additions & 8 deletions cpp/src/arrow/python/deserialize.cc
Original file line number Diff line number Diff line change
Expand Up @@ -361,7 +361,7 @@ Status GetSerializedFromComponents(int num_tensors, int num_ndarrays, int num_bu

ipc::Message message(metadata, body);

RETURN_NOT_OK(ReadTensor(message, &tensor));
RETURN_NOT_OK(ipc::ReadTensor(message, &tensor));
out->tensors.emplace_back(std::move(tensor));
}

Expand All @@ -375,7 +375,7 @@ Status GetSerializedFromComponents(int num_tensors, int num_ndarrays, int num_bu

ipc::Message message(metadata, body);

RETURN_NOT_OK(ReadTensor(message, &tensor));
RETURN_NOT_OK(ipc::ReadTensor(message, &tensor));
out->ndarrays.emplace_back(std::move(tensor));
}

Expand All @@ -389,19 +389,20 @@ Status GetSerializedFromComponents(int num_tensors, int num_ndarrays, int num_bu
return Status::OK();
}

Status DeserializeTensor(const SerializedPyObject& object, std::shared_ptr<Tensor>* out) {
if (object.tensors.size() != 1) {
return Status::Invalid("Object is not a Tensor");
Status DeserializeNdarray(const SerializedPyObject& object,
std::shared_ptr<Tensor>* out) {
if (object.ndarrays.size() != 1) {
return Status::Invalid("Object is not an Ndarray");
}
*out = object.tensors[0];
*out = object.ndarrays[0];
return Status::OK();
}

Status ReadTensor(std::shared_ptr<Buffer> src, std::shared_ptr<Tensor>* out) {
Status NdarrayFromBuffer(std::shared_ptr<Buffer> src, std::shared_ptr<Tensor>* out) {
io::BufferReader reader(src);
SerializedPyObject object;
RETURN_NOT_OK(ReadSerializedObject(&reader, &object));
return DeserializeTensor(object, out);
return DeserializeNdarray(object, out);
}

} // namespace py
Expand Down
6 changes: 3 additions & 3 deletions cpp/src/arrow/python/deserialize.h
Original file line number Diff line number Diff line change
Expand Up @@ -76,15 +76,15 @@ ARROW_EXPORT
Status DeserializeObject(PyObject* context, const SerializedPyObject& object,
PyObject* base, PyObject** out);

/// \brief Reconstruct Tensor from Arrow-serialized representation
/// \brief Reconstruct Ndarray from Arrow-serialized representation
/// \param[in] object Object to deserialize
/// \param[out] out The deserialized tensor
/// \return Status
ARROW_EXPORT
Status DeserializeTensor(const SerializedPyObject& object, std::shared_ptr<Tensor>* out);
Status DeserializeNdarray(const SerializedPyObject& object, std::shared_ptr<Tensor>* out);

ARROW_EXPORT
Status ReadTensor(std::shared_ptr<Buffer> src, std::shared_ptr<Tensor>* out);
Status NdarrayFromBuffer(std::shared_ptr<Buffer> src, std::shared_ptr<Tensor>* out);

} // namespace py
} // namespace arrow
Expand Down
14 changes: 7 additions & 7 deletions cpp/src/arrow/python/serialize.cc
Original file line number Diff line number Diff line change
Expand Up @@ -752,23 +752,23 @@ Status SerializeObject(PyObject* context, PyObject* sequence, SerializedPyObject
return Status::OK();
}

Status SerializeTensor(std::shared_ptr<Tensor> tensor, SerializedPyObject* out) {
Status SerializeNdarray(std::shared_ptr<Tensor> tensor, SerializedPyObject* out) {
std::shared_ptr<Array> array;
SequenceBuilder builder;
RETURN_NOT_OK(builder.AppendTensor(static_cast<int32_t>(out->tensors.size())));
out->tensors.push_back(tensor);
RETURN_NOT_OK(builder.AppendNdarray(static_cast<int32_t>(out->ndarrays.size())));
out->ndarrays.push_back(tensor);
RETURN_NOT_OK(builder.Finish(nullptr, nullptr, nullptr, nullptr, &array));
out->batch = MakeBatch(array);
return Status::OK();
}

Status WriteTensorHeader(std::shared_ptr<DataType> dtype,
const std::vector<int64_t>& shape, int64_t tensor_num_bytes,
io::OutputStream* dst) {
Status WriteNdarrayHeader(std::shared_ptr<DataType> dtype,
const std::vector<int64_t>& shape, int64_t tensor_num_bytes,
io::OutputStream* dst) {
auto empty_tensor = std::make_shared<Tensor>(
dtype, std::make_shared<Buffer>(nullptr, tensor_num_bytes), shape);
SerializedPyObject serialized_tensor;
RETURN_NOT_OK(SerializeTensor(empty_tensor, &serialized_tensor));
RETURN_NOT_OK(SerializeNdarray(empty_tensor, &serialized_tensor));
return serialized_tensor.WriteTo(dst);
}

Expand Down
6 changes: 3 additions & 3 deletions cpp/src/arrow/python/serialize.h
Original file line number Diff line number Diff line change
Expand Up @@ -103,9 +103,9 @@ Status SerializeTensor(std::shared_ptr<Tensor> tensor, py::SerializedPyObject* o
/// \param[in] dst The OutputStream to write the Tensor header to
/// \return Status
ARROW_EXPORT
Status WriteTensorHeader(std::shared_ptr<DataType> dtype,
const std::vector<int64_t>& shape, int64_t tensor_num_bytes,
io::OutputStream* dst);
Status WriteNdarrayHeader(std::shared_ptr<DataType> dtype,
const std::vector<int64_t>& shape, int64_t tensor_num_bytes,
io::OutputStream* dst);

} // namespace py

Expand Down
48 changes: 32 additions & 16 deletions python/pyarrow/tensorflow/plasma_op.cc
Original file line number Diff line number Diff line change
Expand Up @@ -77,8 +77,7 @@ class TensorToPlasmaOp : public tf::AsyncOpKernel {
if (!connected_) {
VLOG(1) << "Connecting to Plasma...";
ARROW_CHECK_OK(client_.Connect(plasma_store_socket_name_,
plasma_manager_socket_name_,
plasma::kPlasmaDefaultReleaseDelay));
plasma_manager_socket_name_, 0));
VLOG(1) << "Connected!";
connected_ = true;
}
Expand Down Expand Up @@ -141,7 +140,7 @@ class TensorToPlasmaOp : public tf::AsyncOpKernel {
std::vector<int64_t> shape = {total_bytes / byte_width};

arrow::io::MockOutputStream mock;
ARROW_CHECK_OK(arrow::py::WriteTensorHeader(arrow_dtype, shape, 0, &mock));
ARROW_CHECK_OK(arrow::py::WriteNdarrayHeader(arrow_dtype, shape, 0, &mock));
int64_t header_size = mock.GetExtentBytesWritten();

std::shared_ptr<Buffer> data_buffer;
Expand All @@ -153,15 +152,21 @@ class TensorToPlasmaOp : public tf::AsyncOpKernel {

int64_t offset;
arrow::io::FixedSizeBufferWriter buf(data_buffer);
ARROW_CHECK_OK(arrow::py::WriteTensorHeader(arrow_dtype, shape, total_bytes, &buf));
ARROW_CHECK_OK(arrow::py::WriteNdarrayHeader(arrow_dtype, shape, total_bytes, &buf));
ARROW_CHECK_OK(buf.Tell(&offset));

uint8_t* data = reinterpret_cast<uint8_t*>(data_buffer->mutable_data() + offset);

auto wrapped_callback = [this, context, done, data_buffer, object_id]() {
auto wrapped_callback = [this, context, done, data_buffer, data, object_id]() {
{
tf::mutex_lock lock(mu_);
ARROW_CHECK_OK(client_.Seal(object_id));
ARROW_CHECK_OK(client_.Release(object_id));
#ifdef GOOGLE_CUDA
auto orig_stream = context->op_device_context()->stream();
auto stream_executor = orig_stream->parent();
CHECK(stream_executor->HostMemoryUnregister(static_cast<void*>(data)));
#endif
}
context->SetStatus(tensorflow::Status::OK());
done();
Expand Down Expand Up @@ -244,8 +249,7 @@ class PlasmaToTensorOp : public tf::AsyncOpKernel {
if (!connected_) {
VLOG(1) << "Connecting to Plasma...";
ARROW_CHECK_OK(client_.Connect(plasma_store_socket_name_,
plasma_manager_socket_name_,
plasma::kPlasmaDefaultReleaseDelay));
plasma_manager_socket_name_, 0));
VLOG(1) << "Connected!";
connected_ = true;
}
Expand Down Expand Up @@ -284,25 +288,39 @@ class PlasmaToTensorOp : public tf::AsyncOpKernel {
/*timeout_ms=*/-1, &object_buffer));
}

std::shared_ptr<arrow::Tensor> tensor;
ARROW_CHECK_OK(arrow::py::ReadTensor(object_buffer.data, &tensor));
std::shared_ptr<arrow::Tensor> ndarray;
ARROW_CHECK_OK(arrow::py::NdarrayFromBuffer(object_buffer.data, &ndarray));

int64_t byte_width = get_byte_width(*tensor->type());
const int64_t size_in_bytes = tensor->data()->size();
int64_t byte_width = get_byte_width(*ndarray->type());
const int64_t size_in_bytes = ndarray->data()->size();

tf::TensorShape shape({static_cast<int64_t>(size_in_bytes / byte_width)});

const float* plasma_data = reinterpret_cast<const float*>(tensor->raw_data());
const float* plasma_data = reinterpret_cast<const float*>(ndarray->raw_data());

tf::Tensor* output_tensor = nullptr;
OP_REQUIRES_OK_ASYNC(context, context->allocate_output(0, shape, &output_tensor),
done);

auto wrapped_callback = [this, context, done, plasma_data, object_id]() {
{
tf::mutex_lock lock(mu_);
ARROW_CHECK_OK(client_.Release(object_id));
#ifdef GOOGLE_CUDA
auto orig_stream = context->op_device_context()->stream();
auto stream_executor = orig_stream->parent();
CHECK(stream_executor->HostMemoryUnregister(
const_cast<void*>(static_cast<const void*>(plasma_data))));
#endif
}
done();
};

if (std::is_same<Device, CPUDevice>::value) {
std::memcpy(
reinterpret_cast<void*>(const_cast<char*>(output_tensor->tensor_data().data())),
plasma_data, size_in_bytes);
done();
wrapped_callback();
} else {
#ifdef GOOGLE_CUDA
auto orig_stream = context->op_device_context()->stream();
Expand All @@ -319,8 +337,6 @@ class PlasmaToTensorOp : public tf::AsyncOpKernel {
}

// Important. See note in T2P op.
// We don't check the return status since the host memory might've been
// already registered (e.g., the TensorToPlasmaOp might've been run).
CHECK(stream_executor->HostMemoryRegister(
const_cast<void*>(static_cast<const void*>(plasma_data)),
static_cast<tf::uint64>(size_in_bytes)));
Expand All @@ -341,7 +357,7 @@ class PlasmaToTensorOp : public tf::AsyncOpKernel {
CHECK(orig_stream->ThenWaitFor(h2d_stream).ok());

context->device()->tensorflow_gpu_device_info()->event_mgr->ThenExecute(
h2d_stream, std::move(done));
h2d_stream, std::move(wrapped_callback));
#endif
}
}
Expand Down
5 changes: 4 additions & 1 deletion python/pyarrow/tests/test_plasma_tf_op.py
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,6 @@ def FromPlasma():
# Try getting the data from Python
plasma_object_id = plasma.ObjectID(object_id)
obj = client.get(plasma_object_id)
obj = obj.to_numpy()

# Deserialized Tensor should be 64-byte aligned.
assert obj.ctypes.data % 64 == 0
Expand Down Expand Up @@ -100,3 +99,7 @@ def test_plasma_tf_op(use_gpu=False):
np.int8, np.int16, np.int32, np.int64]:
run_tensorflow_test_with_dtype(tf, plasma, plasma_store_name,
client, use_gpu, dtype)

# Make sure the objects have been released.
for _, info in client.list().items():
assert info['ref_count'] == 0

0 comments on commit a667fca

Please sign in to comment.