Skip to content

Commit

Permalink
Factor out FlightData->Message conversion
Browse files Browse the repository at this point in the history
  • Loading branch information
David Li authored and David Li committed Feb 5, 2019
1 parent 65d6ba2 commit 562b861
Show file tree
Hide file tree
Showing 3 changed files with 22 additions and 7 deletions.
16 changes: 16 additions & 0 deletions cpp/src/arrow/flight/internal.cc
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,22 @@ void ToProto(const Ticket& ticket, pb::Ticket* pb_ticket) {
pb_ticket->set_ticket(ticket.ticket);
}

// FlightData

Status FromProto(const pb::FlightData& pb_data,
FlightDescriptor* descriptor,
std::unique_ptr<ipc::Message>* message) {
RETURN_NOT_OK(internal::FromProto(pb_data.flight_descriptor(), descriptor));
const std::string& header = pb_data.data_header();
const std::string& body = pb_data.data_body();
std::shared_ptr<Buffer> header_buf = Buffer::Wrap(header.data(), header.size());
std::shared_ptr<Buffer> body_buf = Buffer::Wrap(body.data(), body.size());
if (header_buf == nullptr || body_buf == nullptr) {
return Status::UnknownError("Could not create buffers from protobuf");
}
return ipc::Message::Open(header_buf, body_buf, message);
}

// FlightEndpoint

Status FromProto(const pb::FlightEndpoint& pb_endpoint, FlightEndpoint* endpoint) {
Expand Down
3 changes: 3 additions & 0 deletions cpp/src/arrow/flight/internal.h
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,9 @@ Status FromProto(const pb::Result& pb_result, Result* result);
Status FromProto(const pb::Criteria& pb_criteria, Criteria* criteria);
Status FromProto(const pb::Location& pb_location, Location* location);
Status FromProto(const pb::Ticket& pb_ticket, Ticket* ticket);
Status FromProto(const pb::FlightData& pb_data,
FlightDescriptor* descriptor,
std::unique_ptr<ipc::Message>* message);
Status FromProto(const pb::FlightDescriptor& pb_descr, FlightDescriptor* descr);
Status FromProto(const pb::FlightEndpoint& pb_endpoint, FlightEndpoint* endpoint);
Status FromProto(const pb::FlightGetInfo& pb_info, FlightInfo::Data* info);
Expand Down
10 changes: 3 additions & 7 deletions cpp/src/arrow/flight/server.cc
Original file line number Diff line number Diff line change
Expand Up @@ -213,19 +213,15 @@ class FlightServiceImpl : public FlightService::Service {
pb::FlightData data;
if (reader->Read(&data)) {
FlightDescriptor descriptor;
// Message only lives as long as data
std::unique_ptr<ipc::Message> message;
std::shared_ptr<Schema> schema;
GRPC_RETURN_NOT_OK(internal::FromProto(data.flight_descriptor(), &descriptor));
std::shared_ptr<Buffer> header_buf;
std::shared_ptr<Buffer> body_buf;
GRPC_RETURN_NOT_OK(Buffer::FromString(data.data_header(), &header_buf));
GRPC_RETURN_NOT_OK(Buffer::FromString(data.data_body(), &body_buf));
GRPC_RETURN_NOT_OK(ipc::Message::Open(header_buf, body_buf, &message));
GRPC_RETURN_NOT_OK(internal::FromProto(data, &descriptor, &message));

if (!message || message->type() != ipc::Message::Type::SCHEMA) {
return internal::ToGrpcStatus(
Status(StatusCode::Invalid, "DoPut must start with schema/descriptor"));
} else {
std::shared_ptr<Schema> schema;
GRPC_RETURN_NOT_OK(ipc::ReadSchema(*message, &schema));

auto message_reader = std::unique_ptr<FlightMessageReader>(
Expand Down

0 comments on commit 562b861

Please sign in to comment.