ARROW-4626: [Flight] Add application-defined metadata to DoGet/DoPut #4282
Thanks for doing this! Here are some comments on the C++/Python side.
.. specific language governing permissions and limitations
.. under the License.

Arrow Flight RPC
Thanks for the doc additions!
Of course! We have teams that wanted fuller docs, so hope this is a reasonable starting point.
I'll try to leave some comments on this when I can -- I have been heads down on ARROW-3144 and haven't been doing code reviews for the last week+
@wesm no worries, this is also going to need rebasing once your PR lands.
I am going to close this until #4047 is merged just to keep the PR queue smaller, as this will need a bunch of rebasing.
Reopening now that #4047 is merged. This also does half of https://issues.apache.org/jira/browse/ARROW-5143, enabling integration testing of (non-nested) dictionaries in Flight.
I noticed a couple more things. Are you waiting for a Java review as well?
Thanks for the feedback! Rebased & made fixes. @pitrou yes, I'm waiting for a Java review too.
There look to be some spurious CI failures, especially on Windows. I'll try one last time to get those to succeed...
MinGW builds are unfortunately broken currently.
Flight cancellation tests look flaky, disabling those in CI.
Okay, unit tests finally pass 😄 Just waiting on Java review, I believe.
Thanks @lidavidm! I reached out to @jacques-n to see when he can spend some time reviewing the Java side of this.
cpp/src/arrow/flight/client.cc
Outdated
      std::unique_ptr<grpc::ClientReader<pb::FlightData>> stream,
      std::unique_ptr<GrpcStreamReader>* out);
  std::shared_ptr<Schema> schema() const override;
  Status ReadNext(std::shared_ptr<RecordBatch>* out) override;
Sorry to phone in from the peanut gallery, but what do you think about introducing a result struct like
struct FlightStreamChunk {
  std::shared_ptr<RecordBatch> data;
  std::shared_ptr<Buffer> app_metadata;
};
Sure, that's a little more convenient for callers. We can't change the signature of ReadNext here though, only ReadWithMetadata.
Hmm, maybe it's not so important, so long as in Python it still exposes the utility methods people expect (we find it useful to call do_get().read_pandas(), as that's by far the common case). If we could expose both iterator-of-record-batch and iterator-of-chunk, that would be best (as not everyone will want the metadata).
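To make the "both iterators" idea concrete, here is a minimal standard-library Python sketch. It is not the pyarrow.flight API: SketchStreamReader, iter_chunks, iter_batches, and read_all are hypothetical names, and plain Python objects stand in for RecordBatch and Buffer.

```python
from dataclasses import dataclass
from typing import Iterator, List, Optional


@dataclass
class FlightStreamChunk:
    """One stream message: a batch plus optional application metadata."""
    data: object                   # stand-in for a RecordBatch
    app_metadata: Optional[bytes]  # stand-in for a Buffer; None if absent


class SketchStreamReader:
    """Hypothetical reader used only for illustration."""

    def __init__(self, chunks: List[FlightStreamChunk]):
        self._chunks = list(chunks)

    def iter_chunks(self) -> Iterator[FlightStreamChunk]:
        # Callers that care about metadata get the full chunk.
        return iter(self._chunks)

    def iter_batches(self) -> Iterator[object]:
        # Callers that only want data never see the metadata.
        return (chunk.data for chunk in self._chunks)

    def read_all(self) -> List[object]:
        # Convenience method in the spirit of read_pandas(): drain the data.
        return list(self.iter_batches())


reader = SketchStreamReader([
    FlightStreamChunk(data=[1, 2], app_metadata=b"first"),
    FlightStreamChunk(data=[3], app_metadata=None),
])
```

With this shape the chunk iterator is the single underlying source, and the batch-only view is a thin projection over it, so the common case stays one call.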
On the C++ side, having an intended API endpoint would probably be the easiest thing (since the metadata can just be ignored if the developer-user doesn't care), and we can make the Python API as ergonomic as desired (I presume -- it is our intent in fact -- that most consumers of the Python API are going to be sitting behind a front-end interface to pyarrow anyway).
Alright, I went ahead and changed it to use that interface; when the other PR is merged, we can then make it actually implement Iterator.
java/flight/src/main/java/org/apache/arrow/flight/FlightClient.java
Outdated
java/flight/src/main/java/org/apache/arrow/flight/FlightClient.java
Outdated
  private volatile VectorLoader loader;
  private volatile Throwable ex;
  private volatile FlightDescriptor descriptor;
  private volatile Schema schema;
  private volatile byte[] applicationMetadata = null;
Let's construct as a listener pattern instead. Assuming people only want the last one seems a bit specific to what must be your use case.
Okay. I initially thought it would make correlating data with the associated metadata hard (and in the protocol, they're sent together - the metadata is not out-of-band for the cases that FlightStream represents), but the Java APIs are not really designed around manipulating individual RecordBatches anyways, so a listener would make sense.
@jacques-n on second thought, I don't think a listener is appropriate - it complicates usage quite a bit, especially if you want to correlate a particular metadatum with a particular RecordBatch. The current code does guarantee that applicationMetadata is not updated except by a call to next, which is the same invariant as for the VectorSchemaRoot contained in FlightStream. So if you do want previous metadata messages, you can retain them yourself (as you would with a listener anyways).
Additionally, API design-wise, it's hard to decide how FlightProducer#acceptPut should register a listener with FlightStream.
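The invariant described above can be sketched in a few lines of standard-library Python. SketchFlightStream and its attributes are hypothetical stand-ins for the Java FlightStream, and resetting the metadata to None for a message that carries none is an assumption of this sketch, not something stated in the thread.

```python
class SketchFlightStream:
    """Hypothetical stand-in: data and metadata change only inside next()."""

    def __init__(self, messages):
        self._messages = iter(messages)
        self.root = None             # stand-in for the VectorSchemaRoot
        self.latest_metadata = None  # stand-in for applicationMetadata

    def next(self) -> bool:
        try:
            batch, metadata = next(self._messages)
        except StopIteration:
            return False
        # Both fields are updated together, and nowhere else.
        self.root = batch
        self.latest_metadata = metadata  # assumption: reset when absent
        return True


stream = SketchFlightStream([
    ("batch-0", b"m0"),
    ("batch-1", None),
    ("batch-2", b"m2"),
])

# The caller retains earlier metadata itself, paired with its batch.
paired = []
while stream.next():
    paired.append((stream.root, stream.latest_metadata))
```

Because both fields move in lockstep with next(), correlating a metadatum with its batch needs no extra bookkeeping beyond the loop itself.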
java/flight/src/main/java/org/apache/arrow/flight/NoOpFlightProducer.java
java/flight/src/main/java/org/apache/arrow/flight/PutResult.java
Outdated
.../flight/src/main/java/org/apache/arrow/flight/example/integration/IntegrationTestClient.java
Outdated
java/flight/src/test/java/org/apache/arrow/flight/TestBasicOperation.java
Outdated
Side note, I will bump the Netty dependency version... I think this is the source of Netty failing to find methods at runtime (since we specified a Netty version that conflicted with gRPC's). I'm also going to try re-enabling some of the Flight Java tests, after tracking down what I believe are some memory allocation issues. I think we might want to look at the APIs and make it explicit what is owned where and when, though.
Integration tests are broken, will investigate...
Ok, all that should be left is rebasing on #4553.
There's still a flake, it looks like. Do you want @jacques-n to have another look before we merge?
That's what I get for forgetting flake8... I would appreciate any further thoughts Jacques has on the metadata API in Java. I'm not convinced on the callback-based API, as it makes the control flow complicated, especially when mixed with the blocking API for reading data.
Codecov Report
@@             Coverage Diff              @@
##           master    #4282       +/-   ##
===========================================
- Coverage   88.57%   75.31%   -13.26%
===========================================
  Files         860       56      -804
  Lines      108022     3192   -104830
  Branches     1253        0     -1253
===========================================
- Hits        95678     2404    -93274
+ Misses      12065      788    -11277
+ Partials      279        0      -279
Ok, tried to fix the last Python tests... looks like gRPC error messages change text across versions (of course), so once #4484 lands, let's convert those tests to instead provide/check the gRPC error code in some way.
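As a rough illustration of why checking the code is more stable than checking the text, consider the sketch below. SketchRpcError, call_failing_rpc, and fails_with are hypothetical, the two message strings are made up for the example, and only the numeric values of the two status codes are taken from gRPC.

```python
from enum import Enum


class StatusCode(Enum):
    # Numeric values follow the gRPC status code list.
    UNAVAILABLE = 14
    UNAUTHENTICATED = 16


class SketchRpcError(Exception):
    """Hypothetical error type carrying a status code and free-form text."""

    def __init__(self, code, message):
        super().__init__(message)
        self.code = code


def call_failing_rpc(version):
    # Made-up messages: the wording differs by "version", the code does not.
    text = {
        "old": "Connect Failed",
        "new": "failed to connect to all addresses",
    }[version]
    raise SketchRpcError(StatusCode.UNAVAILABLE, text)


def fails_with(code, func, *args):
    """Return True only if func raises SketchRpcError with the given code."""
    try:
        func(*args)
    except SketchRpcError as err:
        return err.code is code
    return False
```

A test written as fails_with(StatusCode.UNAVAILABLE, ...) passes for both message variants, whereas a substring match on the text would need updating whenever the wording changes.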
Couple small comments, fine to be addressed in a follow-up
Overall, +1

    private final CompletableFuture<Void> result;
    private final BufferAllocator allocator;
    private final StreamListener<PutResult> listener;
Can we remove result here, given the addition of the listener, and just make a CompletableFuture-based observer that implements the listener (composed rather than all together)?
My ask was slightly different. I was proposing that it doesn't make sense to have a completable future and a listener in the same class. I'm struggling with the pattern where you're doing a double call in the on* event handling calls.
Ah, sorry - to be clear, you'd like to essentially merge PutObserver and SetStreamListener? That sounds reasonable.
It's weird to have listener > future > listener. Why not just stack the two listeners and the second one can do the future result (if it wants to go from listener > future)?
I guess the issue stems from trying to expose the result in two places; asynchronously via an optional listener given when starting DoPut, and through a future accessible on the listener used to write data during a DoPut. Maybe instead of the optional user-supplied listener defaulting to a no-op one, we can provide an easy way to supply one that converts the result to a future. That is, the current API looks like
final FlightClient.ClientStreamListener writer = client.startPut(descriptor, root, new StreamListener<>() {
  // ....
  @Override
  public void onCompleted() {
  }
});
// call writer.putNext...
writer.getResult(); // implicitly, we have to complete this future, while providing results to the listener above
and there is duplication in receiving the events from the server. Instead, we could have
final FutureListener listener = new FutureListener(); // {
  // optionally override onNext to get application metadata
// };
final FlightClient.ClientStreamListener writer = client.startPut(descriptor, root, listener);
// call writer.putNext()...
listener.getResult();
Is that cleaner? There's a bit more boilerplate, but we could get rid of that if we require the listener to be a FutureListener, create a default instance if the client-supplied listener is null, and have writer.getResult() just pass through to listener.getResult().
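A rough Python analogue of the proposed shape, using only the standard library, might look like the following. The class and method names mirror the Java snippet but are hypothetical, and fake_server_acks stands in for whatever would deliver PutResult messages during a real DoPut.

```python
from concurrent.futures import Future


class StreamListener:
    """Callback interface, loosely mirroring the Java StreamListener."""

    def on_next(self, value):
        pass

    def on_error(self, error):
        pass

    def on_completed(self):
        pass


class FutureListener(StreamListener):
    """Listener that owns the future, so events arrive in one place only."""

    def __init__(self):
        self._result = Future()

    def on_error(self, error):
        self._result.set_exception(error)

    def on_completed(self):
        self._result.set_result(None)

    def get_result(self, timeout=None):
        # Blocks until the stream completes; re-raises a stored error.
        return self._result.result(timeout=timeout)


class MetadataCollector(FutureListener):
    """Optional override of on_next to keep application metadata."""

    def __init__(self):
        super().__init__()
        self.received = []

    def on_next(self, value):
        self.received.append(value)


def fake_server_acks(listener, acks):
    # Hypothetical driver standing in for the server side of a DoPut.
    for ack in acks:
        listener.on_next(ack)
    listener.on_completed()


listener = MetadataCollector()
fake_server_acks(listener, [b"ack-0", b"ack-1"])
listener.get_result(timeout=1)
```

The point of the composition is that the writer no longer needs its own future: a writer-side get_result could simply delegate to the listener's.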
@@ -99,6 +118,9 @@ public void close() throws Exception {
        .collect(Collectors.toList());

    AutoCloseables.close(Iterables.concat(closeables, ImmutableList.of(root.get())));
    if (applicationMetadata != null) {
      applicationMetadata.close();
Add this reference to the AutoCloseables call above to ensure it is closed even if another closeable throws (you also don't need the null check there).
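The concern here is general: if resources are closed one after another by hand, an exception from an earlier close skips the later ones. A small Python analogue using contextlib.ExitStack shows the "close everything, then report the failure" behavior being asked for. Resource and close_all are hypothetical names, and skipping None entries is an assumption that mirrors the remark about the null check.

```python
from contextlib import ExitStack


class Resource:
    """Hypothetical closeable that can be told to fail on close."""

    def __init__(self, name, fail=False):
        self.name = name
        self.fail = fail
        self.closed = False

    def close(self):
        self.closed = True
        if self.fail:
            raise RuntimeError(f"{self.name} failed to close")


def close_all(resources):
    """Close every non-None resource, even if one of the closes raises."""
    with ExitStack() as stack:
        for resource in resources:
            if resource is not None:
                stack.callback(resource.close)
    # On exit, callbacks run in reverse order; a failure does not stop
    # the remaining callbacks, and the exception is re-raised afterwards.


root = Resource("root", fail=True)
metadata = Resource("metadata")

try:
    close_all([metadata, root, None])
except RuntimeError as err:
    failure = str(err)
```

Here root is closed first and raises, yet metadata is still closed before the error reaches the caller.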
    applicationMetadata = metadata;
  }

  /** Create a PutResult with application-specific metadata. */
It's probably good to state the reference-counting behavior in this kind of method.
Rebased & addressed the last bit of feedback.
I think that makes sense. Providing a simple listener impl for returning
the items via sync methods seems like a cleaner approach.
Rebased and implemented a sync metadata interface as per @jacques-n's suggestions. Example: arrow/java/flight/src/test/java/org/apache/arrow/flight/TestApplicationMetadata.java Line 117 in f302c79
Thanks @lihalite! Lgtm. +1 on the java side.
The build is failing due to lint issues and a failure in the integration tests. I'll be happy to merge once the tests are green.
No worries, should've rebased originally... Personal build: https://travis-ci.com/lihalite/arrow/builds/116979112
I wonder why there's no AppVeyor build. Do you have one on your fork?
I manually triggered one: https://ci.appveyor.com/project/lihalite/arrow/builds/25560414
Ah, indeed. Ideally this clause would have examined all files in the PR, not only in the last commit, though.
AppVeyor just doesn't trigger about 5-10% of the time in my experience, probably a bug on their end.
Here's the running AppVeyor build: https://ci.appveyor.com/project/lihalite/arrow/builds/25560411. Will probably wait a bit for a couple more of the main builds to run.
Looks like tests finally pass, except R on AppVeyor (which I just kicked again).
The R failure is unrelated: https://downforeveryoneorjustme.com/cloud.r-project.org
Merging, thanks @lihalite and code reviewers!
Thanks everyone, this has been quite the journey!
@lihalite would you like to start a section about Flight-specific improvements in 0.14 in https://docs.google.com/document/d/1ljkW5tBh7cDfPRg_z6YY-1cbXfXxpfpRUlSatbV8128/edit?usp=sharing?
Sure, I'll take a look when I get a chance.
Also covers ARROW-4627.
This is quite an enormous change; if preferred, I can do my best to separate the changes.