From 8fd99cd9bfb143bf5deb9aa74b83a032a0eed5b9 Mon Sep 17 00:00:00 2001 From: David Li Date: Fri, 21 Jun 2019 15:53:15 -0400 Subject: [PATCH] Inline CompletableFuture in Flight acceptPut --- .../org/apache/arrow/flight/FlightClient.java | 19 ++++++------------- .../org/apache/arrow/flight/FlightStream.java | 8 ++++---- .../org/apache/arrow/flight/PutResult.java | 14 ++++++++++++-- .../apache/arrow/flight/TestLargeMessage.java | 5 ++--- 4 files changed, 24 insertions(+), 22 deletions(-) diff --git a/java/flight/src/main/java/org/apache/arrow/flight/FlightClient.java b/java/flight/src/main/java/org/apache/arrow/flight/FlightClient.java index 2b5aaea52b096..673e4d78ef010 100644 --- a/java/flight/src/main/java/org/apache/arrow/flight/FlightClient.java +++ b/java/flight/src/main/java/org/apache/arrow/flight/FlightClient.java @@ -194,7 +194,7 @@ public ClientStreamListener startPut(FlightDescriptor descriptor, VectorSchemaRo DictionaryUtils.generateSchemaMessages(root.getSchema(), descriptor, provider, observer::onNext); return new PutObserver(new VectorUnloader( root, true /* include # of nulls in vectors */, true /* must align buffers to be C++-compatible */), - observer, resultObserver.getFuture()); + observer, resultObserver); } /** @@ -257,17 +257,14 @@ public void onCompleted() { return stream; } - private static class SetStreamObserver implements StreamObserver { - - private final CompletableFuture result; + private static class SetStreamObserver extends CompletableFuture implements StreamObserver { private final BufferAllocator allocator; private final StreamListener listener; - SetStreamObserver(BufferAllocator allocator, - StreamListener listener) { + SetStreamObserver(BufferAllocator allocator, StreamListener listener) { + super(); this.allocator = allocator; this.listener = listener == null ? NoOpStreamListener.getInstance() : listener; - result = new CompletableFuture<>(); } @Override @@ -279,18 +276,14 @@ public void onNext(Flight.PutResult value) { @Override public void onError(Throwable t) { - result.completeExceptionally(t); listener.onError(t); + completeExceptionally(t); } @Override public void onCompleted() { listener.onCompleted(); - result.complete(null); - } - - public CompletableFuture getFuture() { - return result; + complete(null); } } diff --git a/java/flight/src/main/java/org/apache/arrow/flight/FlightStream.java b/java/flight/src/main/java/org/apache/arrow/flight/FlightStream.java index 5cf0025970168..010ff330a2c72 100644 --- a/java/flight/src/main/java/org/apache/arrow/flight/FlightStream.java +++ b/java/flight/src/main/java/org/apache/arrow/flight/FlightStream.java @@ -117,10 +117,10 @@ public void close() throws Exception { .map(t -> ((AutoCloseable) t)) .collect(Collectors.toList()); - AutoCloseables.close(Iterables.concat(closeables, ImmutableList.of(root.get()))); - if (applicationMetadata != null) { - applicationMetadata.close(); - } + // Must check for null since ImmutableList doesn't accept nulls + AutoCloseables.close(Iterables.concat(closeables, + applicationMetadata != null ? ImmutableList.of(root.get(), applicationMetadata) + : ImmutableList.of(root.get()))); } /** diff --git a/java/flight/src/main/java/org/apache/arrow/flight/PutResult.java b/java/flight/src/main/java/org/apache/arrow/flight/PutResult.java index b87523879ff14..11848690d0732 100644 --- a/java/flight/src/main/java/org/apache/arrow/flight/PutResult.java +++ b/java/flight/src/main/java/org/apache/arrow/flight/PutResult.java @@ -19,6 +19,7 @@ import org.apache.arrow.flight.impl.Flight; import org.apache.arrow.memory.BufferAllocator; +import org.apache.arrow.memory.ReferenceManager; import com.google.protobuf.ByteString; @@ -37,7 +38,11 @@ private PutResult(ArrowBuf metadata) { applicationMetadata = metadata; } - /** Create a PutResult with application-specific metadata. */ + /** + * Create a PutResult with application-specific metadata. + * + *

This method assumes ownership of the {@link ArrowBuf}. + */ public static PutResult metadata(ArrowBuf metadata) { if (metadata == null) { return empty(); @@ -50,7 +55,12 @@ public static PutResult empty() { return new PutResult(null); } - /** Get the metadata in this message. May be null. */ + /** + * Get the metadata in this message. May be null. + * + *

Ownership of the {@link ArrowBuf} is retained by this object. Call {@link ReferenceManager#retain()} to preserve + * a reference. + */ public ArrowBuf getApplicationMetadata() { return applicationMetadata; } diff --git a/java/flight/src/test/java/org/apache/arrow/flight/TestLargeMessage.java b/java/flight/src/test/java/org/apache/arrow/flight/TestLargeMessage.java index 70ba9a4100e1a..c00b9ecbf477d 100644 --- a/java/flight/src/test/java/org/apache/arrow/flight/TestLargeMessage.java +++ b/java/flight/src/test/java/org/apache/arrow/flight/TestLargeMessage.java @@ -44,8 +44,8 @@ public void getLargeMessage() throws Exception { FlightTestUtil.getStartedServer((location) -> FlightServer.builder(a, location, producer).build())) { try (FlightClient client = FlightClient.builder(a, s.getLocation()).build()) { - FlightStream stream = client.getStream(new Ticket(new byte[]{})); - try (VectorSchemaRoot root = stream.getRoot()) { + try (FlightStream stream = client.getStream(new Ticket(new byte[]{})); + VectorSchemaRoot root = stream.getRoot()) { while (stream.next()) { for (final Field field : root.getSchema().getFields()) { int value = 0; @@ -57,7 +57,6 @@ public void getLargeMessage() throws Exception { } } } - stream.close(); } } }