Skip to content

Commit

Permalink
Inline CompletableFuture in Flight acceptPut
Browse files Browse the repository at this point in the history
  • Loading branch information
lidavidm committed Jun 26, 2019
1 parent 4cebc54 commit 8fd99cd
Show file tree
Hide file tree
Showing 4 changed files with 24 additions and 22 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -194,7 +194,7 @@ public ClientStreamListener startPut(FlightDescriptor descriptor, VectorSchemaRo
DictionaryUtils.generateSchemaMessages(root.getSchema(), descriptor, provider, observer::onNext);
return new PutObserver(new VectorUnloader(
root, true /* include # of nulls in vectors */, true /* must align buffers to be C++-compatible */),
observer, resultObserver.getFuture());
observer, resultObserver);
}

/**
Expand Down Expand Up @@ -257,17 +257,14 @@ public void onCompleted() {
return stream;
}

private static class SetStreamObserver implements StreamObserver<Flight.PutResult> {

private final CompletableFuture<Void> result;
private static class SetStreamObserver extends CompletableFuture<Void> implements StreamObserver<Flight.PutResult> {
private final BufferAllocator allocator;
private final StreamListener<PutResult> listener;

SetStreamObserver(BufferAllocator allocator,
StreamListener<PutResult> listener) {
SetStreamObserver(BufferAllocator allocator, StreamListener<PutResult> listener) {
super();
this.allocator = allocator;
this.listener = listener == null ? NoOpStreamListener.getInstance() : listener;
result = new CompletableFuture<>();
}

@Override
Expand All @@ -279,18 +276,14 @@ public void onNext(Flight.PutResult value) {

@Override
public void onError(Throwable t) {
result.completeExceptionally(t);
listener.onError(t);
completeExceptionally(t);
}

@Override
public void onCompleted() {
listener.onCompleted();
result.complete(null);
}

public CompletableFuture<Void> getFuture() {
return result;
complete(null);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -117,10 +117,10 @@ public void close() throws Exception {
.map(t -> ((AutoCloseable) t))
.collect(Collectors.toList());

AutoCloseables.close(Iterables.concat(closeables, ImmutableList.of(root.get())));
if (applicationMetadata != null) {
applicationMetadata.close();
}
// Must check for null since ImmutableList doesn't accept nulls
AutoCloseables.close(Iterables.concat(closeables,
applicationMetadata != null ? ImmutableList.of(root.get(), applicationMetadata)
: ImmutableList.of(root.get())));
}

/**
Expand Down
14 changes: 12 additions & 2 deletions java/flight/src/main/java/org/apache/arrow/flight/PutResult.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

import org.apache.arrow.flight.impl.Flight;
import org.apache.arrow.memory.BufferAllocator;
import org.apache.arrow.memory.ReferenceManager;

import com.google.protobuf.ByteString;

Expand All @@ -37,7 +38,11 @@ private PutResult(ArrowBuf metadata) {
applicationMetadata = metadata;
}

/** Create a PutResult with application-specific metadata. */
/**
* Create a PutResult with application-specific metadata.
*
* <p>This method assumes ownership of the {@link ArrowBuf}.
*/
public static PutResult metadata(ArrowBuf metadata) {
if (metadata == null) {
return empty();
Expand All @@ -50,7 +55,12 @@ public static PutResult empty() {
return new PutResult(null);
}

/** Get the metadata in this message. May be null. */
/**
* Get the metadata in this message. May be null.
*
* <p>Ownership of the {@link ArrowBuf} is retained by this object. Call {@link ReferenceManager#retain()} to preserve
* a reference.
*/
public ArrowBuf getApplicationMetadata() {
return applicationMetadata;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,8 +44,8 @@ public void getLargeMessage() throws Exception {
FlightTestUtil.getStartedServer((location) -> FlightServer.builder(a, location, producer).build())) {

try (FlightClient client = FlightClient.builder(a, s.getLocation()).build()) {
FlightStream stream = client.getStream(new Ticket(new byte[]{}));
try (VectorSchemaRoot root = stream.getRoot()) {
try (FlightStream stream = client.getStream(new Ticket(new byte[]{}));
VectorSchemaRoot root = stream.getRoot()) {
while (stream.next()) {
for (final Field field : root.getSchema().getFields()) {
int value = 0;
Expand All @@ -57,7 +57,6 @@ public void getLargeMessage() throws Exception {
}
}
}
stream.close();
}
}
}
Expand Down

0 comments on commit 8fd99cd

Please sign in to comment.