From 6f1cd8db7997387174091676630c1ad591da9456 Mon Sep 17 00:00:00 2001 From: David Li Date: Mon, 24 Jun 2019 11:13:09 -0400 Subject: [PATCH] Rework interface for accessing server-sent metadata during DoPut --- .../apache/arrow/flight/AsyncPutListener.java | 63 +++++++ .../org/apache/arrow/flight/FlightClient.java | 51 +++-- .../apache/arrow/flight/SyncPutListener.java | 114 ++++++++++++ .../integration/IntegrationTestClient.java | 22 +-- .../arrow/flight/TestApplicationMetadata.java | 174 ++++++++++++------ .../arrow/flight/TestBasicOperation.java | 3 +- .../apache/arrow/flight/TestLargeMessage.java | 2 +- .../java/org/apache/arrow/flight/TestTls.java | 4 +- .../flight/example/TestExampleServer.java | 4 +- 9 files changed, 338 insertions(+), 99 deletions(-) create mode 100644 java/flight/src/main/java/org/apache/arrow/flight/AsyncPutListener.java create mode 100644 java/flight/src/main/java/org/apache/arrow/flight/SyncPutListener.java diff --git a/java/flight/src/main/java/org/apache/arrow/flight/AsyncPutListener.java b/java/flight/src/main/java/org/apache/arrow/flight/AsyncPutListener.java new file mode 100644 index 0000000000000..c8214e3195310 --- /dev/null +++ b/java/flight/src/main/java/org/apache/arrow/flight/AsyncPutListener.java @@ -0,0 +1,63 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.arrow.flight; + +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; + +/** + * A handler for server-sent application metadata messages during a Flight DoPut operation. + * + *

To handle messages, create an instance of this class overriding {@link #onNext(PutResult)}. The other methods + * should not be overridden. + */ +public class AsyncPutListener implements FlightClient.PutListener { + + private CompletableFuture completed; + + public AsyncPutListener() { + completed = new CompletableFuture<>(); + } + + /** + * Wait for the stream to finish on the server side. You must call this to be notified of any errors that may have + * happened during the upload. + */ + @Override + public final void getResult() { + try { + completed.get(); + } catch (InterruptedException | ExecutionException e) { + throw new RuntimeException(e); + } + } + + @Override + public void onNext(PutResult val) { + } + + @Override + public final void onError(Throwable t) { + completed.completeExceptionally(t); + } + + @Override + public final void onCompleted() { + completed.complete(null); + } +} diff --git a/java/flight/src/main/java/org/apache/arrow/flight/FlightClient.java b/java/flight/src/main/java/org/apache/arrow/flight/FlightClient.java index 673e4d78ef010..9ac3686fea377 100644 --- a/java/flight/src/main/java/org/apache/arrow/flight/FlightClient.java +++ b/java/flight/src/main/java/org/apache/arrow/flight/FlightClient.java @@ -20,7 +20,6 @@ import java.io.InputStream; import java.net.URISyntaxException; import java.util.Iterator; -import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; @@ -44,7 +43,6 @@ import org.apache.arrow.vector.ipc.message.ArrowRecordBatch; import com.google.common.base.Preconditions; -import com.google.common.base.Throwables; import com.google.common.collect.ImmutableList; import com.google.common.collect.Iterators; @@ -168,7 +166,7 @@ public void authenticate(ClientAuthHandler handler, CallOption... options) { * @return ClientStreamListener an interface to control uploading data */ public ClientStreamListener startPut(FlightDescriptor descriptor, VectorSchemaRoot root, - StreamListener metadataListener, CallOption... options) { + PutListener metadataListener, CallOption... options) { return startPut(descriptor, root, new MapDictionaryProvider(), metadataListener, options); } @@ -181,7 +179,7 @@ public ClientStreamListener startPut(FlightDescriptor descriptor, VectorSchemaRo * @return ClientStreamListener an interface to control uploading data */ public ClientStreamListener startPut(FlightDescriptor descriptor, VectorSchemaRoot root, DictionaryProvider provider, - StreamListener metadataListener, CallOption... options) { + PutListener metadataListener, CallOption... options) { Preconditions.checkNotNull(descriptor); Preconditions.checkNotNull(root); @@ -194,7 +192,7 @@ public ClientStreamListener startPut(FlightDescriptor descriptor, VectorSchemaRo DictionaryUtils.generateSchemaMessages(root.getSchema(), descriptor, provider, observer::onNext); return new PutObserver(new VectorUnloader( root, true /* include # of nulls in vectors */, true /* must align buffers to be C++-compatible */), - observer, resultObserver); + observer, metadataListener); } /** @@ -257,7 +255,7 @@ public void onCompleted() { return stream; } - private static class SetStreamObserver extends CompletableFuture implements StreamObserver { + private static class SetStreamObserver implements StreamObserver { private final BufferAllocator allocator; private final StreamListener listener; @@ -277,13 +275,11 @@ public void onNext(Flight.PutResult value) { @Override public void onError(Throwable t) { listener.onError(t); - completeExceptionally(t); } @Override public void onCompleted() { listener.onCompleted(); - complete(null); } } @@ -291,13 +287,13 @@ private static class PutObserver implements ClientStreamListener { private final ClientCallStreamObserver observer; private final VectorUnloader unloader; - private final CompletableFuture futureResult; + private final PutListener listener; public PutObserver(VectorUnloader unloader, ClientCallStreamObserver observer, - CompletableFuture futureResult) { + PutListener listener) { this.observer = observer; this.unloader = unloader; - this.futureResult = futureResult; + this.listener = listener; } @Override @@ -308,8 +304,7 @@ public void putNext() { @Override public void putNext(ArrowBuf appMetadata) { ArrowRecordBatch batch = unloader.getRecordBatch(); - // Check the futureResult in case server sent an exception - while (!observer.isReady() && !futureResult.isDone()) { + while (!observer.isReady()) { /* busy wait */ } // Takes ownership of appMetadata @@ -328,11 +323,7 @@ public void completed() { @Override public void getResult() { - try { - futureResult.get(); - } catch (Exception ex) { - throw Throwables.propagate(ex); - } + listener.getResult(); } } @@ -367,6 +358,30 @@ public interface ClientStreamListener { void getResult(); } + /** + * A handler for server-sent application metadata messages during a Flight DoPut operation. + * + *

Generally, instead of implementing this yourself, you should use {@link AsyncPutListener} or {@link + * SyncPutListener}. + */ + public interface PutListener extends StreamListener { + + /** + * Wait for the stream to finish on the server side. You must call this to be notified of any errors that may have + * happened during the upload. + */ + void getResult(); + + /** + * Called when a message from the server is received. + * + * @param val The application metadata. This buffer will be reclaimed once onNext returns; you must retain a + * reference to use it outside this method. + */ + @Override + void onNext(PutResult val); + } + /** * Shut down this client. */ diff --git a/java/flight/src/main/java/org/apache/arrow/flight/SyncPutListener.java b/java/flight/src/main/java/org/apache/arrow/flight/SyncPutListener.java new file mode 100644 index 0000000000000..f1246a1d07911 --- /dev/null +++ b/java/flight/src/main/java/org/apache/arrow/flight/SyncPutListener.java @@ -0,0 +1,114 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.arrow.flight; + +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.TimeUnit; + +import io.netty.buffer.ArrowBuf; + +/** + * A listener for server-sent application metadata messages during a Flight DoPut. This class wraps the messages in a + * synchronous interface. + */ +public final class SyncPutListener implements FlightClient.PutListener, AutoCloseable { + + private final LinkedBlockingQueue queue; + private final CompletableFuture completed; + private static final Object DONE = new Object(); + private static final Object DONE_WITH_EXCEPTION = new Object(); + + public SyncPutListener() { + queue = new LinkedBlockingQueue<>(); + completed = new CompletableFuture<>(); + } + + private PutResult unwrap(Object queueItem) throws InterruptedException, ExecutionException { + if (queueItem == DONE) { + queue.put(queueItem); + return null; + } else if (queueItem == DONE_WITH_EXCEPTION) { + queue.put(queueItem); + completed.get(); + } + return (PutResult) queueItem; + } + + /** + * Get the next message from the server, blocking until it is available. + * + * @return The next message, or null if the server is done sending messages. The caller assumes ownership of the + * metadata and must remember to close it. + * @throws InterruptedException if interrupted while waiting. + * @throws ExecutionException if the server sent an error, or if there was an internal error. + */ + public PutResult read() throws InterruptedException, ExecutionException { + return unwrap(queue.take()); + } + + /** + * Get the next message from the server, blocking for the specified amount of time until it is available. + * + * @return The next message, or null if the server is done sending messages or no message arrived before the timeout. + * The caller assumes ownership of the metadata and must remember to close it. + * @throws InterruptedException if interrupted while waiting. + * @throws ExecutionException if the server sent an error, or if there was an internal error. + */ + public PutResult poll(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException { + return unwrap(queue.poll(timeout, unit)); + } + + @Override + public void getResult() { + try { + completed.get(); + } catch (InterruptedException | ExecutionException e) { + throw new RuntimeException(e); + } + } + + @Override + public void onNext(PutResult val) { + final ArrowBuf metadata = val.getApplicationMetadata(); + metadata.getReferenceManager().retain(); + queue.add(PutResult.metadata(metadata)); + } + + @Override + public void onError(Throwable t) { + completed.completeExceptionally(t); + queue.add(DONE_WITH_EXCEPTION); + } + + @Override + public void onCompleted() { + completed.complete(null); + queue.add(DONE); + } + + @Override + public void close() { + queue.forEach(o -> { + if (o instanceof PutResult) { + ((PutResult) o).close(); + } + }); + } +} diff --git a/java/flight/src/main/java/org/apache/arrow/flight/example/integration/IntegrationTestClient.java b/java/flight/src/main/java/org/apache/arrow/flight/example/integration/IntegrationTestClient.java index 56663683e13b6..477dfdba4bdd4 100644 --- a/java/flight/src/main/java/org/apache/arrow/flight/example/integration/IntegrationTestClient.java +++ b/java/flight/src/main/java/org/apache/arrow/flight/example/integration/IntegrationTestClient.java @@ -22,16 +22,12 @@ import java.nio.charset.StandardCharsets; import java.util.Collections; import java.util.List; -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.TimeoutException; +import org.apache.arrow.flight.AsyncPutListener; import org.apache.arrow.flight.FlightClient; import org.apache.arrow.flight.FlightDescriptor; import org.apache.arrow.flight.FlightEndpoint; import org.apache.arrow.flight.FlightInfo; -import org.apache.arrow.flight.FlightProducer.StreamListener; import org.apache.arrow.flight.FlightStream; import org.apache.arrow.flight.Location; import org.apache.arrow.flight.PutResult; @@ -102,9 +98,8 @@ private void run(String[] args) throws ParseException, IOException { jsonRoot = VectorSchemaRoot.create(root.getSchema(), allocator); VectorUnloader unloader = new VectorUnloader(root); VectorLoader jsonLoader = new VectorLoader(jsonRoot); - final CompletableFuture completed = new CompletableFuture<>(); FlightClient.ClientStreamListener stream = client.startPut(descriptor, root, reader, - new StreamListener() { + new AsyncPutListener() { int counter = 0; @Override @@ -118,16 +113,6 @@ public void onNext(PutResult val) { } counter++; } - - @Override - public void onError(Throwable t) { - completed.completeExceptionally(t); - } - - @Override - public void onCompleted() { - completed.complete(null); - } }); int counter = 0; while (reader.read(root)) { @@ -143,9 +128,6 @@ public void onCompleted() { stream.completed(); // Need to call this, or exceptions from the server get swallowed stream.getResult(); - completed.get(30, TimeUnit.SECONDS); - } catch (InterruptedException | ExecutionException | TimeoutException e) { - throw new RuntimeException(e); } // 2. Get the ticket for the data. diff --git a/java/flight/src/test/java/org/apache/arrow/flight/TestApplicationMetadata.java b/java/flight/src/test/java/org/apache/arrow/flight/TestApplicationMetadata.java index 3bd3e16612257..ad2c58f3b7803 100644 --- a/java/flight/src/test/java/org/apache/arrow/flight/TestApplicationMetadata.java +++ b/java/flight/src/test/java/org/apache/arrow/flight/TestApplicationMetadata.java @@ -19,7 +19,11 @@ import java.util.Collections; -import org.apache.arrow.flight.FlightProducer.StreamListener; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.function.BiConsumer; + +import org.apache.arrow.flight.FlightClient.PutListener; import org.apache.arrow.memory.BufferAllocator; import org.apache.arrow.memory.RootAllocator; import org.apache.arrow.vector.IntVector; @@ -47,23 +51,20 @@ public class TestApplicationMetadata { // This test is consistently flaky on CI, unfortunately. @Ignore public void retrieveMetadata() { - try (final BufferAllocator a = new RootAllocator(Long.MAX_VALUE); - final FlightServer s = - FlightTestUtil.getStartedServer( - (location) -> FlightServer.builder(a, location, new MetadataFlightProducer(a)).build()); - final FlightClient client = FlightClient.builder(a, s.getLocation()).build(); - final FlightStream stream = client.getStream(new Ticket(new byte[0]))) { - byte i = 0; - while (stream.next()) { - final IntVector vector = (IntVector) stream.getRoot().getVector("a"); - Assert.assertEquals(1, vector.getValueCount()); - Assert.assertEquals(10, vector.get(0)); - Assert.assertEquals(i, stream.getLatestMetadata().getByte(0)); - i++; + test((allocator, client) -> { + try (final FlightStream stream = client.getStream(new Ticket(new byte[0]))) { + byte i = 0; + while (stream.next()) { + final IntVector vector = (IntVector) stream.getRoot().getVector("a"); + Assert.assertEquals(1, vector.getValueCount()); + Assert.assertEquals(10, vector.get(0)); + Assert.assertEquals(i, stream.getLatestMetadata().getByte(0)); + i++; + } + } catch (Exception e) { + throw new RuntimeException(e); } - } catch (Exception e) { - throw new RuntimeException(e); - } + }); } /** @@ -71,51 +72,116 @@ public void retrieveMetadata() { */ @Test @Ignore - public void uploadMetadata() { - final FlightDescriptor descriptor = FlightDescriptor.path("test"); + public void uploadMetadataAsync() { final Schema schema = new Schema(Collections.singletonList(Field.nullable("a", new ArrowType.Int(32, true)))); - try (final BufferAllocator a = new RootAllocator(Long.MAX_VALUE); - VectorSchemaRoot root = VectorSchemaRoot.create(schema, a); - final FlightServer s = - FlightTestUtil.getStartedServer( - (location) -> FlightServer.builder(a, location, new MetadataFlightProducer(a)).build()); - final FlightClient client = FlightClient.builder(a, s.getLocation()).build()) { - - final StreamListener listener = new StreamListener() { - int counter = 0; + test((allocator, client) -> { + try (final VectorSchemaRoot root = VectorSchemaRoot.create(schema, allocator)) { + final FlightDescriptor descriptor = FlightDescriptor.path("test"); + + final PutListener listener = new AsyncPutListener() { + int counter = 0; + + @Override + public void onNext(PutResult val) { + Assert.assertNotNull(val); + Assert.assertEquals(counter, val.getApplicationMetadata().getByte(0)); + counter++; + } + }; + final FlightClient.ClientStreamListener writer = client.startPut(descriptor, root, listener); - @Override - public void onNext(PutResult val) { - Assert.assertNotNull(val); - Assert.assertEquals(counter, val.getApplicationMetadata().getByte(0)); - counter++; + root.allocateNew(); + for (byte i = 0; i < 10; i++) { + final IntVector vector = (IntVector) root.getVector("a"); + final ArrowBuf metadata = allocator.buffer(1); + metadata.writeByte(i); + vector.set(0, 10); + vector.setValueCount(1); + root.setRowCount(1); + writer.putNext(metadata); } + writer.completed(); + // Must attempt to retrieve the result to get any server-side errors. + writer.getResult(); + } catch (Exception e) { + throw new RuntimeException(e); + } + }); + } + + /** + * Ensure that a client can send metadata to the server. Uses the synchronous API. + */ + @Test + @Ignore + public void uploadMetadataSync() { + final Schema schema = new Schema(Collections.singletonList(Field.nullable("a", new ArrowType.Int(32, true)))); + test((allocator, client) -> { + try (final VectorSchemaRoot root = VectorSchemaRoot.create(schema, allocator); + final SyncPutListener listener = new SyncPutListener()) { + final FlightDescriptor descriptor = FlightDescriptor.path("test"); + final FlightClient.ClientStreamListener writer = client.startPut(descriptor, root, listener); - @Override - public void onError(Throwable t) { - Assert.fail(t.toString()); + root.allocateNew(); + for (byte i = 0; i < 10; i++) { + final IntVector vector = (IntVector) root.getVector("a"); + final ArrowBuf metadata = allocator.buffer(1); + metadata.writeByte(i); + vector.set(0, 10); + vector.setValueCount(1); + root.setRowCount(1); + writer.putNext(metadata); + try (final PutResult message = listener.poll(5000, TimeUnit.SECONDS)) { + Assert.assertNotNull(message); + Assert.assertEquals(i, message.getApplicationMetadata().getByte(0)); + } catch (InterruptedException | ExecutionException e) { + throw new RuntimeException(e); + } } + writer.completed(); + // Must attempt to retrieve the result to get any server-side errors. + writer.getResult(); + } + }); + } + + /** + * Make sure that a {@link SyncPutListener} properly reclaims memory if ignored. + */ + @Test + @Ignore + public void syncMemoryReclaimed() { + final Schema schema = new Schema(Collections.singletonList(Field.nullable("a", new ArrowType.Int(32, true)))); + test((allocator, client) -> { + try (final VectorSchemaRoot root = VectorSchemaRoot.create(schema, allocator); + final SyncPutListener listener = new SyncPutListener()) { + final FlightDescriptor descriptor = FlightDescriptor.path("test"); + final FlightClient.ClientStreamListener writer = client.startPut(descriptor, root, listener); - @Override - public void onCompleted() { - Assert.assertEquals(10, counter); + root.allocateNew(); + for (byte i = 0; i < 10; i++) { + final IntVector vector = (IntVector) root.getVector("a"); + final ArrowBuf metadata = allocator.buffer(1); + metadata.writeByte(i); + vector.set(0, 10); + vector.setValueCount(1); + root.setRowCount(1); + writer.putNext(metadata); } - }; - final FlightClient.ClientStreamListener writer = client.startPut(descriptor, root, listener); - - root.allocateNew(); - for (byte i = 0; i < 10; i++) { - final IntVector vector = (IntVector) root.getVector("a"); - final ArrowBuf metadata = a.buffer(1); - metadata.writeByte(i); - vector.set(0, 10); - vector.setValueCount(1); - root.setRowCount(1); - writer.putNext(metadata); + writer.completed(); + // Must attempt to retrieve the result to get any server-side errors. + writer.getResult(); } - writer.completed(); - // Must attempt to retrieve the result to get any server-side errors. - writer.getResult(); + }); + } + + private void test(BiConsumer fun) { + try (final BufferAllocator allocator = new RootAllocator(Long.MAX_VALUE); + final FlightServer s = + FlightTestUtil.getStartedServer( + (location) -> FlightServer.builder(allocator, location, new MetadataFlightProducer(allocator)).build()); + final FlightClient client = FlightClient.builder(allocator, s.getLocation()).build()) { + fun.accept(allocator, client); } catch (Exception e) { throw new RuntimeException(e); } diff --git a/java/flight/src/test/java/org/apache/arrow/flight/TestBasicOperation.java b/java/flight/src/test/java/org/apache/arrow/flight/TestBasicOperation.java index a7880fee1a755..abc5a2c321d11 100644 --- a/java/flight/src/test/java/org/apache/arrow/flight/TestBasicOperation.java +++ b/java/flight/src/test/java/org/apache/arrow/flight/TestBasicOperation.java @@ -23,7 +23,6 @@ import java.util.function.Consumer; import org.apache.arrow.flight.FlightClient.ClientStreamListener; -import org.apache.arrow.flight.FlightProducer.StreamListener; import org.apache.arrow.flight.impl.Flight; import org.apache.arrow.flight.impl.Flight.FlightDescriptor.DescriptorType; import org.apache.arrow.memory.BufferAllocator; @@ -98,7 +97,7 @@ public void putStream() throws Exception { VectorSchemaRoot root = VectorSchemaRoot.of(iv); ClientStreamListener listener = c - .startPut(FlightDescriptor.path("hello"), root, NoOpStreamListener.getInstance()); + .startPut(FlightDescriptor.path("hello"), root, new AsyncPutListener()); //batch 1 root.allocateNew(); diff --git a/java/flight/src/test/java/org/apache/arrow/flight/TestLargeMessage.java b/java/flight/src/test/java/org/apache/arrow/flight/TestLargeMessage.java index c00b9ecbf477d..629b6f5ebd8bc 100644 --- a/java/flight/src/test/java/org/apache/arrow/flight/TestLargeMessage.java +++ b/java/flight/src/test/java/org/apache/arrow/flight/TestLargeMessage.java @@ -76,7 +76,7 @@ public void putLargeMessage() throws Exception { BufferAllocator testAllocator = a.newChildAllocator("testcase", 0, Long.MAX_VALUE); VectorSchemaRoot root = generateData(testAllocator)) { final FlightClient.ClientStreamListener listener = client.startPut(FlightDescriptor.path("hello"), root, - NoOpStreamListener.getInstance()); + new AsyncPutListener()); listener.putNext(); listener.completed(); listener.getResult(); diff --git a/java/flight/src/test/java/org/apache/arrow/flight/TestTls.java b/java/flight/src/test/java/org/apache/arrow/flight/TestTls.java index c22304d56470f..b9d4dea55724b 100644 --- a/java/flight/src/test/java/org/apache/arrow/flight/TestTls.java +++ b/java/flight/src/test/java/org/apache/arrow/flight/TestTls.java @@ -96,9 +96,9 @@ void test(Consumer testFn) { Producer producer = new Producer(); FlightServer s = FlightTestUtil.getStartedServer( - (port) -> { + (location) -> { try { - return FlightServer.builder(a, Location.forGrpcTls(FlightTestUtil.LOCALHOST, port), producer) + return FlightServer.builder(a, location, producer) .useTls(certKey.cert, certKey.key) .build(); } catch (IOException e) { diff --git a/java/flight/src/test/java/org/apache/arrow/flight/example/TestExampleServer.java b/java/flight/src/test/java/org/apache/arrow/flight/example/TestExampleServer.java index 7063992cf3949..fb157f45ed14f 100644 --- a/java/flight/src/test/java/org/apache/arrow/flight/example/TestExampleServer.java +++ b/java/flight/src/test/java/org/apache/arrow/flight/example/TestExampleServer.java @@ -19,6 +19,7 @@ import java.io.IOException; +import org.apache.arrow.flight.AsyncPutListener; import org.apache.arrow.flight.FlightClient; import org.apache.arrow.flight.FlightClient.ClientStreamListener; import org.apache.arrow.flight.FlightDescriptor; @@ -26,7 +27,6 @@ import org.apache.arrow.flight.FlightStream; import org.apache.arrow.flight.FlightTestUtil; import org.apache.arrow.flight.Location; -import org.apache.arrow.flight.NoOpStreamListener; import org.apache.arrow.memory.BufferAllocator; import org.apache.arrow.memory.RootAllocator; import org.apache.arrow.util.AutoCloseables; @@ -78,7 +78,7 @@ public void putStream() { VectorSchemaRoot root = VectorSchemaRoot.of(iv); ClientStreamListener listener = client.startPut(FlightDescriptor.path("hello"), root, - NoOpStreamListener.getInstance()); + new AsyncPutListener()); //batch 1 root.allocateNew();