diff --git a/java/flight/flight-sql-jdbc-core/src/main/java/org/apache/arrow/driver/jdbc/utils/FlightEndpointDataQueue.java b/java/flight/flight-sql-jdbc-core/src/main/java/org/apache/arrow/driver/jdbc/utils/FlightEndpointDataQueue.java index 71cafd2ec3075..1198d89c40aef 100644 --- a/java/flight/flight-sql-jdbc-core/src/main/java/org/apache/arrow/driver/jdbc/utils/FlightEndpointDataQueue.java +++ b/java/flight/flight-sql-jdbc-core/src/main/java/org/apache/arrow/driver/jdbc/utils/FlightEndpointDataQueue.java @@ -104,9 +104,8 @@ private CloseableEndpointStreamPair next(final EndpointStreamSupplier endpointSt futures.remove(future); try { final CloseableEndpointStreamPair endpoint = future.get(); - // Get the next FlightStream with content. - // The stream is non-empty. - if (endpoint.getStream().getRoot().getRowCount() > 0) { + // Get the next FlightStream that has a root with content. + if (endpoint != null) { return endpoint; } } catch (final ExecutionException | InterruptedException | CancellationException e) { @@ -178,8 +177,12 @@ public synchronized void enqueue(final CloseableEndpointStreamPair endpointReque endpointsToClose.add(endpointRequest); futures.add(completionService.submit(() -> { // `FlightStream#next` will block until new data can be read or stream is over. - endpointRequest.getStream().next(); - return endpointRequest; + while (endpointRequest.getStream().next()) { + if (endpointRequest.getStream().getRoot().getRowCount() > 0) { + return endpointRequest; + } + } + return null; })); } diff --git a/java/flight/flight-sql-jdbc-core/src/test/java/org/apache/arrow/driver/jdbc/ResultSetTest.java b/java/flight/flight-sql-jdbc-core/src/test/java/org/apache/arrow/driver/jdbc/ResultSetTest.java index e2ac100b8dc36..5b9e269fb3cc8 100644 --- a/java/flight/flight-sql-jdbc-core/src/test/java/org/apache/arrow/driver/jdbc/ResultSetTest.java +++ b/java/flight/flight-sql-jdbc-core/src/test/java/org/apache/arrow/driver/jdbc/ResultSetTest.java @@ -455,4 +455,17 @@ allocator, forGrpcInsecure("localhost", 0), rootProducer) } } } + + @Test + public void testShouldRunSelectQueryWithEmptyVectorsEmbedded() throws Exception { + try (Statement statement = connection.createStatement(); + ResultSet resultSet = statement.executeQuery( + CoreMockedSqlProducers.LEGACY_REGULAR_WITH_EMPTY_SQL_CMD)) { + long rowCount = 0; + while (resultSet.next()) { + ++rowCount; + } + assertEquals(2, rowCount); + } + } } diff --git a/java/flight/flight-sql-jdbc-core/src/test/java/org/apache/arrow/driver/jdbc/utils/CoreMockedSqlProducers.java b/java/flight/flight-sql-jdbc-core/src/test/java/org/apache/arrow/driver/jdbc/utils/CoreMockedSqlProducers.java index cf359849a7105..a8e2e7f2e4ce5 100644 --- a/java/flight/flight-sql-jdbc-core/src/test/java/org/apache/arrow/driver/jdbc/utils/CoreMockedSqlProducers.java +++ b/java/flight/flight-sql-jdbc-core/src/test/java/org/apache/arrow/driver/jdbc/utils/CoreMockedSqlProducers.java @@ -64,6 +64,7 @@ public final class CoreMockedSqlProducers { public static final String LEGACY_REGULAR_SQL_CMD = "SELECT * FROM TEST"; public static final String LEGACY_METADATA_SQL_CMD = "SELECT * FROM METADATA"; public static final String LEGACY_CANCELLATION_SQL_CMD = "SELECT * FROM TAKES_FOREVER"; + public static final String LEGACY_REGULAR_WITH_EMPTY_SQL_CMD = "SELECT * FROM TEST_EMPTIES"; private CoreMockedSqlProducers() { // Prevent instantiation. @@ -80,9 +81,44 @@ public static MockFlightSqlProducer getLegacyProducer() { addLegacyRegularSqlCmdSupport(producer); addLegacyMetadataSqlCmdSupport(producer); addLegacyCancellationSqlCmdSupport(producer); + addQueryWithEmbeddedEmptyRoot(producer); return producer; } + private static void addQueryWithEmbeddedEmptyRoot(final MockFlightSqlProducer producer) { + final Schema querySchema = new Schema(ImmutableList.of( + new Field( + "ID", + new FieldType(true, new ArrowType.Int(64, true), + null), + null) + )); + + final List> resultProducers = new ArrayList<>(); + Consumer dataRoot = listener -> { + try (final BufferAllocator allocator = new RootAllocator(Long.MAX_VALUE); + final VectorSchemaRoot root = VectorSchemaRoot.create(querySchema, allocator)) { + root.allocateNew(); + root.setRowCount(0); + listener.start(root); + listener.putNext(); // empty root + ((BigIntVector) root.getVector("ID")).setSafe(0, 100L); + root.setRowCount(1); + listener.putNext(); // data root + root.clear(); + root.setRowCount(0); + listener.putNext(); // empty root + ((BigIntVector) root.getVector("ID")).setSafe(0, 100L); + root.setRowCount(1); + listener.putNext(); // data root + } finally { + listener.completed(); + } + }; + resultProducers.add(dataRoot); + producer.addSelectQuery(LEGACY_REGULAR_WITH_EMPTY_SQL_CMD, querySchema, resultProducers); + } + private static void addLegacyRegularSqlCmdSupport(final MockFlightSqlProducer producer) { final Schema querySchema = new Schema(ImmutableList.of( new Field(