From fb6df39679e3e2af23eab6ed76650ea2fd89cfac Mon Sep 17 00:00:00 2001 From: Hongze Zhang Date: Thu, 11 Nov 2021 16:42:37 +0800 Subject: [PATCH] IllegalStateException reading data with struct type (#43) --- .../arrow/dataset/jni/NativeDataset.java | 2 +- .../arrow/dataset/jni/NativeScanner.java | 28 +++++--- .../dataset/file/TestFileSystemDataset.java | 62 ++++++++++++++++++ .../resources/data/struct_example.parquet | Bin 0 -> 1250 bytes 4 files changed, 81 insertions(+), 11 deletions(-) create mode 100644 java/dataset/src/test/resources/data/struct_example.parquet diff --git a/java/dataset/src/main/java/org/apache/arrow/dataset/jni/NativeDataset.java b/java/dataset/src/main/java/org/apache/arrow/dataset/jni/NativeDataset.java index 30ff1a9302f7a..127c49c124abd 100644 --- a/java/dataset/src/main/java/org/apache/arrow/dataset/jni/NativeDataset.java +++ b/java/dataset/src/main/java/org/apache/arrow/dataset/jni/NativeDataset.java @@ -42,7 +42,7 @@ public synchronized NativeScanner newScan(ScanOptions options) { } long scannerId = JniWrapper.get().createScanner(datasetId, options.getColumns().orElse(null), options.getBatchSize(), context.getMemoryPool().getNativeInstanceId()); - return new NativeScanner(context, scannerId); + return new NativeScanner(context, options, scannerId); } @Override diff --git a/java/dataset/src/main/java/org/apache/arrow/dataset/jni/NativeScanner.java b/java/dataset/src/main/java/org/apache/arrow/dataset/jni/NativeScanner.java index 5ca9206a2f148..311e7d7696941 100644 --- a/java/dataset/src/main/java/org/apache/arrow/dataset/jni/NativeScanner.java +++ b/java/dataset/src/main/java/org/apache/arrow/dataset/jni/NativeScanner.java @@ -18,23 +18,17 @@ package org.apache.arrow.dataset.jni; import java.io.IOException; -import java.util.ArrayList; import java.util.Collections; import java.util.NoSuchElementException; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock; -import java.util.stream.Collectors; +import org.apache.arrow.dataset.scanner.ScanOptions; import org.apache.arrow.dataset.scanner.ScanTask; import org.apache.arrow.dataset.scanner.Scanner; -import org.apache.arrow.memory.ArrowBuf; -import org.apache.arrow.memory.BufferAllocator; -import org.apache.arrow.memory.BufferLedger; -import org.apache.arrow.memory.NativeUnderlyingMemory; -import org.apache.arrow.memory.util.LargeMemoryUtil; -import org.apache.arrow.vector.ipc.message.ArrowFieldNode; +import org.apache.arrow.util.Preconditions; import org.apache.arrow.vector.ipc.message.ArrowRecordBatch; import org.apache.arrow.vector.types.pojo.Schema; import org.apache.arrow.vector.util.SchemaUtility; @@ -48,15 +42,19 @@ public class NativeScanner implements Scanner { private final AtomicBoolean executed = new AtomicBoolean(false); private final NativeContext context; + private final ScanOptions options; private final long scannerId; private final ReadWriteLock lock = new ReentrantReadWriteLock(); private final Lock writeLock = lock.writeLock(); private final Lock readLock = lock.readLock(); + + private Schema schema = null; private boolean closed = false; - public NativeScanner(NativeContext context, long scannerId) { + public NativeScanner(NativeContext context, ScanOptions options, long scannerId) { this.context = context; + this.options = options; this.scannerId = scannerId; } @@ -95,6 +93,9 @@ public boolean hasNext() { return false; } peek = UnsafeRecordBatchSerializer.deserializeUnsafe(context.getAllocator(), bytes); + if (options.getColumns().isPresent()) { + Preconditions.checkState(schema().getFields().size() == options.getColumns().get().length); + } return true; } @@ -123,12 +124,19 @@ public Iterable scan() { @Override public Schema schema() { + if (schema != null) { + return schema; + } readLock.lock(); try { + if (schema != null) { + return schema; + } if (closed) { throw new NativeInstanceReleasedException(); } - return SchemaUtility.deserialize(JniWrapper.get().getSchemaFromScanner(scannerId), context.getAllocator()); + schema = SchemaUtility.deserialize(JniWrapper.get().getSchemaFromScanner(scannerId), context.getAllocator()); + return schema; } catch (IOException e) { throw new RuntimeException(e); } finally { diff --git a/java/dataset/src/test/java/org/apache/arrow/dataset/file/TestFileSystemDataset.java b/java/dataset/src/test/java/org/apache/arrow/dataset/file/TestFileSystemDataset.java index fe93ab16a540b..4b070623d6842 100644 --- a/java/dataset/src/test/java/org/apache/arrow/dataset/file/TestFileSystemDataset.java +++ b/java/dataset/src/test/java/org/apache/arrow/dataset/file/TestFileSystemDataset.java @@ -163,6 +163,68 @@ public void testCsvReadTab() throws Exception { AutoCloseables.close(vsr, allocator); } + + @Test + public void testStructTypeRead() throws Exception { + RootAllocator allocator = new RootAllocator(Long.MAX_VALUE); + FileSystemDatasetFactory factory = new FileSystemDatasetFactory(allocator, + NativeMemoryPool.getDefault(), ParquetFileFormat.createDefault(), "file://" + resourcePath("data/struct_example.parquet")); + ScanOptions options = new ScanOptions(new String[] {"_1"}, Filter.EMPTY, 100); + Schema schema = factory.inspect(); + NativeDataset dataset = factory.finish(schema); + NativeScanner nativeScanner = dataset.newScan(options); + List scanTasks = collect(nativeScanner.scan()); + Assert.assertEquals(1, scanTasks.size()); + ScanTask scanTask = scanTasks.get(0); + ScanTask.BatchIterator itr = scanTask.execute(); + + VectorSchemaRoot vsr = VectorSchemaRoot.create(schema, allocator); + VectorLoader loader = new VectorLoader(vsr); + int rowCount = 0; + while (itr.hasNext()) { + try (ArrowRecordBatch next = itr.next()) { + loader.load(next); + } + rowCount += vsr.getRowCount(); + + } + Assert.assertEquals(50, rowCount); + assertEquals(1, schema.getFields().size()); + assertEquals("_1", schema.getFields().get(0).getName()); + AutoCloseables.close(vsr, allocator); + } + + @Test + public void testStructTypeReadWithEmptyProjector() throws Exception { + RootAllocator allocator = new RootAllocator(Long.MAX_VALUE); + FileSystemDatasetFactory factory = new FileSystemDatasetFactory(allocator, + NativeMemoryPool.getDefault(), ParquetFileFormat.createDefault(), "file://" + resourcePath("data/struct_example.parquet")); + ScanOptions options = new ScanOptions(new String[0], Filter.EMPTY, 100); + Schema schema = factory.inspect(); + NativeDataset dataset = factory.finish(schema); + NativeScanner nativeScanner = dataset.newScan(options); + List scanTasks = collect(nativeScanner.scan()); + Assert.assertEquals(1, scanTasks.size()); + ScanTask scanTask = scanTasks.get(0); + ScanTask.BatchIterator itr = scanTask.execute(); + Schema scannerSchema = nativeScanner.schema(); + VectorSchemaRoot vsr = VectorSchemaRoot.create(scannerSchema, allocator); + VectorLoader loader = new VectorLoader(vsr); + int rowCount = 0; + while (itr.hasNext()) { + try (ArrowRecordBatch next = itr.next()) { + loader.load(next); + } + rowCount += vsr.getRowCount(); + + } + Assert.assertEquals(50, rowCount); + assertEquals(1, schema.getFields().size()); + assertEquals("_1", schema.getFields().get(0).getName()); + assertEquals(0, scannerSchema.getFields().size()); + AutoCloseables.close(vsr, allocator); + } + @Test public void testReadPartialFile() throws Exception { ParquetWriteSupport writeSupport = ParquetWriteSupport.writeTempFile(AVRO_SCHEMA_USER, TMP.newFolder(), 1, "a"); diff --git a/java/dataset/src/test/resources/data/struct_example.parquet b/java/dataset/src/test/resources/data/struct_example.parquet new file mode 100644 index 0000000000000000000000000000000000000000..cf94819defc3a294b4e132238a1fcc4f1e03f62c GIT binary patch literal 1250 zcmb8vQE!t_6bJA#3Y5*(t-6b#s1?`RDMKhNEz4efGd`Hb_@prnEp#^0b^|)IEPL@I znCOQvKAFV_KY<^^58$)U{>!>)GIg%$FWlUFIOm=VVe`Qgn-TjcvcsIg0t`haX^Ldi z(NB?P1Cpushgqz?&ndJ3fDk`>u>{ZLK$v>4!2a}UAIc}WJ4VsXh0KOXhFL)&wFC= z8E;!`fo!OQ0}W_`3oU4aSDLN||HVNZ@J_w-W;*z*2G;(;Xtc#yGYyY%YV@#>Z2R%6 zPSSfB4t>T^lT)~k&3IybB^w^i)EUP_YVryn4U!abtbpZw7=^=dlvar3K-E;EDrt;Y znnh{PJ}{4)i1op3TlN!^Zo0Qd45y?(u;;42eVQR+X z*XA%x{lHIszOE-}Jn5ynq4x)26eRlk^L>5f52s5V8`DQ&=M2}Q@n{P- zk0w#%cOxW2mC(?SH=wWY9~>CJ3e`^+^5fXw!}RQNJw4g&Aj{kvk5YdyN*<%aQTBr~ z%Dp%{fRsnG<60ZW<=Od`ak!Rgmuub0APUNU+Y9>bzUMjht{e7U*Kq>NwOlW3T0O7Z Y@>*6m><0F#YG_3G(;;4nO?=&c0$&vG1poj5 literal 0 HcmV?d00001