Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ARROW-3289: [C++] Implement Flight DoPut #3524

Closed
wants to merge 20 commits into from
Closed

ARROW-3289: [C++] Implement Flight DoPut #3524

wants to merge 20 commits into from

Conversation

ghost
Copy link

@ghost ghost commented Jan 29, 2019

Implements server/client side DoPut in C++ and extends the integration tests to exercise this.

We may want a different API for client-side DoPut that exposes any potential server response; I made it give the client a RecordBatchWriter to be symmetric with DoGet for now though.

@wesm
Copy link
Member

wesm commented Feb 1, 2019

I am headed out of town tomorrow so I may not be able to get to this until Monday.

@ghost
Copy link
Author

ghost commented Feb 1, 2019

No worries! I have Python client bindings in the pipeline, might take a look at server as well in the meantime.

Copy link
Member

@wesm wesm left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This looks great. I left a number of comments, small things mostly

@jacques-n can you take a glance at the Java? It looked reasonable at first glance but I don't know the async stuff well enough to review

I am wondering out loud how we want to handle asynchronous RPC in C++. I can see an argument for maintaining a set of synchronous APIs, but we might want to implement versions like

AsyncResult<shared_ptr<RecordBatch>> ReadNext();

Then AsyncResult has both a Status (to return failures) and the returned Status (for errors). cc @pitrou in case any thoughts about this. We really haven't done much async in this project yet so it may require some time to think through what is a good approach.

@@ -227,7 +77,9 @@ class FlightStreamReader : public RecordBatchReader {
// For customizing read path for better memory/serialization efficiency
auto custom_reader = reinterpret_cast<grpc::ClientReader<FlightData>*>(stream_.get());

if (custom_reader->Read(&data)) {
// Explicitly specify the override to invoke - otherwise compiler
// may invoke through vtable (not updated by reinterpret_cast)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's interesting. Which compiler did you observe this on? I didn't think this was a virtual method

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

$ clang++ --version
Apple LLVM version 9.0.0 (clang-900.0.39.2)
Target: x86_64-apple-darwin16.7.0
Thread model: posix
InstalledDir: /Library/Developer/CommandLineTools/usr/bin

It's inconsistent - the calls you originally had were done statically, but when I added another instance for DoPut, Clang opted for a virtual call, judging from the disassembly.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is it really important to avoid a virtual call here? We're probably reading a non-trivial amount of data. If gRPC defined that method as virtual, we should probably not devirtualize it like.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Unfortunately this is a correctness issue; the reader is a generic class instantiated for the Protobuf-generated struct, and uses the Protobuf serializer/deserializer. We'd like to force it to instead user our serializer. The original implementation relied on using reinterpret_cast to change the template parameter, and then hoping the compiler generated the correct static call. I found that the compiler did not always do this, and would fall back to the Protobuf serialization path. This change is intended to force it not to do this.

This seems like undefined behavior to me, and it might be worthwhile to find a better way, but comments by Wes in the code indicate that gRPC doesn't expose a good way to do this.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm... What's the difference between pb::FlightData and FlightData?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Or if that happens, it means the reinterpret_cast is just illegal :-(

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, that's why I think it's undefined behavior - it's not a legal reinterpret_cast.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

pb::FlightData is the Protobuf-generated struct, FlightData is our own struct (in serialization-internal.h)

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It would be nice to add a comment warning about the undefined behavior - so that this doesn't get lost then rediscovered.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've added a comment before the four uses of reinterpret_cast.

cpp/src/arrow/flight/client.cc Outdated Show resolved Hide resolved
cpp/src/arrow/flight/client.cc Outdated Show resolved Hide resolved
cpp/src/arrow/flight/client.cc Outdated Show resolved Hide resolved
writer_ = std::move(writer);
}

// TODO: there isn't a way to access this as a user.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As I was reading this I was thinking about whether returning arrow::ipc::RecordBatchWriter is the best thing. If we need other kinds of methods related to acks or RPC-specific details, it might be worth defining an Flight-specific class that is returned from these methods

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I was debating this as well. It should at least be easy to wrap a custom interface in RecordBatchWriter, so I guess it's better to leave this open for future extensions.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I turned this into a pImpl, the wrapper doesn't expose this still, but leaves it open for the future. The wrapper is still a RecordBatchWriter; should it be further decoupled?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For now I think we can keep it as a RecordBatchWriter, especially as the base class is just an interface.
Furthermore, we probably shouldn't consider the Flight API stable until it's battle-tested.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've changed it back to RecordBatchWriter.

// message sent, and message failed to deserialize. IMO, we
// should add logging around the Status returns in
// serialization-internal.h to make debugging such cases easier.
return grpc::Status::OK;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, though in this particular case the SerializationTraits customization isn't being invoked unless I misread. Does gRPC provide any way to distinguish deserialization failures in its out-of-the-box protobuf API?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah, I think I changed it at some point, but I did run into the problem described before. I'll take a closer look at their APIs.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is where they use SerializationTraits:

https://github.com/grpc/grpc/blob/15bb25ae54c98c1a653281e18883487fd95d2bf0/include/grpcpp/impl/codegen/call_op_set.h#L458-L460

Not getting a message is the same as getting an invalid message, so there's no way for us to tell from the API. I'll add logging statements.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm, can you explain why you return OK here? If the client doesn't send a first message, then probably it should be considered an IPC failure?
(for example, the client might have called WritesDone but still be available for receiving a response... am I misunderstanding this?)

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm, that's fair. I'll change it to gRPC INVALID_ARGUMENT.

cpp/src/arrow/flight/test-integration-client.cc Outdated Show resolved Hide resolved
cpp/src/arrow/flight/test-integration-server.cc Outdated Show resolved Hide resolved
cpp/src/arrow/flight/test-integration-server.cc Outdated Show resolved Hide resolved
@wesm
Copy link
Member

wesm commented Feb 4, 2019

Any more work you want to do here or should I review / merge this?

'-DgRPC_ZLIB_PROVIDER=package'
'-DCMAKE_CXX_FLAGS=${EP_CXX_FLAGS}'
'-DCMAKE_C_FLAGS=${EP_C_FLAGS}'
'-DCMAKE_INSTALL_PREFIX=${GRPC_PREFIX}'
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hm I wonder if this works on Windows.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah that might not. On the other hand, on MacOS the double quotes got included in gRPC's path (so it tried to find /path/to/grpc").

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These single quotes break the EP build for me on Ubuntu 14.04. I'll try removing the quotes

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Removing them seems OK. I'll test on Windows soon

@ghost
Copy link
Author

ghost commented Feb 4, 2019

I think this is ready, if you're happy with FlightPutWriter to implement DoPut.

@pitrou
Copy link
Member

pitrou commented Feb 4, 2019

@wesm I'll review this as well.

@wesm
Copy link
Member

wesm commented Feb 4, 2019

OK, I will defer to you since you're getting up to speed on Flight and I already reviewed this

Copy link
Member

@pitrou pitrou left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I have some comments and questions which might mostly be due to me not fully understanding this yet :-)

@@ -227,7 +77,9 @@ class FlightStreamReader : public RecordBatchReader {
// For customizing read path for better memory/serialization efficiency
auto custom_reader = reinterpret_cast<grpc::ClientReader<FlightData>*>(stream_.get());

if (custom_reader->Read(&data)) {
// Explicitly specify the override to invoke - otherwise compiler
// may invoke through vtable (not updated by reinterpret_cast)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is it really important to avoid a virtual call here? We're probably reading a non-trivial amount of data. If gRPC defined that method as virtual, we should probably not devirtualize it like.

@@ -211,7 +211,8 @@ public PutObserver(VectorUnloader unloader, ClientCallStreamObserver<ArrowMessag
@Override
public void putNext() {
ArrowRecordBatch batch = unloader.getRecordBatch();
while (!observer.isReady()) {
// Check the futureResult in case server sent an exception
while (!observer.isReady() && !futureResult.isDone()) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm curious: is it customary to do such busy wait in Java?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There is a callback available in this case, though it would complicate the code quite a bit. Let me take a stab at it...

cpp/src/arrow/ipc/json.h Outdated Show resolved Hide resolved
writer_ = std::move(writer);
}

// TODO: there isn't a way to access this as a user.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For now I think we can keep it as a RecordBatchWriter, especially as the base class is just an interface.
Furthermore, we probably shouldn't consider the Flight API stable until it's battle-tested.

cpp/src/arrow/flight/client.cc Outdated Show resolved Hide resolved
cpp/src/arrow/flight/client.cc Show resolved Hide resolved
cpp/src/arrow/flight/serialization-internal.h Outdated Show resolved Hide resolved
// message sent, and message failed to deserialize. IMO, we
// should add logging around the Status returns in
// serialization-internal.h to make debugging such cases easier.
return grpc::Status::OK;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm, can you explain why you return OK here? If the client doesn't send a first message, then probably it should be considered an IPC failure?
(for example, the client might have called WritesDone but still be available for receiving a response... am I misunderstanding this?)

@pitrou
Copy link
Member

pitrou commented Feb 4, 2019

@wesm

I am wondering out loud how we want to handle asynchronous RPC in C++. I can see an argument for maintaining a set of synchronous APIs, but we might want to implement versions like

AsyncResult<shared_ptr<RecordBatch>> ReadNext();

Then AsyncResult has both a Status (to return failures) and the returned Status (for errors).

I'm not sure why the two status codes. Also, instead of having our own AsyncResult, we could perhaps just use std::future.

@wesm
Copy link
Member

wesm commented Feb 4, 2019

we could perhaps just use std::future

It's outside of the scope of this discussion but you need a mechanism to encapsulate both multiple return values as well as the error (if any). So you could do something like

std::future<Result<T>> where Result<T, ...> is an object that can capture multiple output types and a Status. You can also use std::pair<Status, T> if you want of course. We'll want to do async before too long to make sure we are keeping the network as busy as possible, so we can resolve it then

@ghost
Copy link
Author

ghost commented Feb 5, 2019

I believe I've addressed everything now, except for the Java busy-wait and the CMake windows build. Let's see how AppVeyor goes.

@pitrou
Copy link
Member

pitrou commented Feb 5, 2019

Thanks!

@pitrou
Copy link
Member

pitrou commented Feb 5, 2019

I've started an AppVeyor build on my account here: https://ci.appveyor.com/project/pitrou/arrow/builds/22141649

@codecov-io
Copy link

Codecov Report

Merging #3524 into master will increase coverage by 0.87%.
The diff coverage is 0%.

Impacted file tree graph

@@            Coverage Diff             @@
##           master    #3524      +/-   ##
==========================================
+ Coverage    87.8%   88.68%   +0.87%     
==========================================
  Files         666      537     -129     
  Lines       82365    73404    -8961     
  Branches     1069        0    -1069     
==========================================
- Hits        72323    65098    -7225     
+ Misses       9927     8306    -1621     
+ Partials      115        0     -115
Impacted Files Coverage Δ
cpp/src/arrow/ipc/json.cc 89.85% <0%> (-8.56%) ⬇️
go/arrow/math/uint64_amd64.go
go/arrow/memory/memory_avx2_amd64.go
js/src/enum.ts
go/arrow/array/builder.go
js/src/Arrow.node.ts
js/src/schema.ts
go/arrow/type_traits_boolean.go
js/src/ipc/node/writer.ts
js/src/visitor/vectorloader.ts
... and 122 more

Continue to review full report at Codecov.

Legend - Click here to learn more
Δ = absolute <relative> (impact), ø = not affected, ? = missing data
Powered by Codecov. Last update e533a9e...13fb29a. Read the comment docs.

@ghost
Copy link
Author

ghost commented Feb 5, 2019

For some reason, I can't see the AppVeyor build anymore, though I was able to a little while ago.

For the Java blocking, do you mind if I investigate that in a follow up? I think it's a non-trivial change, especially as we might want to consider making putNext itself non-blocking (but still exposing some sort of flow control so that a server does not experience unbounded memory usage with a slow client).

@pitrou
Copy link
Member

pitrou commented Feb 5, 2019

I had to restart it because of a startup failure, so it got a new number:
https://ci.appveyor.com/project/pitrou/arrow/builds/22142596

@pitrou
Copy link
Member

pitrou commented Feb 5, 2019

For the Java blocking, do you mind if I investigate that in a follow up?

No, that's entirely fine.

@ghost
Copy link
Author

ghost commented Feb 5, 2019

Awesome, thank you. I'll file a JIRA for it so I don't lose track.

@pitrou
Copy link
Member

pitrou commented Feb 5, 2019

but still exposing some sort of flow control so that a server does not experience unbounded memory usage with a slow client

I would hope grpc helps to deal with that? The IO stack is where the send buffer size is known.

@ghost
Copy link
Author

ghost commented Feb 5, 2019

Heh...unfortunately gRPC-java will buffer outgoing messages unboundedly. You have to check isReady/wrangle callbacks yourself if you want to limit yourself.

@ghost
Copy link
Author

ghost commented Feb 5, 2019

If you're curious, I did dig it up in the source once, I could find it again...

@pitrou
Copy link
Member

pitrou commented Feb 5, 2019

A possible improvement would be to have a semi-busy wait, with progressively growing sleep times, to balance responsivity and CPU consumption.

@ghost
Copy link
Author

ghost commented Feb 5, 2019

I filed ARROW-4484. Backing off on the wait is probably a good compromise.

@pitrou
Copy link
Member

pitrou commented Feb 5, 2019

Build passed. I'm gonna merge. Thank you @lihalite !

@pitrou pitrou closed this in 4004b72 Feb 5, 2019
@ghost
Copy link
Author

ghost commented Feb 5, 2019

Awesome, thank you and @wesm for the review! I'll get the Python stuff up soon.

xhochy pushed a commit that referenced this pull request Feb 8, 2019
Implements server/client side DoPut in C++ and extends the integration tests to exercise this.

We may want a different API for client-side DoPut that exposes any potential server response; I made it give the client a RecordBatchWriter to be symmetric with DoGet for now though.

Author: David Li <[email protected]>

Closes #3524 from lihalite/arrow-3289 and squashes the following commits:

13fb29a <David Li> Document why VectorUnloader must align batches in Flight
f32c0b2 <David Li> Indicate error to client in DoPut if no message sent
cd56782 <David Li> Warn about undefined behavior in Flight source
1f816e8 <David Li> Move serialization helpers out of gRPC namespace
21b315a <David Li> Hide FlightPutWriter from public interface for now
6edf2e2 <David Li> Introduce FlightPutWriter
58d6936 <David Li> Enable building with non-CMake c-ares
cfa4ca5 <David Li> Properly quote arguments to gRPC CMake build
302dd33 <David Li> Explicitly link Protobuf for Flight
419ad68 <David Li> Log (de)serialization failures in Flight fast-path
562b861 <David Li> Factor out FlightData->Message conversion
65d6ba2 <David Li> Clean up C++ Flight integration client
3e185cb <David Li>  Add convenience to parse JSON from file
111b3e6 <David Li> Fix style/lint issues
3cb51ba <David Li> Test all returned locations in Flight integration tests
905ef38 <David Li> Implement C++ Flight DoPut
138141f <David Li> Fix FromProto for FlightDescriptor
a11a5ac <David Li> Don't hang in Flight DoPut if server sends exception
b3ac01a <David Li> Align RecordBatch on client side in Flight DoPut
846df73 <David Li> Implement put in Java Flight integration server
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants