diff --git a/java/dataset/src/main/java/org/apache/arrow/dataset/jni/NativeScanner.java b/java/dataset/src/main/java/org/apache/arrow/dataset/jni/NativeScanner.java
index a8374f1deb3e8..8745dd7cba034 100644
--- a/java/dataset/src/main/java/org/apache/arrow/dataset/jni/NativeScanner.java
+++ b/java/dataset/src/main/java/org/apache/arrow/dataset/jni/NativeScanner.java
@@ -48,6 +48,9 @@ public class NativeScanner implements Scanner {
   private final ReadWriteLock lock = new ReentrantReadWriteLock();
   private final Lock writeLock = lock.writeLock();
   private final Lock readLock = lock.readLock();
+
+  // Lazily cached by schema(); volatile because schema() reads it without holding a lock.
+  private volatile Schema schema = null;
   private boolean closed = false;
 
   public NativeScanner(NativeContext context, ScanOptions options, long scannerId) {
@@ -92,7 +95,8 @@ public boolean hasNext() {
         }
         peek = UnsafeRecordBatchSerializer.deserializeUnsafe(context.getAllocator(), bytes);
         if (options.getColumns() != null) {
-          Preconditions.checkState(peek.getNodes().size() == options.getColumns().length);
+          // Validate against top-level schema fields rather than the batch's field nodes.
+          Preconditions.checkState(schema().getFields().size() == options.getColumns().length);
         }
         return true;
       }
@@ -122,12 +126,22 @@ public Iterable scan() {
 
   @Override
   public Schema schema() {
+    // Fast path: reuse the cached schema without locking; this also bypasses the closed check.
+    final Schema cached = schema;
+    if (cached != null) {
+      return cached;
+    }
     readLock.lock();
     try {
+      // readLock is shared, so several threads may get here together; re-check to skip redundant work.
+      if (schema != null) {
+        return schema;
+      }
       if (closed) {
         throw new NativeInstanceReleasedException();
       }
-      return SchemaUtility.deserialize(JniWrapper.get().getSchemaFromScanner(scannerId), context.getAllocator());
+      schema = SchemaUtility.deserialize(JniWrapper.get().getSchemaFromScanner(scannerId), context.getAllocator());
+      return schema;
     } catch (IOException e) {
       throw new RuntimeException(e);
     } finally {
diff --git a/java/dataset/src/test/java/org/apache/arrow/dataset/file/TestFileSystemDataset.java b/java/dataset/src/test/java/org/apache/arrow/dataset/file/TestFileSystemDataset.java
index a2ebe67b8b1d2..3480a3cd8c6cd 100644
--- a/java/dataset/src/test/java/org/apache/arrow/dataset/file/TestFileSystemDataset.java
+++ b/java/dataset/src/test/java/org/apache/arrow/dataset/file/TestFileSystemDataset.java
@@ -200,6 +200,70 @@ public void testCsvReadTab() throws Exception {
 
     AutoCloseables.close(vsr, allocator);
   }
+
+  // Projects column "_1" of struct_example.parquet and checks that all 50 rows are loaded.
+  @Test
+  public void testStructTypeRead() throws Exception {
+    RootAllocator allocator = new RootAllocator(Long.MAX_VALUE);
+    FileSystemDatasetFactory factory = new FileSystemDatasetFactory(allocator,
+        NativeMemoryPool.getDefault(), ParquetFileFormat.createDefault(), "file://" + resourcePath("data/struct_example.parquet"));
+    ScanOptions options = new ScanOptions(new String[] {"_1"}, Filter.EMPTY, 100);
+    Schema schema = factory.inspect();
+    NativeDataset dataset = factory.finish(schema);
+    NativeScanner nativeScanner = dataset.newScan(options);
+    List<? extends ScanTask> scanTasks = collect(nativeScanner.scan());
+    Assert.assertEquals(1, scanTasks.size());
+    ScanTask scanTask = scanTasks.get(0);
+    ScanTask.BatchIterator itr = scanTask.execute();
+
+    VectorSchemaRoot vsr = VectorSchemaRoot.create(schema, allocator);
+    VectorLoader loader = new VectorLoader(vsr);
+    int rowCount = 0;
+    while (itr.hasNext()) {
+      try (ArrowRecordBatch next = itr.next()) {
+        loader.load(next);
+      }
+      rowCount += vsr.getRowCount();
+
+    }
+    Assert.assertEquals(50, rowCount);
+    assertEquals(1, schema.getFields().size());
+    assertEquals("_1", schema.getFields().get(0).getName());
+    AutoCloseables.close(vsr, allocator);
+  }
+
+  // Uses an empty projection: the scanner schema has no fields, yet all 50 rows are still counted.
+  @Test
+  public void testStructTypeReadWithEmptyProjector() throws Exception {
+    RootAllocator allocator = new RootAllocator(Long.MAX_VALUE);
+    FileSystemDatasetFactory factory = new FileSystemDatasetFactory(allocator,
+        NativeMemoryPool.getDefault(), ParquetFileFormat.createDefault(), "file://" + resourcePath("data/struct_example.parquet"));
+    ScanOptions options = new ScanOptions(new String[0], Filter.EMPTY, 100);
+    Schema schema = factory.inspect();
+    NativeDataset dataset = factory.finish(schema);
+    NativeScanner nativeScanner = dataset.newScan(options);
+    List<? extends ScanTask> scanTasks = collect(nativeScanner.scan());
+    Assert.assertEquals(1, scanTasks.size());
+    ScanTask scanTask = scanTasks.get(0);
+    ScanTask.BatchIterator itr = scanTask.execute();
+    Schema scannerSchema = nativeScanner.schema();
+    VectorSchemaRoot vsr = VectorSchemaRoot.create(scannerSchema, allocator);
+    VectorLoader loader = new VectorLoader(vsr);
+    int rowCount = 0;
+    while (itr.hasNext()) {
+      try (ArrowRecordBatch next = itr.next()) {
+        loader.load(next);
+      }
+      rowCount += vsr.getRowCount();
+
+    }
+    Assert.assertEquals(50, rowCount);
+    assertEquals(1, schema.getFields().size());
+    assertEquals("_1", schema.getFields().get(0).getName());
+    assertEquals(0, scannerSchema.getFields().size());
+    AutoCloseables.close(vsr, allocator);
+  }
+
   @Test
   public void testReadPartialFile() throws Exception {
     ParquetWriteSupport writeSupport = ParquetWriteSupport.writeTempFile(AVRO_SCHEMA_USER, TMP.newFolder(), 1, "a");
diff --git a/java/dataset/src/test/resources/data/struct_example.parquet b/java/dataset/src/test/resources/data/struct_example.parquet
new file mode 100644
index 0000000000000..cf94819defc3a
Binary files /dev/null and b/java/dataset/src/test/resources/data/struct_example.parquet differ