diff --git a/java/dataset/src/main/java/org/apache/arrow/dataset/jni/NativeDataset.java b/java/dataset/src/main/java/org/apache/arrow/dataset/jni/NativeDataset.java index 30ff1a9302f7a..127c49c124abd 100644 --- a/java/dataset/src/main/java/org/apache/arrow/dataset/jni/NativeDataset.java +++ b/java/dataset/src/main/java/org/apache/arrow/dataset/jni/NativeDataset.java @@ -42,7 +42,7 @@ public synchronized NativeScanner newScan(ScanOptions options) { } long scannerId = JniWrapper.get().createScanner(datasetId, options.getColumns().orElse(null), options.getBatchSize(), context.getMemoryPool().getNativeInstanceId()); - return new NativeScanner(context, scannerId); + return new NativeScanner(context, options, scannerId); } @Override diff --git a/java/dataset/src/main/java/org/apache/arrow/dataset/jni/NativeScanner.java b/java/dataset/src/main/java/org/apache/arrow/dataset/jni/NativeScanner.java index 5ca9206a2f148..311e7d7696941 100644 --- a/java/dataset/src/main/java/org/apache/arrow/dataset/jni/NativeScanner.java +++ b/java/dataset/src/main/java/org/apache/arrow/dataset/jni/NativeScanner.java @@ -18,23 +18,17 @@ package org.apache.arrow.dataset.jni; import java.io.IOException; -import java.util.ArrayList; import java.util.Collections; import java.util.NoSuchElementException; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock; -import java.util.stream.Collectors; +import org.apache.arrow.dataset.scanner.ScanOptions; import org.apache.arrow.dataset.scanner.ScanTask; import org.apache.arrow.dataset.scanner.Scanner; -import org.apache.arrow.memory.ArrowBuf; -import org.apache.arrow.memory.BufferAllocator; -import org.apache.arrow.memory.BufferLedger; -import org.apache.arrow.memory.NativeUnderlyingMemory; -import org.apache.arrow.memory.util.LargeMemoryUtil; -import org.apache.arrow.vector.ipc.message.ArrowFieldNode; +import org.apache.arrow.util.Preconditions; import org.apache.arrow.vector.ipc.message.ArrowRecordBatch; import org.apache.arrow.vector.types.pojo.Schema; import org.apache.arrow.vector.util.SchemaUtility; @@ -48,15 +42,19 @@ public class NativeScanner implements Scanner { private final AtomicBoolean executed = new AtomicBoolean(false); private final NativeContext context; + private final ScanOptions options; private final long scannerId; private final ReadWriteLock lock = new ReentrantReadWriteLock(); private final Lock writeLock = lock.writeLock(); private final Lock readLock = lock.readLock(); + + private Schema schema = null; private boolean closed = false; - public NativeScanner(NativeContext context, long scannerId) { + public NativeScanner(NativeContext context, ScanOptions options, long scannerId) { this.context = context; + this.options = options; this.scannerId = scannerId; } @@ -95,6 +93,9 @@ public boolean hasNext() { return false; } peek = UnsafeRecordBatchSerializer.deserializeUnsafe(context.getAllocator(), bytes); + if (options.getColumns().isPresent()) { + Preconditions.checkState(schema().getFields().size() == options.getColumns().get().length); + } return true; } @@ -123,12 +124,19 @@ public Iterable scan() { @Override public Schema schema() { + if (schema != null) { + return schema; + } readLock.lock(); try { + if (schema != null) { + return schema; + } if (closed) { throw new NativeInstanceReleasedException(); } - return SchemaUtility.deserialize(JniWrapper.get().getSchemaFromScanner(scannerId), context.getAllocator()); + schema = SchemaUtility.deserialize(JniWrapper.get().getSchemaFromScanner(scannerId), context.getAllocator()); + return schema; } catch (IOException e) { throw new RuntimeException(e); } finally { diff --git a/java/dataset/src/test/java/org/apache/arrow/dataset/file/TestFileSystemDataset.java b/java/dataset/src/test/java/org/apache/arrow/dataset/file/TestFileSystemDataset.java index fe93ab16a540b..4b070623d6842 100644 --- a/java/dataset/src/test/java/org/apache/arrow/dataset/file/TestFileSystemDataset.java +++ b/java/dataset/src/test/java/org/apache/arrow/dataset/file/TestFileSystemDataset.java @@ -163,6 +163,68 @@ public void testCsvReadTab() throws Exception { AutoCloseables.close(vsr, allocator); } + + @Test + public void testStructTypeRead() throws Exception { + RootAllocator allocator = new RootAllocator(Long.MAX_VALUE); + FileSystemDatasetFactory factory = new FileSystemDatasetFactory(allocator, + NativeMemoryPool.getDefault(), ParquetFileFormat.createDefault(), "file://" + resourcePath("data/struct_example.parquet")); + ScanOptions options = new ScanOptions(new String[] {"_1"}, Filter.EMPTY, 100); + Schema schema = factory.inspect(); + NativeDataset dataset = factory.finish(schema); + NativeScanner nativeScanner = dataset.newScan(options); + List scanTasks = collect(nativeScanner.scan()); + Assert.assertEquals(1, scanTasks.size()); + ScanTask scanTask = scanTasks.get(0); + ScanTask.BatchIterator itr = scanTask.execute(); + + VectorSchemaRoot vsr = VectorSchemaRoot.create(schema, allocator); + VectorLoader loader = new VectorLoader(vsr); + int rowCount = 0; + while (itr.hasNext()) { + try (ArrowRecordBatch next = itr.next()) { + loader.load(next); + } + rowCount += vsr.getRowCount(); + + } + Assert.assertEquals(50, rowCount); + assertEquals(1, schema.getFields().size()); + assertEquals("_1", schema.getFields().get(0).getName()); + AutoCloseables.close(vsr, allocator); + } + + @Test + public void testStructTypeReadWithEmptyProjector() throws Exception { + RootAllocator allocator = new RootAllocator(Long.MAX_VALUE); + FileSystemDatasetFactory factory = new FileSystemDatasetFactory(allocator, + NativeMemoryPool.getDefault(), ParquetFileFormat.createDefault(), "file://" + resourcePath("data/struct_example.parquet")); + ScanOptions options = new ScanOptions(new String[0], Filter.EMPTY, 100); + Schema schema = factory.inspect(); + NativeDataset dataset = factory.finish(schema); + NativeScanner nativeScanner = dataset.newScan(options); + List scanTasks = collect(nativeScanner.scan()); + Assert.assertEquals(1, scanTasks.size()); + ScanTask scanTask = scanTasks.get(0); + ScanTask.BatchIterator itr = scanTask.execute(); + Schema scannerSchema = nativeScanner.schema(); + VectorSchemaRoot vsr = VectorSchemaRoot.create(scannerSchema, allocator); + VectorLoader loader = new VectorLoader(vsr); + int rowCount = 0; + while (itr.hasNext()) { + try (ArrowRecordBatch next = itr.next()) { + loader.load(next); + } + rowCount += vsr.getRowCount(); + + } + Assert.assertEquals(50, rowCount); + assertEquals(1, schema.getFields().size()); + assertEquals("_1", schema.getFields().get(0).getName()); + assertEquals(0, scannerSchema.getFields().size()); + AutoCloseables.close(vsr, allocator); + } + @Test public void testReadPartialFile() throws Exception { ParquetWriteSupport writeSupport = ParquetWriteSupport.writeTempFile(AVRO_SCHEMA_USER, TMP.newFolder(), 1, "a"); diff --git a/java/dataset/src/test/resources/data/struct_example.parquet b/java/dataset/src/test/resources/data/struct_example.parquet new file mode 100644 index 0000000000000..cf94819defc3a Binary files /dev/null and b/java/dataset/src/test/resources/data/struct_example.parquet differ