From 562b86132a8e8e8464ad464b4cec1ec1c4c4474f Mon Sep 17 00:00:00 2001 From: David Li Date: Fri, 1 Feb 2019 15:06:35 -0500 Subject: [PATCH] Factor out FlightData->Message conversion --- cpp/src/arrow/flight/internal.cc | 16 ++++++++++++++++ cpp/src/arrow/flight/internal.h | 3 +++ cpp/src/arrow/flight/server.cc | 10 +++------- 3 files changed, 22 insertions(+), 7 deletions(-) diff --git a/cpp/src/arrow/flight/internal.cc b/cpp/src/arrow/flight/internal.cc index 629796ea36850..b614dd5b3ffc0 100644 --- a/cpp/src/arrow/flight/internal.cc +++ b/cpp/src/arrow/flight/internal.cc @@ -131,6 +131,22 @@ void ToProto(const Ticket& ticket, pb::Ticket* pb_ticket) { pb_ticket->set_ticket(ticket.ticket); } +// FlightData + +Status FromProto(const pb::FlightData& pb_data, + FlightDescriptor* descriptor, + std::unique_ptr* message) { + RETURN_NOT_OK(internal::FromProto(pb_data.flight_descriptor(), descriptor)); + const std::string& header = pb_data.data_header(); + const std::string& body = pb_data.data_body(); + std::shared_ptr header_buf = Buffer::Wrap(header.data(), header.size()); + std::shared_ptr body_buf = Buffer::Wrap(body.data(), body.size()); + if (header_buf == nullptr || body_buf == nullptr) { + return Status::UnknownError("Could not create buffers from protobuf"); + } + return ipc::Message::Open(header_buf, body_buf, message); +} + // FlightEndpoint Status FromProto(const pb::FlightEndpoint& pb_endpoint, FlightEndpoint* endpoint) { diff --git a/cpp/src/arrow/flight/internal.h b/cpp/src/arrow/flight/internal.h index bae1eedfa9c66..a4bafd2693df9 100644 --- a/cpp/src/arrow/flight/internal.h +++ b/cpp/src/arrow/flight/internal.h @@ -57,6 +57,9 @@ Status FromProto(const pb::Result& pb_result, Result* result); Status FromProto(const pb::Criteria& pb_criteria, Criteria* criteria); Status FromProto(const pb::Location& pb_location, Location* location); Status FromProto(const pb::Ticket& pb_ticket, Ticket* ticket); +Status FromProto(const pb::FlightData& pb_data, + FlightDescriptor* descriptor, + std::unique_ptr* message); Status FromProto(const pb::FlightDescriptor& pb_descr, FlightDescriptor* descr); Status FromProto(const pb::FlightEndpoint& pb_endpoint, FlightEndpoint* endpoint); Status FromProto(const pb::FlightGetInfo& pb_info, FlightInfo::Data* info); diff --git a/cpp/src/arrow/flight/server.cc b/cpp/src/arrow/flight/server.cc index 1d5b215c8ead3..bd01e9ab7d9a8 100644 --- a/cpp/src/arrow/flight/server.cc +++ b/cpp/src/arrow/flight/server.cc @@ -213,19 +213,15 @@ class FlightServiceImpl : public FlightService::Service { pb::FlightData data; if (reader->Read(&data)) { FlightDescriptor descriptor; + // Message only lives as long as data std::unique_ptr message; - std::shared_ptr schema; - GRPC_RETURN_NOT_OK(internal::FromProto(data.flight_descriptor(), &descriptor)); - std::shared_ptr header_buf; - std::shared_ptr body_buf; - GRPC_RETURN_NOT_OK(Buffer::FromString(data.data_header(), &header_buf)); - GRPC_RETURN_NOT_OK(Buffer::FromString(data.data_body(), &body_buf)); - GRPC_RETURN_NOT_OK(ipc::Message::Open(header_buf, body_buf, &message)); + GRPC_RETURN_NOT_OK(internal::FromProto(data, &descriptor, &message)); if (!message || message->type() != ipc::Message::Type::SCHEMA) { return internal::ToGrpcStatus( Status(StatusCode::Invalid, "DoPut must start with schema/descriptor")); } else { + std::shared_ptr schema; GRPC_RETURN_NOT_OK(ipc::ReadSchema(*message, &schema)); auto message_reader = std::unique_ptr(